En aquest tema, aprendrem com produir missatges a Kafka. Això inclou la configuració d'un productor, l'enviament de missatges a un tema i la gestió de les confirmacions (acknowledgements).
Objectius
- Entendre el rol dels productors en Kafka.
- Configurar un productor de Kafka.
- Enviar missatges a un tema de Kafka.
- Gestionar les confirmacions de missatges.
- Rol dels Productors en Kafka
Els productors són aplicacions que envien dades a Kafka. Aquestes dades es publiquen en temes, que són estructures lògiques dins de Kafka que organitzen els missatges.
Conceptes Clau:
- Productor: Aplicació que envia missatges a Kafka.
- Tema: Estructura lògica que organitza els missatges.
- Partició: Subdivisió d'un tema que permet la paral·lelització.
- Configuració d'un Productor de Kafka
Per configurar un productor de Kafka, necessitem especificar algunes propietats clau. A continuació es mostra un exemple de configuració en Java:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class ProducerConfigExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); // El productor està configurat i llest per enviar missatges } }
Explicació del Codi:
BOOTSTRAP_SERVERS_CONFIG
: Llista de servidors Kafka per connectar-se.KEY_SERIALIZER_CLASS_CONFIG
: Classe per serialitzar les claus dels missatges.VALUE_SERIALIZER_CLASS_CONFIG
: Classe per serialitzar els valors dels missatges.
- Enviament de Missatges a un Tema
Un cop el productor està configurat, podem enviar missatges a un tema. A continuació es mostra un exemple d'enviament de missatges:
import org.apache.kafka.clients.producer.ProducerRecord; public class SendMessageExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props); String topic = "my-topic"; String key = "my-key"; String value = "Hello, Kafka!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record); producer.close(); } }
Explicació del Codi:
ProducerRecord
: Representa un missatge que s'enviarà a Kafka.send(record)
: Envia el missatge al tema especificat.
- Gestió de les Confirmacions
Les confirmacions (acknowledgements) són respostes del clúster de Kafka que indiquen que un missatge ha estat rebut correctament. Podem configurar el productor per gestionar les confirmacions de diverses maneres:
Tipus de Confirmacions:
- acks=0: El productor no espera cap confirmació del servidor.
- acks=1: El productor espera la confirmació del líder de la partició.
- acks=all: El productor espera la confirmació de tots els rèpliques.
Exemple de Configuració de Confirmacions:
Exercici Pràctic
Exercici:
- Configura un productor de Kafka.
- Envia un missatge a un tema anomenat "test-topic".
- Configura el productor per esperar confirmacions de tots els rèpliques.
Solució:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExercise { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); String topic = "test-topic"; String key = "exercise-key"; String value = "This is a test message"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); producer.send(record); producer.close(); } }
Resum
En aquest tema, hem après com configurar un productor de Kafka, enviar missatges a un tema i gestionar les confirmacions. Aquestes habilitats són fonamentals per treballar amb Kafka i assegurar-se que els missatges es publiquen correctament.
En el següent tema, explorarem com consumir missatges de Kafka, completant així el cicle de producció i consum de dades.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat