Introducció
Google Cloud Dataflow és un servei de processament de dades en temps real i per lots que permet crear i gestionar canals de dades. Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, aprendrem com utilitzar Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery.
Objectius
- Comprendre què és Google Cloud Dataflow i com es pot utilitzar amb BigQuery.
- Aprendre a configurar un pipeline de Dataflow per llegir dades de BigQuery.
- Processar dades utilitzant Dataflow.
- Escriure els resultats processats de nou a BigQuery.
Requisits previs
- Coneixements bàsics de BigQuery i SQL.
- Familiaritat amb Google Cloud Platform (GCP).
- Coneixements bàsics de programació en Python o Java (Dataflow suporta ambdós llenguatges).
Configuració de l'entorn
Passos previs
- Crear un projecte a Google Cloud Platform (GCP): Si encara no tens un projecte a GCP, crea'n un des del Google Cloud Console.
- Activar l'API de Dataflow i BigQuery: Assegura't que les APIs de Dataflow i BigQuery estiguin activades al teu projecte.
- Instal·lar el SDK de Dataflow: Pots instal·lar el SDK de Dataflow utilitzant pip (per Python) o Maven (per Java).
Instal·lació del SDK de Dataflow (Python)
Instal·lació del SDK de Dataflow (Java)
Afegeix la següent dependència al teu fitxer pom.xml
:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>2.34.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>2.34.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>2.34.0</version> </dependency>
Creació d'un pipeline de Dataflow
Exemple en Python
A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Python que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.
Pas 1: Importar les biblioteques necessàries
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
Pas 2: Configurar les opcions del pipeline
options = PipelineOptions() google_cloud_options = options.view_as(GoogleCloudOptions) google_cloud_options.project = 'el-teu-projecte' google_cloud_options.job_name = 'bigquery-dataflow-job' google_cloud_options.staging_location = 'gs://el-teu-bucket/staging' google_cloud_options.temp_location = 'gs://el-teu-bucket/temp' options.view_as(StandardOptions).runner = 'DataflowRunner'
Pas 3: Definir el pipeline
p = beam.Pipeline(options=options) query = 'SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`' (p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query)) | 'ProcessData' >> beam.Map(lambda record: {'field1': record['field1'], 'field2': record['field2'] * 2}) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( 'el-teu-projecte:el-teu-dataset.resultats', schema='field1:STRING, field2:INTEGER', write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE) ) p.run().wait_until_finish()
Exemple en Java
A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Java que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.
Pas 1: Importar les biblioteques necessàries
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.TypeDescriptor;
Pas 2: Configurar les opcions del pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); options.setJobName("bigquery-dataflow-job"); options.setProject("el-teu-projecte"); options.setTempLocation("gs://el-teu-bucket/temp");
Pas 3: Definir el pipeline
Pipeline p = Pipeline.create(options); String query = "SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`"; p.apply("ReadFromBigQuery", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql()) .apply("ProcessData", MapElements.into(TypeDescriptor.of(TableRow.class)) .via((TableRow row) -> { TableRow result = new TableRow(); result.set("field1", row.get("field1")); result.set("field2", (Long) row.get("field2") * 2); return result; })) .apply("WriteToBigQuery", BigQueryIO.writeTableRows() .to("el-teu-projecte:el-teu-dataset.resultats") .withSchema(new TableSchema().setFields(Arrays.asList( new TableFieldSchema().setName("field1").setType("STRING"), new TableFieldSchema().setName("field2").setType("INTEGER") ))) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); p.run().waitUntilFinish();
Exercici pràctic
Descripció
Crea un pipeline de Dataflow que llegeixi dades d'una taula de BigQuery, realitzi una transformació simple (per exemple, multiplicar un camp numèric per 2) i escrigui els resultats en una nova taula de BigQuery.
Solució
La solució es troba en els exemples de codi proporcionats anteriorment. Pots adaptar-los segons les teves necessitats específiques.
Errors comuns i consells
- Error de permisos: Assegura't que el compte de servei que utilitza Dataflow tingui els permisos necessaris per accedir a BigQuery i als buckets de Google Cloud Storage.
- Especificació incorrecta del runner: Si estàs provant el pipeline localment, utilitza
DirectRunner
en lloc deDataflowRunner
. - Esquema incorrecte: Assegura't que l'esquema de la taula de destinació a BigQuery coincideixi amb les dades que estàs escrivint.
Conclusió
Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, hem après a configurar un pipeline de Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery. Aquesta habilitat és essencial per a qualsevol professional que treballi amb grans volums de dades i necessiti processar-les de manera eficient.
Curs de BigQuery
Mòdul 1: Introducció a BigQuery
- Què és BigQuery?
- Configurar el teu entorn de BigQuery
- Comprendre l'arquitectura de BigQuery
- Visió general de la consola de BigQuery
Mòdul 2: SQL bàsic a BigQuery
Mòdul 3: SQL intermedi a BigQuery
Mòdul 4: SQL avançat a BigQuery
- Unions avançades
- Camps niats i repetits
- Funcions definides per l'usuari (UDFs)
- Particionament i agrupament
Mòdul 5: Gestió de dades a BigQuery
- Carregar dades a BigQuery
- Exportar dades de BigQuery
- Transformació i neteja de dades
- Gestió de conjunts de dades i taules
Mòdul 6: Optimització del rendiment de BigQuery
- Tècniques d'optimització de consultes
- Comprendre els plans d'execució de consultes
- Ús de vistes materialitzades
- Optimització de l'emmagatzematge
Mòdul 7: Seguretat i compliment de BigQuery
Mòdul 8: Integració i automatització de BigQuery
- Integració amb serveis de Google Cloud
- Ús de BigQuery amb Dataflow
- Automatització de fluxos de treball amb Cloud Functions
- Programació de consultes amb Cloud Scheduler
Mòdul 9: Aprenentatge automàtic a BigQuery (BQML)
- Introducció a BigQuery ML
- Creació i entrenament de models
- Avaluació i predicció amb models
- Funcions avançades de BQML