En aquest tema, aprendrem com consumir missatges des de Kafka. Els consumidors són components essencials en l'arquitectura de Kafka, ja que permeten llegir i processar els missatges que es publiquen en els temes. A continuació, desglossarem els conceptes clau, proporcionarem exemples pràctics i oferirem exercicis per reforçar l'aprenentatge.
Conceptes Clau
- Consumidor de Kafka: Un consumidor és una aplicació que llegeix dades des d'un tema de Kafka.
- Grup de Consumidors: Un conjunt de consumidors que treballen junts per consumir missatges d'un tema.
- Desplaçaments (Offsets): Un desplaçament és un índex que indica la posició d'un missatge dins d'una partició.
- Commit de Desplaçaments: El procés de guardar el desplaçament actual d'un consumidor per assegurar-se que els missatges no es processin més d'una vegada.
Configuració del Consumidor
Abans de començar a consumir missatges, hem de configurar el consumidor. Aquí teniu un exemple de configuració bàsica en Java:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Properties; public class ConsumerConfigExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // Aquí afegirem més configuració i codi per consumir missatges } }
Explicació del Codi
- BOOTSTRAP_SERVERS_CONFIG: L'adreça del servidor Kafka.
- GROUP_ID_CONFIG: L'identificador del grup de consumidors.
- KEY_DESERIALIZER_CLASS_CONFIG: La classe que deserialitza les claus dels missatges.
- VALUE_DESERIALIZER_CLASS_CONFIG: La classe que deserialitza els valors dels missatges.
Consumir Missatges
Un cop configurat el consumidor, podem començar a consumir missatges. Aquí teniu un exemple complet:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerExample { public static void main(String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("my-topic")); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
Explicació del Codi
- subscribe: Subscriu el consumidor a un tema.
- poll: Recupera els missatges del tema. El paràmetre
Duration.ofMillis(100)
especifica el temps d'espera per obtenir els missatges. - ConsumerRecord: Representa un missatge consumit, incloent el desplaçament, la clau i el valor.
Exercicis Pràctics
- Configurar un Consumidor: Configura un consumidor per connectar-se a un clúster de Kafka i subscriure's a un tema anomenat "test-topic".
- Consumir Missatges: Escriu un programa que consumeixi missatges del tema "test-topic" i imprimeixi el desplaçament, la clau i el valor de cada missatge.
- Commit de Desplaçaments: Modifica el programa anterior per commitar els desplaçaments manualment després de processar cada missatge.
Solucions
- Configurar un Consumidor
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("test-topic"));
- Consumir Missatges
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } }
- Commit de Desplaçaments
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("Offset: %d, Key: %s, Value: %s%n", record.offset(), record.key(), record.value()); } consumer.commitSync(); }
Errors Comuns i Consells
- No Subscriure's a un Tema: Assegura't de subscriure el consumidor a un tema abans de cridar
poll
. - No Commetre Desplaçaments: Si no commites els desplaçaments, podries processar els mateixos missatges més d'una vegada en cas de fallada.
- Durada de Poll Massa Llarga: Una durada de
poll
massa llarga pot causar latència en el processament dels missatges.
Resum
En aquesta secció, hem après com configurar un consumidor de Kafka, subscriure's a un tema i consumir missatges. També hem vist com commitar desplaçaments per assegurar-nos que els missatges no es processin més d'una vegada. Aquests conceptes són fonamentals per treballar amb Kafka i construir aplicacions robustes de processament de dades. En el següent tema, explorarem Kafka Connect, una eina poderosa per integrar Kafka amb altres sistemes.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat