Introducció
En aquest mòdul, explorarem conceptes avançats de Kafka Streams, una potent llibreria per construir aplicacions i microserveis que processin dades en temps real. Ens centrarem en tècniques avançades per optimitzar el rendiment, gestionar estats complexos i integrar Kafka Streams amb altres components de l'ecosistema de Kafka.
Continguts
Optimització del Rendiment
Estratègies d'Optimització
-
Configuració de Buffering i Caching:
- Ajusta els paràmetres de buffering (
cache.max.bytes.buffering
) per millorar el rendiment. - Utilitza el caching per reduir la latència de les operacions d'estat.
- Ajusta els paràmetres de buffering (
-
Paral·lelisme i Threads:
- Configura el nombre de threads (
num.stream.threads
) per aprofitar millor els recursos de la màquina. - Assegura't que el nombre de threads no superi el nombre de particions per evitar la sobrecàrrega.
- Configura el nombre de threads (
-
Optimització de Serdes:
- Utilitza Serdes personalitzats per optimitzar la serialització i deserialització de dades.
- Evita la serialització/deserialització innecessària utilitzant
Materialized
ambinMemoryKeyValueStore
.
Exemple de Configuració
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Gestió d'Estats Complexos
Utilització de State Stores
-
State Stores Persistents:
- Utilitza
RocksDB
per emmagatzemar estats persistents. - Configura
StateStore
per a operacions de lectura/escriptura eficients.
- Utilitza
-
State Stores en Memòria:
- Utilitza
inMemoryKeyValueStore
per a estats temporals o de curta durada. - Beneficia't de la baixa latència de les operacions en memòria.
- Utilitza
Exemple de State Store
StoreBuilder<KeyValueStore<String, Long>> countStore = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("Counts"), Serdes.String(), Serdes.Long() ); StreamsBuilder builder = new StreamsBuilder(); builder.addStateStore(countStore); KStream<String, String> stream = builder.stream("input-topic"); stream.transform(() -> new Transformer<String, String, KeyValue<String, Long>>() { private KeyValueStore<String, Long> stateStore; @Override public void init(ProcessorContext context) { this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("Counts"); } @Override public KeyValue<String, Long> transform(String key, String value) { Long count = stateStore.get(key); if (count == null) { count = 0L; } count++; stateStore.put(key, count); return new KeyValue<>(key, count); } @Override public void close() {} }, "Counts");
Processament de Fluxos amb Joins
Tipus de Joins
-
KStream-KStream Join:
- Uneix dos fluxos basats en una clau comuna.
- Configura la finestra de temps per a l'operació de join.
-
KStream-KTable Join:
- Uneix un flux amb una taula basada en una clau comuna.
- Utilitza
KTable
per a dades que canvien menys freqüentment.
-
KTable-KTable Join:
- Uneix dues taules basades en una clau comuna.
- Ideal per a operacions de join en dades estàtiques o semi-estàtiques.
Exemple de KStream-KStream Join
KStream<String, String> leftStream = builder.stream("left-topic"); KStream<String, String> rightStream = builder.stream("right-topic"); KStream<String, String> joinedStream = leftStream.join( rightStream, (leftValue, rightValue) -> leftValue + ", " + rightValue, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), Serdes.String(), Serdes.String()) ); joinedStream.to("output-topic");
Integració amb Kafka Connect
Utilització de Connectors
-
Sink Connectors:
- Envia dades processades a sistemes externs com bases de dades, sistemes de fitxers, etc.
- Configura connectors com
JdbcSinkConnector
,ElasticsearchSinkConnector
, etc.
-
Source Connectors:
- Llegeix dades de sistemes externs i les publica en temes de Kafka.
- Configura connectors com
JdbcSourceConnector
,FileStreamSourceConnector
, etc.
Exemple de Configuració de Sink Connector
{ "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "output-topic", "connection.url": "jdbc:mysql://localhost:3306/mydb", "connection.user": "user", "connection.password": "password", "auto.create": "true", "insert.mode": "insert" } }
Exercicis Pràctics
Exercici 1: Optimització de Rendiment
Descripció: Configura una aplicació de Kafka Streams per optimitzar el rendiment utilitzant caching i ajustant el nombre de threads.
Solució:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "performance-optimized-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 20 * 1024 * 1024L); // 20 MB props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); StreamsBuilder builder = new StreamsBuilder(); // Configuració de topologia... KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Exercici 2: Gestió d'Estats Complexos
Descripció: Implementa una aplicació de Kafka Streams que utilitzi un StateStore
per mantenir un comptador de paraules.
Solució:
StoreBuilder<KeyValueStore<String, Long>> wordCountStore = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("WordCounts"), Serdes.String(), Serdes.Long() ); StreamsBuilder builder = new StreamsBuilder(); builder.addStateStore(wordCountStore); KStream<String, String> textLines = builder.stream("text-input"); KStream<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .transform(() -> new Transformer<String, String, KeyValue<String, Long>>() { private KeyValueStore<String, Long> stateStore; @Override public void init(ProcessorContext context) { this.stateStore = (KeyValueStore<String, Long>) context.getStateStore("WordCounts"); } @Override public KeyValue<String, Long> transform(String key, String word) { Long count = stateStore.get(word); if (count == null) { count = 0L; } count++; stateStore.put(word, count); return new KeyValue<>(word, count); } @Override public void close() {} }, "WordCounts"); wordCounts.to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
Conclusió
En aquest mòdul, hem explorat conceptes avançats de Kafka Streams, incloent tècniques d'optimització del rendiment, gestió d'estats complexos, processament de fluxos amb joins i integració amb Kafka Connect. Aquests coneixements us permetran construir aplicacions de processament de dades en temps real més eficients i robustes. En el proper mòdul, ens centrarem en l'optimització del rendiment de Kafka en general.
Curs de Kafka
Mòdul 1: Introducció a Kafka
Mòdul 2: Conceptes bàsics de Kafka
Mòdul 3: Operacions de Kafka
Mòdul 4: Configuració i Gestió de Kafka
Mòdul 5: Temes Avançats de Kafka
- Optimització del Rendiment de Kafka
- Kafka en una Configuració Multi-Centre de Dades
- Kafka amb Registre d'Esquemes
- Kafka Streams Avançat