Introducció a Kafka Streams

Kafka Streams és una biblioteca client per construir aplicacions i microserveis que processin dades en temps real. Utilitza Apache Kafka com a sistema de missatgeria i permet transformar, agregar i analitzar dades de manera contínua.

Objectius d'aquest tema:

  1. Entendre què és Kafka Streams i per què és útil.
  2. Aprendre a configurar una aplicació de Kafka Streams.
  3. Explorar les operacions bàsiques de processament de fluxos.
  4. Implementar un exemple pràctic de Kafka Streams.

Què és Kafka Streams?

Kafka Streams és una API de processament de fluxos que permet construir aplicacions que consumeixen, processen i produeixen dades en temps real. A diferència d'altres solucions de processament de fluxos, Kafka Streams és lleugera i no requereix un clúster separat per executar-se.

Característiques clau:

  • Integració nativa amb Kafka: Kafka Streams es construeix sobre Kafka, aprofitant la seva robustesa i escalabilitat.
  • Processament d'alta disponibilitat: Suporta processament distribuït i tolerància a fallades.
  • API rica: Proporciona una API funcional i una API de DSL (Domain Specific Language) per facilitar el desenvolupament.

Configuració d'una Aplicació de Kafka Streams

Dependències

Per començar amb Kafka Streams, necessitem afegir les dependències necessàries al nostre projecte. Si utilitzem Maven, el pom.xml hauria d'incloure:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

Configuració Bàsica

La configuració bàsica d'una aplicació de Kafka Streams inclou la definició de propietats com el nom de l'aplicació, el servidor Kafka i les configuracions de deserialització/serialització.

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        // Definició del processament de fluxos aquí

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}

Operacions Bàsiques de Processament de Fluxos

Lectura de Dades

Per llegir dades d'un tema de Kafka, utilitzem el mètode stream de StreamsBuilder.

KStream<String, String> source = builder.stream("input-topic");

Transformació de Dades

Podem aplicar diverses transformacions a les dades, com ara map, filter, i flatMap.

KStream<String, String> transformed = source
    .filter((key, value) -> value.length() > 5)
    .mapValues(value -> value.toUpperCase());

Escriptura de Dades

Per escriure les dades processades a un altre tema de Kafka, utilitzem el mètode to.

transformed.to("output-topic");

Exemple Pràctic

A continuació, implementarem un exemple complet que llegeix missatges d'un tema, transforma els valors a majúscules i escriu els resultats a un altre tema.

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Properties;

public class KafkaStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");

        KStream<String, String> transformed = source
            .filter((key, value) -> value.length() > 5)
            .mapValues(value -> value.toUpperCase());

        transformed.to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Exercicis Pràctics

Exercici 1: Filtrar Missatges

Objectiu: Filtrar missatges que continguin la paraula "error" i escriure'ls a un tema separat.

Instruccions:

  1. Llegeix missatges del tema "logs".
  2. Filtra els missatges que continguin la paraula "error".
  3. Escriu els missatges filtrats al tema "error-logs".

Solució:

KStream<String, String> logs = builder.stream("logs");

KStream<String, String> errorLogs = logs
    .filter((key, value) -> value.contains("error"));

errorLogs.to("error-logs");

Exercici 2: Comptar Paraules

Objectiu: Comptar la freqüència de cada paraula en els missatges i escriure els resultats a un tema.

Instruccions:

  1. Llegeix missatges del tema "text-input".
  2. Divideix els missatges en paraules.
  3. Comptar la freqüència de cada paraula.
  4. Escriu els resultats al tema "word-count-output".

Solució:

KStream<String, String> textLines = builder.stream("text-input");

KStream<String, String> words = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));

KTable<String, Long> wordCounts = words
    .groupBy((key, word) -> word)
    .count();

wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

Conclusió

En aquest tema, hem après què és Kafka Streams, com configurar una aplicació bàsica i com realitzar operacions de processament de fluxos. També hem implementat exemples pràctics per reforçar els conceptes apresos. Amb aquests coneixements, estàs preparat per explorar funcionalitats més avançades de Kafka Streams i integrar-les en les teves aplicacions de processament de dades en temps real.

© Copyright 2024. Tots els drets reservats