Introducció

Google Cloud Dataflow és un servei de processament de dades en temps real i per lots que permet crear i gestionar canals de dades. Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, aprendrem com utilitzar Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery.

Objectius

  • Comprendre què és Google Cloud Dataflow i com es pot utilitzar amb BigQuery.
  • Aprendre a configurar un pipeline de Dataflow per llegir dades de BigQuery.
  • Processar dades utilitzant Dataflow.
  • Escriure els resultats processats de nou a BigQuery.

Requisits previs

  • Coneixements bàsics de BigQuery i SQL.
  • Familiaritat amb Google Cloud Platform (GCP).
  • Coneixements bàsics de programació en Python o Java (Dataflow suporta ambdós llenguatges).

Configuració de l'entorn

Passos previs

  1. Crear un projecte a Google Cloud Platform (GCP): Si encara no tens un projecte a GCP, crea'n un des del Google Cloud Console.
  2. Activar l'API de Dataflow i BigQuery: Assegura't que les APIs de Dataflow i BigQuery estiguin activades al teu projecte.
  3. Instal·lar el SDK de Dataflow: Pots instal·lar el SDK de Dataflow utilitzant pip (per Python) o Maven (per Java).

Instal·lació del SDK de Dataflow (Python)

pip install apache-beam[gcp]

Instal·lació del SDK de Dataflow (Java)

Afegeix la següent dependència al teu fitxer pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.34.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.34.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.34.0</version>
</dependency>

Creació d'un pipeline de Dataflow

Exemple en Python

A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Python que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.

Pas 1: Importar les biblioteques necessàries

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions

Pas 2: Configurar les opcions del pipeline

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'el-teu-projecte'
google_cloud_options.job_name = 'bigquery-dataflow-job'
google_cloud_options.staging_location = 'gs://el-teu-bucket/staging'
google_cloud_options.temp_location = 'gs://el-teu-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

Pas 3: Definir el pipeline

p = beam.Pipeline(options=options)

query = 'SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`'

(p
 | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))
 | 'ProcessData' >> beam.Map(lambda record: {'field1': record['field1'], 'field2': record['field2'] * 2})
 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
     'el-teu-projecte:el-teu-dataset.resultats',
     schema='field1:STRING, field2:INTEGER',
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
)

p.run().wait_until_finish()

Exemple en Java

A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Java que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.

Pas 1: Importar les biblioteques necessàries

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

Pas 2: Configurar les opcions del pipeline

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setJobName("bigquery-dataflow-job");
options.setProject("el-teu-projecte");
options.setTempLocation("gs://el-teu-bucket/temp");

Pas 3: Definir el pipeline

Pipeline p = Pipeline.create(options);

String query = "SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`";

p.apply("ReadFromBigQuery", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
 .apply("ProcessData", MapElements.into(TypeDescriptor.of(TableRow.class))
     .via((TableRow row) -> {
         TableRow result = new TableRow();
         result.set("field1", row.get("field1"));
         result.set("field2", (Long) row.get("field2") * 2);
         return result;
     }))
 .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
     .to("el-teu-projecte:el-teu-dataset.resultats")
     .withSchema(new TableSchema().setFields(Arrays.asList(
         new TableFieldSchema().setName("field1").setType("STRING"),
         new TableFieldSchema().setName("field2").setType("INTEGER")
     )))
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

Exercici pràctic

Descripció

Crea un pipeline de Dataflow que llegeixi dades d'una taula de BigQuery, realitzi una transformació simple (per exemple, multiplicar un camp numèric per 2) i escrigui els resultats en una nova taula de BigQuery.

Solució

La solució es troba en els exemples de codi proporcionats anteriorment. Pots adaptar-los segons les teves necessitats específiques.

Errors comuns i consells

  • Error de permisos: Assegura't que el compte de servei que utilitza Dataflow tingui els permisos necessaris per accedir a BigQuery i als buckets de Google Cloud Storage.
  • Especificació incorrecta del runner: Si estàs provant el pipeline localment, utilitza DirectRunner en lloc de DataflowRunner.
  • Esquema incorrecte: Assegura't que l'esquema de la taula de destinació a BigQuery coincideixi amb les dades que estàs escrivint.

Conclusió

Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, hem après a configurar un pipeline de Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery. Aquesta habilitat és essencial per a qualsevol professional que treballi amb grans volums de dades i necessiti processar-les de manera eficient.

Curs de BigQuery

Mòdul 1: Introducció a BigQuery

Mòdul 2: SQL bàsic a BigQuery

Mòdul 3: SQL intermedi a BigQuery

Mòdul 4: SQL avançat a BigQuery

Mòdul 5: Gestió de dades a BigQuery

Mòdul 6: Optimització del rendiment de BigQuery

Mòdul 7: Seguretat i compliment de BigQuery

Mòdul 8: Integració i automatització de BigQuery

Mòdul 9: Aprenentatge automàtic a BigQuery (BQML)

Mòdul 10: Casos d'ús de BigQuery en el món real

© Copyright 2024. Tots els drets reservats