Storm, framework para procesar grandes cantidades de datos en tiempo real

14-08-2013
 

storm2En la anterior píldora de formación presentamos Hadoop, que nos permite procesar grandes cantidades de datos en batch(No en tiempo real). En esta píldora de formación toca hablar de Storm, que nos permite procesar, en tiempo real, grandes cantidades de datos(O que son complicados de procesar).

El funcionamiento de Storm es muy parecido a Hadoop, se podría decir que son de la misma familia. Productos que trabajan con Big Data que se diferencian horizontalmente por el factor ‘real-time’ (Quizá me he pasado simplificando).

Storm nos ofrece un entorno para distribuir el proceso de datos a través de distintos nodos(Workers). En el mundo Storm existen dos tipos de actores:

  • – Spouts: Que son los generadores de ‘Tuples'(O datos a procesar). Cada ‘Tuple’ es un tipo de dato a procesar.
  • – Bolts: Que son los procesadores de ‘Tuples’. Además de procesar ‘Tuples’ los ‘Bolts’ también puede generar ‘Tuples’ que se inyectan al sistema.

Debemos diseñar nuestras aplicaciones Big Data con el framework Storm en clave de Spouts y Bolts. El framework se encargará de repartir el trabajo entre los recursos.

Vamos a crear nuestro primer proyecto con Storm, en este caso vamos ha hacer una aplicación que durante 50 segundos estará buscando número primos.

Lo primero va a ser generar nuestro proyecto con maven. Una vez tengamos el fichero POM configurado podremos generar el proyecto ejecutando ‘mvn install‘:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lostsys.storm1</groupId>
    <artifactId>storm-test</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>storm1</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.lostsys.storm1.MyTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.8.1</version>
            <!-- keep storm out of the jar-with-dependencies
               <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
</project>

A partir de aquí ya podemos configurar nuestro ‘Spout’. En este caso ira generando números que el ‘Bolt’ procesará y dirá si son primos o no (Por cierto, fijaos que tenemos métodos para controlar el proceso con éxito o fallido de un Objeto):

public class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    
    private static int currentNumber = 1;
        
    @Override
    public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) {
        this.collector = collector;
    	}
    
    @Override
    public void nextTuple() {
        // Emit the next number
        collector.emit( new Values( new Integer( currentNumber++ ) ) );
    	}

    @Override
    public void ack(Object id) { }

    @Override
    public void fail(Object id) { }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare( new Fields( "number" ) );
    	}
}

Después configuraremos nuestro ‘Bolt’:

public class MyBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) {
        this.collector = collector;
    	}

    public void execute( Tuple tuple ) {
        int number = tuple.getInteger( 0 );

        if( isPrime( number) )
            System.out.println( number );

        collector.ack( tuple );
    	}

    public void declareOutputFields( OutputFieldsDeclarer declarer ) {
        declarer.declare( new Fields( "number" ) );
    	}   
    
    private boolean isPrime( int n ) {
        if( n == 1 || n == 2 || n == 3 ) return true;

        if( n % 2 == 0 ) return false;
        
        for( int i=3; i*i<=n; i+=2 ) 
            if( n % i == 0)
                return false;

        return true;
    	}
}

Y finalmente, crearemos una topología, que aunque no lo haya explicado, en el mundo de Storm es como se llama a la configuración del grafo de computación:

public class MyTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout( "spout", new MySpout() );
        builder.setBolt( "prime", new MyBolt() ).shuffleGrouping("spout");

        Config conf = new Config();
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(50000);
        cluster.killTopology("test");
        cluster.shutdown();
    	}
}

Para ejecutar el ejemplo solamente tenemos que ir a la carpeta donde nos genera el JAR y ejecutar.

java -jar storm-test-1.0.jar

Por cierto, para instalar Storm os lo podéis descargar de aquí https://github.com/nathanmarz/storm/downloads (Aunque para el desarrollo no haga falta).

Bien, el mundo Storm nos abre todo un mundo en el campo del Big Data y tiene mucho potencial para crear valor(Sobretodo a nivel de nuevos negocios). Ahora falta encontrar proyectos para aplicarlo, que en este país vamos un poco cojos en esto.

Comments

One Response to “Storm, framework para procesar grandes cantidades de datos en tiempo real”
  1. Anonimo2 says:

    Muy buen artículo . Muchas gracias. Además tienes uno que habla de Hadoop en tu blog. Muy grande.

Leave a Reply

© Albert Coronado Calzada