Introducció

El Streaming Estructurat és una API d'alt nivell per al processament de fluxos de dades en temps real a Apache Spark. Aquesta API permet treballar amb dades de streaming de la mateixa manera que es treballa amb dades estàtiques, utilitzant DataFrames i Datasets. Això facilita la integració del processament de dades en temps real amb les aplicacions de processament de dades existents.

Conceptes Clau

  1. Font de Dades de Streaming

Les fonts de dades de streaming poden ser diverses, com ara fitxers, Kafka, sockets, etc. Spark pot llegir dades contínuament d'aquestes fonts i processar-les en temps real.

  1. DataFrames i Datasets

El Streaming Estructurat utilitza DataFrames i Datasets per representar les dades de streaming. Això permet aplicar les mateixes operacions que es fan servir per a les dades estàtiques.

  1. Triggers

Els triggers defineixen la freqüència amb la qual Spark processa les dades de streaming. Per exemple, es pot configurar un trigger per processar les dades cada 1 segon.

  1. Estat

El processament de streaming pot mantenir un estat per gestionar operacions com ara agregacions, joins, etc. Spark gestiona aquest estat de manera eficient i tolerant a fallades.

Exemple Pràctic

Configuració de l'Entorn

Abans de començar, assegura't de tenir un entorn Spark configurat. Pots seguir les instruccions del mòdul "Configuració de l'entorn Spark" per configurar el teu entorn.

Lectura de Dades de Streaming

A continuació, es mostra un exemple de com llegir dades de streaming des d'un socket:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Crear una sessió de Spark
val spark = SparkSession.builder
  .appName("StructuredStreamingExample")
  .getOrCreate()

// Llegir dades de streaming des d'un socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Mostrar les dades llegides
val query = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

Explicació del Codi

  1. Crear una sessió de Spark: Es crea una sessió de Spark utilitzant SparkSession.builder.
  2. Llegir dades de streaming: Es llegeixen dades de streaming des d'un socket especificant el host i el port.
  3. Mostrar les dades llegides: Es mostren les dades llegides a la consola utilitzant writeStream amb el mode de sortida append.

Processament de Dades de Streaming

A continuació, es mostra un exemple de com processar les dades de streaming per comptar les paraules:

import org.apache.spark.sql.functions._

// Dividir les línies en paraules
val words = lines.as[String].flatMap(_.split(" "))

// Comptar les paraules
val wordCounts = words.groupBy("value").count()

// Mostrar el recompte de paraules
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

Explicació del Codi

  1. Dividir les línies en paraules: Es divideixen les línies en paraules utilitzant flatMap.
  2. Comptar les paraules: Es compten les paraules utilitzant groupBy i count.
  3. Mostrar el recompte de paraules: Es mostra el recompte de paraules a la consola utilitzant writeStream amb el mode de sortida complete.

Exercicis Pràctics

Exercici 1: Lectura de Dades de Kafka

Llegeix dades de streaming des d'un clúster de Kafka i mostra-les a la consola.

Solució

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .load()

val query = kafkaDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

Exercici 2: Agregació de Dades de Streaming

Llegeix dades de streaming des d'un socket, divideix-les en paraules i compta les paraules cada 10 segons.

Solució

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

query.awaitTermination()

Errors Comuns i Consells

  1. Configuració Incorrecta del Host i Port: Assegura't que el host i el port especificats són correctes i que el servidor de socket està en funcionament.
  2. Mode de Sortida Incorrecte: Utilitza el mode de sortida adequat (append, complete, update) segons el tipus de consulta que estàs realitzant.
  3. Gestió de l'Estat: Si estàs realitzant operacions que requereixen mantenir un estat, assegura't de configurar correctament la gestió de l'estat per evitar problemes de memòria.

Resum

En aquesta secció, hem après els conceptes clau del Streaming Estructurat a Apache Spark, com llegir dades de streaming des de diverses fonts, processar-les i mostrar els resultats. També hem vist exemples pràctics i exercicis per reforçar els conceptes apresos. El Streaming Estructurat facilita el processament de dades en temps real utilitzant les mateixes operacions que es fan servir per a les dades estàtiques, integrant-se perfectament amb les aplicacions de processament de dades existents.

© Copyright 2024. Tots els drets reservats