En aquest tema, explorarem els conceptes de missatges i desplaçaments en Kafka. Aquests són elements fonamentals per comprendre com funciona la transmissió de dades en aquest sistema de missatgeria distribuïda.
Què és un Missatge?
Un missatge en Kafka és una unitat de dades que es produeix i es consumeix. Els missatges poden contenir qualsevol tipus de dades, com ara text, JSON, XML, binari, etc.
Components d'un Missatge
- Clau (Key): Opcional. Pot ser utilitzada per particionar els missatges de manera específica.
- Valor (Value): El contingut del missatge.
- Encapçalament (Header): Informació addicional opcional que pot ser utilitzada per metadades.
Exemple de Missatge en JSON
{ "key": "user123", "value": { "event": "login", "timestamp": "2023-10-01T12:34:56Z" }, "headers": { "source": "web" } }
Què és un Desplaçament (Offset)?
El desplaçament és un identificador únic que Kafka assigna a cada missatge dins d'una partició d'un tema. Aquest identificador és un número seqüencial que permet als consumidors saber quins missatges han llegit i quins no.
Característiques del Desplaçament
- Seqüencial: Cada missatge dins d'una partició té un desplaçament únic i seqüencial.
- Persistència: Els desplaçaments es mantenen fins i tot després que els missatges hagin estat consumits.
- Control de Consum: Els consumidors utilitzen els desplaçaments per mantenir el seguiment dels missatges que han processat.
Exemple de Desplaçament
Si tenim una partició amb els següents missatges:
Desplaçament | Clau | Valor |
---|---|---|
0 | user123 | {"event": "login"} |
1 | user456 | {"event": "logout"} |
2 | user789 | {"event": "purchase"} |
El desplaçament 0 correspon al primer missatge, el desplaçament 1 al segon, i així successivament.
Com Funcionen els Desplaçaments en els Consumidors?
Els consumidors utilitzen els desplaçaments per saber on continuar llegint en una partició. Quan un consumidor llegeix un missatge, actualitza el desplaçament per indicar que ha processat aquest missatge.
Exemple de Consumidor
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitSync(); }
En aquest exemple, el consumidor llegeix missatges del tema "my-topic" i imprimeix el desplaçament, la clau i el valor de cada missatge. Després de processar els missatges, el consumidor sincronitza els desplaçaments per assegurar-se que no es perden missatges en cas de fallada.
Exercicis Pràctics
Exercici 1: Crear i Llegir Missatges
- Crear un Productor: Escriu un productor que enviï missatges a un tema anomenat "test-topic".
- Crear un Consumidor: Escriu un consumidor que llegeixi els missatges del tema "test-topic" i imprimeixi els desplaçaments, les claus i els valors.
Solució de l'Exercici 1
Productor
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key" + i, "value" + i); producer.send(record); } producer.close();
Consumidor
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } consumer.commitSync(); }
Errors Comuns i Consells
- No Commetre els Desplaçaments: Assegura't de commetre els desplaçaments després de processar els missatges per evitar la duplicació de processament.
- Desbordament de Missatges: Si el consumidor no pot seguir el ritme del productor, pot haver-hi un desbordament de missatges. Ajusta la configuració de consum per gestionar millor la càrrega.
- Particions i Clau: Utilitza claus de missatge per assegurar-te que els missatges relacionats es distribueixen a la mateixa partició, facilitant el processament seqüencial.
Conclusió
En aquesta secció, hem après què són els missatges i els desplaçaments en Kafka, com es gestionen i com els consumidors utilitzen els desplaçaments per mantenir el seguiment dels missatges processats. Aquests conceptes són fonamentals per treballar amb Kafka de manera eficient i segura. En el següent mòdul, explorarem com produir i consumir missatges en detall.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat