Introducció
En aquest mòdul, explorarem conceptes avançats de Kafka Streams, una potent llibreria per construir aplicacions i microserveis que processin dades en temps real. Ens centrarem en tècniques avançades per optimitzar el rendiment, gestionar estats complexos i integrar Kafka Streams amb altres components de l'ecosistema de Kafka.
Continguts
Optimització del Rendiment
Estratègies d'Optimització
- 
Configuració de Buffering i Caching: - Ajusta els paràmetres de buffering (cache.max.bytes.buffering) per millorar el rendiment.
- Utilitza el caching per reduir la latència de les operacions d'estat.
 
- Ajusta els paràmetres de buffering (
- 
Paral·lelisme i Threads: - Configura el nombre de threads (num.stream.threads) per aprofitar millor els recursos de la màquina.
- Assegura't que el nombre de threads no superi el nombre de particions per evitar la sobrecàrrega.
 
- Configura el nombre de threads (
- 
Optimització de Serdes: - Utilitza Serdes personalitzats per optimitzar la serialització i deserialització de dades.
- Evita la serialització/deserialització innecessària utilitzant MaterializedambinMemoryKeyValueStore.
 
Exemple de Configuració
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Gestió d'Estats Complexos
Utilització de State Stores
- 
State Stores Persistents: - Utilitza RocksDBper emmagatzemar estats persistents.
- Configura StateStoreper a operacions de lectura/escriptura eficients.
 
- Utilitza 
- 
State Stores en Memòria: - Utilitza inMemoryKeyValueStoreper a estats temporals o de curta durada.
- Beneficia't de la baixa latència de les operacions en memòria.
 
- Utilitza 
Exemple de State Store
StoreBuilder<KeyValueStore<String, Long>> countStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long()
);
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(countStore);
KStream<String, String> stream = builder.stream("input-topic");
stream.transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
    private KeyValueStore<String, Long> stateStore;
    @Override
    public void init(ProcessorContext context) {
        this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }
    @Override
    public KeyValue<String, Long> transform(String key, String value) {
        Long count = stateStore.get(key);
        if (count == null) {
            count = 0L;
        }
        count++;
        stateStore.put(key, count);
        return new KeyValue<>(key, count);
    }
    @Override
    public void close() {}
}, "Counts");Processament de Fluxos amb Joins
Tipus de Joins
- 
KStream-KStream Join: - Uneix dos fluxos basats en una clau comuna.
- Configura la finestra de temps per a l'operació de join.
 
- 
KStream-KTable Join: - Uneix un flux amb una taula basada en una clau comuna.
- Utilitza KTableper a dades que canvien menys freqüentment.
 
- 
KTable-KTable Join: - Uneix dues taules basades en una clau comuna.
- Ideal per a operacions de join en dades estàtiques o semi-estàtiques.
 
Exemple de KStream-KStream Join
KStream<String, String> leftStream = builder.stream("left-topic");
KStream<String, String> rightStream = builder.stream("right-topic");
KStream<String, String> joinedStream = leftStream.join(
    rightStream,
    (leftValue, rightValue) -> leftValue + ", " + rightValue,
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
joinedStream.to("output-topic");Integració amb Kafka Connect
Utilització de Connectors
- 
Sink Connectors: - Envia dades processades a sistemes externs com bases de dades, sistemes de fitxers, etc.
- Configura connectors com JdbcSinkConnector,ElasticsearchSinkConnector, etc.
 
- 
Source Connectors: - Llegeix dades de sistemes externs i les publica en temes de Kafka.
- Configura connectors com JdbcSourceConnector,FileStreamSourceConnector, etc.
 
Exemple de Configuració de Sink Connector
{
  "name": "jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "output-topic",
    "connection.url": "jdbc:mysql://localhost:3306/mydb",
    "connection.user": "user",
    "connection.password": "password",
    "auto.create": "true",
    "insert.mode": "insert"
  }
}Exercicis Pràctics
Exercici 1: Optimització de Rendiment
Descripció: Configura una aplicació de Kafka Streams per optimitzar el rendiment utilitzant caching i ajustant el nombre de threads.
Solució:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "performance-optimized-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 20 * 1024 * 1024L); // 20 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Exercici 2: Gestió d'Estats Complexos
Descripció: Implementa una aplicació de Kafka Streams que utilitzi un StateStore per mantenir un comptador de paraules.
Solució:
StoreBuilder<KeyValueStore<String, Long>> wordCountStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("WordCounts"),
    Serdes.String(),
    Serdes.Long()
);
StreamsBuilder builder = new StreamsBuilder();
builder.addStateStore(wordCountStore);
KStream<String, String> textLines = builder.stream("text-input");
KStream<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
        private KeyValueStore<String, Long> stateStore;
        @Override
        public void init(ProcessorContext context) {
            this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("WordCounts");
        }
        @Override
        public KeyValue<String, Long> transform(String key, String word) {
            Long count = stateStore.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            stateStore.put(word, count);
            return new KeyValue<>(word, count);
        }
        @Override
        public void close() {}
    }, "WordCounts");
wordCounts.to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));Conclusió
En aquest mòdul, hem explorat conceptes avançats de Kafka Streams, incloent tècniques d'optimització del rendiment, gestió d'estats complexos, processament de fluxos amb joins i integració amb Kafka Connect. Aquests coneixements us permetran construir aplicacions de processament de dades en temps real més eficients i robustes. En el proper mòdul, ens centrarem en l'optimització del rendiment de Kafka en general.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat
