En aquest tema, aprendrem com consumir missatges des de Kafka. Els consumidors són components essencials en l'arquitectura de Kafka, ja que permeten llegir i processar els missatges que es publiquen en els temes. A continuació, desglossarem els conceptes clau, proporcionarem exemples pràctics i oferirem exercicis per reforçar l'aprenentatge.

Conceptes Clau

  1. Consumidor de Kafka: Un consumidor és una aplicació que llegeix dades des d'un tema de Kafka.
  2. Grup de Consumidors: Un conjunt de consumidors que treballen junts per consumir missatges d'un tema.
  3. Desplaçaments (Offsets): Un desplaçament és un índex que indica la posició d'un missatge dins d'una partició.
  4. Commit de Desplaçaments: El procés de guardar el desplaçament actual d'un consumidor per assegurar-se que els missatges no es processin més d'una vegada.

Configuració del Consumidor

Abans de començar a consumir missatges, hem de configurar el consumidor. Aquí teniu un exemple de configuració bàsica en Java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerConfigExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Aquí afegirem més configuració i codi per consumir missatges
    }
}

Explicació del Codi

  • BOOTSTRAP_SERVERS_CONFIG: L'adreça del servidor Kafka.
  • GROUP_ID_CONFIG: L'identificador del grup de consumidors.
  • KEY_DESERIALIZER_CLASS_CONFIG: La classe que deserialitza les claus dels missatges.
  • VALUE_DESERIALIZER_CLASS_CONFIG: La classe que deserialitza els valors dels missatges.

Consumir Missatges

Un cop configurat el consumidor, podem començar a consumir missatges. Aquí teniu un exemple complet:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Explicació del Codi

  • subscribe: Subscriu el consumidor a un tema.
  • poll: Recupera els missatges del tema. El paràmetre Duration.ofMillis(100) especifica el temps d'espera per obtenir els missatges.
  • ConsumerRecord: Representa un missatge consumit, incloent el desplaçament, la clau i el valor.

Exercicis Pràctics

  1. Configurar un Consumidor: Configura un consumidor per connectar-se a un clúster de Kafka i subscriure's a un tema anomenat "test-topic".
  2. Consumir Missatges: Escriu un programa que consumeixi missatges del tema "test-topic" i imprimeixi el desplaçament, la clau i el valor de cada missatge.
  3. Commit de Desplaçaments: Modifica el programa anterior per commitar els desplaçaments manualment després de processar cada missatge.

Solucions

  1. Configurar un Consumidor
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));
  1. Consumir Missatges
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
    }
}
  1. Commit de Desplaçaments
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

Errors Comuns i Consells

  • No Subscriure's a un Tema: Assegura't de subscriure el consumidor a un tema abans de cridar poll.
  • No Commetre Desplaçaments: Si no commites els desplaçaments, podries processar els mateixos missatges més d'una vegada en cas de fallada.
  • Durada de Poll Massa Llarga: Una durada de poll massa llarga pot causar latència en el processament dels missatges.

Resum

En aquesta secció, hem après com configurar un consumidor de Kafka, subscriure's a un tema i consumir missatges. També hem vist com commitar desplaçaments per assegurar-nos que els missatges no es processin més d'una vegada. Aquests conceptes són fonamentals per treballar amb Kafka i construir aplicacions robustes de processament de dades. En el següent tema, explorarem Kafka Connect, una eina poderosa per integrar Kafka amb altres sistemes.

© Copyright 2024. Tots els drets reservats