Get it on Google Play
Tu partner Freelance para tus proyectos IT

Apache Kafka | Streams API

01-09-2020

Buenas digitales! Continuamos con el tema de Apache Kafka y hoy le ha tocado el turno a Streams API. El API Streams nos permite crear aplicaciones que procesen datos de una lista de entrada y los publiquen en una lista de salida(En streaming y gestionado desde dentro del propio cluster). El API nos proveerá de las operaciones mas habituales como mapeado, filtrado, anidamientos, etc. y gestionará el escalado.

Instalar y arrancar Kafka

En el vídeo no nos hemos entretenido a instalar Kafka pero aquí si que vamos ha hacerlo. Descargar y descomprimir Kafka de la URL https://kafka.apache.org/. Este es todo el proceso de instalación.

A partir de aquí hemos arrancado primero Zookeeper y después Kafka en modo ‘standalone'(Es decir, que trabajaremos con un cluster de un solo nodo):

<KAFKA_HOME>/bin/zookeeper-server-start.sh config/zookeeper.properties

<KAFKA_HOME>/bin/kafka-server-start.sh config/server.properties

Creamos nuestros dos topic

Ahora toca crear nuestros dos topics(Uno de entrada y otro de salida):

<KAFKA_HOME>/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-stream-in

<KAFKA_HOME>/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-stream-out

Después hemos comprobado que realmente ambos topics se han generado correctamente:

<KAFKA_HOME>/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Nos conectamos al topic de entrada y nos suscribimos al de salida

<KAFKA_HOME>/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-stream-in

<KAFKA_HOME>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-stream-out --from-beginning

Recordad que de momento estos dos topics(O listas) no están conectadas porque todavía no tenemos la aplicación que lo hace.

Arrancamos nuesto IDE favorito creamos nuestro proyecto maven

Arrancamos nuestro IDE favorito, en este caso Eclipse IDE y creamos un proyecto maven(Aunque también podríamos hacerlo con Gradle). Después añadimos las siguientes dependencias al proyecto:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
	<version>2.6.0</version>
</dependency>

Seguidamente ya podemos empezar a programar nuestra Kafka Streams App:

package com.lostsys.youtube.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class StreamsDemo {

	public static void main(String[] args) {

	       Properties props = new Properties();
	        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lostsys-kafka-sample");
	        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
	        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

	        final StreamsBuilder builder = new StreamsBuilder();
	        final KStream<String, String> source = builder.stream("test-stream-in");
	        
	        source.flatMapValues(value -> {
	        	ArrayList<String> r=new ArrayList<String>();
	        			
	        	r.add("{ word: '"+value+"', length: "+value.length()+", words: "+value.split(" ").length+" }");
	        			
	        	return r;
	        	})
	                .to("test-stream-out");

	        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
	        streams.start();
		}	
	
	}

Como veis el desarrollo es bien sencillo y ha habido mas trabajo de arrancar cosas que de programación.

Kafka Streams

Kafka Streams

Si te ha servido, por favor comparte
 

Leave a Reply