En aquest tema, aprendrem com produir missatges a Kafka. Això inclou la configuració d'un productor, l'enviament de missatges a un tema i la gestió de les confirmacions (acknowledgements).

Objectius

  • Entendre el rol dels productors en Kafka.
  • Configurar un productor de Kafka.
  • Enviar missatges a un tema de Kafka.
  • Gestionar les confirmacions de missatges.

  1. Rol dels Productors en Kafka

Els productors són aplicacions que envien dades a Kafka. Aquestes dades es publiquen en temes, que són estructures lògiques dins de Kafka que organitzen els missatges.

Conceptes Clau:

  • Productor: Aplicació que envia missatges a Kafka.
  • Tema: Estructura lògica que organitza els missatges.
  • Partició: Subdivisió d'un tema que permet la paral·lelització.

  1. Configuració d'un Productor de Kafka

Per configurar un productor de Kafka, necessitem especificar algunes propietats clau. A continuació es mostra un exemple de configuració en Java:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // El productor està configurat i llest per enviar missatges
    }
}

Explicació del Codi:

  • BOOTSTRAP_SERVERS_CONFIG: Llista de servidors Kafka per connectar-se.
  • KEY_SERIALIZER_CLASS_CONFIG: Classe per serialitzar les claus dels missatges.
  • VALUE_SERIALIZER_CLASS_CONFIG: Classe per serialitzar els valors dels missatges.

  1. Enviament de Missatges a un Tema

Un cop el productor està configurat, podem enviar missatges a un tema. A continuació es mostra un exemple d'enviament de missatges:

import org.apache.kafka.clients.producer.ProducerRecord;

public class SendMessageExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "my-topic";
        String key = "my-key";
        String value = "Hello, Kafka!";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);

        producer.close();
    }
}

Explicació del Codi:

  • ProducerRecord: Representa un missatge que s'enviarà a Kafka.
  • send(record): Envia el missatge al tema especificat.

  1. Gestió de les Confirmacions

Les confirmacions (acknowledgements) són respostes del clúster de Kafka que indiquen que un missatge ha estat rebut correctament. Podem configurar el productor per gestionar les confirmacions de diverses maneres:

Tipus de Confirmacions:

  • acks=0: El productor no espera cap confirmació del servidor.
  • acks=1: El productor espera la confirmació del líder de la partició.
  • acks=all: El productor espera la confirmació de tots els rèpliques.

Exemple de Configuració de Confirmacions:

props.put(ProducerConfig.ACKS_CONFIG, "all");

Exercici Pràctic

Exercici:

  1. Configura un productor de Kafka.
  2. Envia un missatge a un tema anomenat "test-topic".
  3. Configura el productor per esperar confirmacions de tots els rèpliques.

Solució:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExercise {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test-topic";
        String key = "exercise-key";
        String value = "This is a test message";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);

        producer.close();
    }
}

Resum

En aquest tema, hem après com configurar un productor de Kafka, enviar missatges a un tema i gestionar les confirmacions. Aquestes habilitats són fonamentals per treballar amb Kafka i assegurar-se que els missatges es publiquen correctament.

En el següent tema, explorarem com consumir missatges de Kafka, completant així el cicle de producció i consum de dades.

© Copyright 2024. Tots els drets reservats