Introducció a Kafka Streams
Kafka Streams és una biblioteca client per construir aplicacions i microserveis que processin dades en temps real. Utilitza Apache Kafka com a sistema de missatgeria i permet transformar, agregar i analitzar dades de manera contínua.
Objectius d'aquest tema:
- Entendre què és Kafka Streams i per què és útil.
- Aprendre a configurar una aplicació de Kafka Streams.
- Explorar les operacions bàsiques de processament de fluxos.
- Implementar un exemple pràctic de Kafka Streams.
Què és Kafka Streams?
Kafka Streams és una API de processament de fluxos que permet construir aplicacions que consumeixen, processen i produeixen dades en temps real. A diferència d'altres solucions de processament de fluxos, Kafka Streams és lleugera i no requereix un clúster separat per executar-se.
Característiques clau:
- Integració nativa amb Kafka: Kafka Streams es construeix sobre Kafka, aprofitant la seva robustesa i escalabilitat.
- Processament d'alta disponibilitat: Suporta processament distribuït i tolerància a fallades.
- API rica: Proporciona una API funcional i una API de DSL (Domain Specific Language) per facilitar el desenvolupament.
Configuració d'una Aplicació de Kafka Streams
Dependències
Per començar amb Kafka Streams, necessitem afegir les dependències necessàries al nostre projecte. Si utilitzem Maven, el pom.xml
hauria d'incloure:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>
Configuració Bàsica
La configuració bàsica d'una aplicació de Kafka Streams inclou la definició de propietats com el nom de l'aplicació, el servidor Kafka i les configuracions de deserialització/serialització.
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.Topology; import org.apache.kafka.common.serialization.Serdes; import java.util.Properties; public class KafkaStreamsExample { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); StreamsBuilder builder = new StreamsBuilder(); // Definició del processament de fluxos aquí Topology topology = builder.build(); KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); } }
Operacions Bàsiques de Processament de Fluxos
Lectura de Dades
Per llegir dades d'un tema de Kafka, utilitzem el mètode stream
de StreamsBuilder
.
Transformació de Dades
Podem aplicar diverses transformacions a les dades, com ara map, filter, i flatMap.
KStream<String, String> transformed = source .filter((key, value) -> value.length() > 5) .mapValues(value -> value.toUpperCase());
Escriptura de Dades
Per escriure les dades processades a un altre tema de Kafka, utilitzem el mètode to
.
Exemple Pràctic
A continuació, implementarem un exemple complet que llegeix missatges d'un tema, transforma els valors a majúscules i escriu els resultats a un altre tema.
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.common.serialization.Serdes; import java.util.Properties; public class KafkaStreamsExample { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("input-topic"); KStream<String, String> transformed = source .filter((key, value) -> value.length() > 5) .mapValues(value -> value.toUpperCase()); transformed.to("output-topic"); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } }
Exercicis Pràctics
Exercici 1: Filtrar Missatges
Objectiu: Filtrar missatges que continguin la paraula "error" i escriure'ls a un tema separat.
Instruccions:
- Llegeix missatges del tema "logs".
- Filtra els missatges que continguin la paraula "error".
- Escriu els missatges filtrats al tema "error-logs".
Solució:
KStream<String, String> logs = builder.stream("logs"); KStream<String, String> errorLogs = logs .filter((key, value) -> value.contains("error")); errorLogs.to("error-logs");
Exercici 2: Comptar Paraules
Objectiu: Comptar la freqüència de cada paraula en els missatges i escriure els resultats a un tema.
Instruccions:
- Llegeix missatges del tema "text-input".
- Divideix els missatges en paraules.
- Comptar la freqüència de cada paraula.
- Escriu els resultats al tema "word-count-output".
Solució:
KStream<String, String> textLines = builder.stream("text-input"); KStream<String, String> words = textLines .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))); KTable<String, Long> wordCounts = words .groupBy((key, word) -> word) .count(); wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
Conclusió
En aquest tema, hem après què és Kafka Streams, com configurar una aplicació bàsica i com realitzar operacions de processament de fluxos. També hem implementat exemples pràctics per reforçar els conceptes apresos. Amb aquests coneixements, estàs preparat per explorar funcionalitats més avançades de Kafka Streams i integrar-les en les teves aplicacions de processament de dades en temps real.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat