diff --git a/rony/cli_aux.py b/rony/cli_aux.py
index 2370a01..e02c35e 100644
--- a/rony/cli_aux.py
+++ b/rony/cli_aux.py
@@ -70,6 +70,7 @@ def get_modules_to_add(command, opts, ctx):
             "gcp_bigquery",
             "gcp_cloud_function",
             "gcp_pubsub",
+            "gcp_dataflow",
         ]
     else:
         if click.confirm("Add CLOUD_STORAGE module?", default=True):
@@ -80,6 +81,8 @@ def get_modules_to_add(command, opts, ctx):
             all_modules.append("gcp_cloud_function")
         if click.confirm("Add PUBSUB module?", default=True):
             all_modules.append("gcp_pubsub")
+        if click.confirm("Add DATAFLOW module?", default=True):
+            all_modules.append("gcp_dataflow")
 
     for plugin in plugins:
         if hasattr(plugin, "cli_aux") and hasattr(plugin.cli_aux, "get_modules_to_add"):
diff --git a/rony/module_templates/gcp_cloud_function/functions/fn_write_file_gcs_bgq/requirements.txt b/rony/module_templates/gcp_cloud_function/functions/fn_write_file_gcs_bgq/requirements.txt
index dfc96a6..02f776e 100644
--- a/rony/module_templates/gcp_cloud_function/functions/fn_write_file_gcs_bgq/requirements.txt
+++ b/rony/module_templates/gcp_cloud_function/functions/fn_write_file_gcs_bgq/requirements.txt
@@ -1,2 +1,3 @@
 google-cloud-bigquery==2.23.2
-google-cloud-storage==1.41.1
\ No newline at end of file
+google-cloud-storage==1.41.1
+pytz==2021.1
\ No newline at end of file
diff --git a/rony/module_templates/gcp_cloud_function/infrastructure/gcp/storage.tf b/rony/module_templates/gcp_cloud_function/infrastructure/gcp/storage.tf
index 0e24b79..976b23b 100644
--- a/rony/module_templates/gcp_cloud_function/infrastructure/gcp/storage.tf
+++ b/rony/module_templates/gcp_cloud_function/infrastructure/gcp/storage.tf
@@ -2,6 +2,7 @@ resource "google_storage_bucket" "bucket_functions" {
   name          = "${var.bucket_functions}-${var.account}"
   location      = var.region_id
   storage_class = "STANDARD"
+  force_destroy = true
 }
 
 resource "null_resource" "fn_write_file_gcs_bgq" {
diff --git a/rony/module_templates/gcp_cloud_storage/infrastructure/gcp/storage.tf b/rony/module_templates/gcp_cloud_storage/infrastructure/gcp/storage.tf
index b70815e..08a925b 100644
--- a/rony/module_templates/gcp_cloud_storage/infrastructure/gcp/storage.tf
+++ b/rony/module_templates/gcp_cloud_storage/infrastructure/gcp/storage.tf
@@ -3,6 +3,7 @@ resource "google_storage_bucket" "bucket_datalake" {
   name          = "${var.bucket_names[count.index]}-${var.account}"
   location      = var.region_id
   storage_class = "STANDARD"
+  force_destroy = true
 }
 
 # folder inside landing-zone
diff --git a/rony/module_templates/gcp_dataflow.json b/rony/module_templates/gcp_dataflow.json
new file mode 100644
index 0000000..d142917
--- /dev/null
+++ b/rony/module_templates/gcp_dataflow.json
@@ -0,0 +1,16 @@
+{
+    "info": "Module for deploying dataflow",
+    "instructions": [
+        ""
+    ],
+    "developers": [
+        "westerley.reis@a3data.com.br"
+    ],
+    "input_info": [],
+    "version": "0.0.1",
+    "dependencies": [
+        "__GCP_BASE__",
+        "gcp_pubsub",
+        "gcp_cloud_storage"
+    ]
+}
\ No newline at end of file
diff --git a/rony/module_templates/gcp_dataflow/dataflow/scripts/deploy_custom_dataflow_templates.sh b/rony/module_templates/gcp_dataflow/dataflow/scripts/deploy_custom_dataflow_templates.sh
new file mode 100755
index 0000000..8bcd8c0
--- /dev/null
+++ b/rony/module_templates/gcp_dataflow/dataflow/scripts/deploy_custom_dataflow_templates.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
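+
+# Usage (all six arguments are positional; bracketed names are placeholders):
+#   deploy_custom_dataflow_templates.sh <project> <region> <temp_location> \
+#     <template_location> <input_topic> <output_path>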
+
+echo "$GOOGLE_CREDENTIALS" > service-account.json
+
+export GOOGLE_APPLICATION_CREDENTIALS="$(pwd)/service-account.json"
+
+# Deploy the custom Dataflow template.
+python3 ../../dataflow/templates/pubsub_to_gcs.py \
+    --project="$1" \
+    --region="$2" \
+    --runner=DataflowRunner \
+    --temp_location="$3" \
+    --template_location="$4" \
+    --input_topic="$5" \
+    --output_path="$6"
\ No newline at end of file
diff --git a/rony/module_templates/gcp_dataflow/dataflow/templates/pubsub_to_gcs.py b/rony/module_templates/gcp_dataflow/dataflow/templates/pubsub_to_gcs.py
new file mode 100644
index 0000000..5c08f16
--- /dev/null
+++ b/rony/module_templates/gcp_dataflow/dataflow/templates/pubsub_to_gcs.py
@@ -0,0 +1,147 @@
+# Reference
+# https://cloud.google.com/pubsub/docs/pubsub-dataflow?hl=pt-br
+# https://github.com/GoogleCloudPlatform/DataflowTemplates
+# https://jtaras.medium.com/building-a-simple-google-cloud-dataflow-pipeline-pubsub-to-google-cloud-storage-9bbf170e8bad
+
+import argparse
+import logging
+import random
+import json
+from datetime import datetime
+
+from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.transforms.trigger import Repeatedly, AfterAny, AfterCount, AccumulationMode, AfterProcessingTime
+
+
+class GroupMessagesByFixedWindows(PTransform):
+    """A composite transform that groups Pub/Sub messages based on publish time
+    and outputs a list of tuples, each containing a message and its publish time.
+    """
+
+    def __init__(self, window_size, num_shards=5, num_events=1000):
+        # Convert the window size from minutes to seconds.
+        self.window_size = int(window_size * 60)
+        self.num_events = int(num_events)
+        self.num_shards = num_shards
+
+    def expand(self, pcoll):
+        return (
+            pcoll
+            # Bind window info to each element using element timestamp (or publish time).
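+            # The trigger fires a pane early when either num_events elements have
+            # arrived or window_size seconds of processing time have elapsed since
+            # the first element, whichever comes first; fired panes are discarded
+            # rather than accumulated.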
+ """ + yield ( + element.decode("utf-8"), + datetime.utcfromtimestamp(float(publish_time)).strftime( + "%Y-%m-%d %H:%M:%S.%f" + ), + ) + + +class WriteToGCS(DoFn): + + def __init__(self, output_path): + self.output_path = output_path + + def process(self, key_value, window=DoFn.WindowParam): + """Write messages in a batch to Google Cloud Storage.""" + + ts_format = "%H:%M" + window_start = window.start.to_utc_datetime().strftime(ts_format) + window_end = window.end.to_utc_datetime().strftime(ts_format) + shard_id, batch = key_value + filename = "-".join([self.output_path, window_start, window_end, str(shard_id)]) + + with io.gcsio.GcsIO().open(filename=filename, mode="w") as f: + for message_body, publish_time in batch: + json_message_body = json.loads(message_body) + json_message_body['publish_ts'] = publish_time + f.write(f"{json_message_body}\n".encode("utf-8")) + + +def run(input_topic, output_path, window_size=1.0, num_shards=5, num_events=1000, pipeline_args=None): + # Set `save_main_session` to True so DoFns can access globally imported modules. + pipeline_options = PipelineOptions( + pipeline_args, streaming=True, save_main_session=True + ) + pipeline = Pipeline(options=pipeline_options) + + streaming = ( + pipeline + # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam + # binds the publish time returned by the Pub/Sub server for each message + # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`. + # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub + | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic) + | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards, num_events) + | "Write to GCS" >> ParDo(WriteToGCS(output_path)) + ) + pipeline.run() + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + parser = argparse.ArgumentParser() + parser.add_argument( + "--input_topic", + help="The Cloud Pub/Sub topic to read from." 
+ '"projects//topics/".', + ) + parser.add_argument( + "--window_size", + type=float, + default=1.0, + help="Output file's window size in minutes.", + ) + parser.add_argument( + "--output_path", + help="Path of the output GCS file including the prefix.", + ) + parser.add_argument( + "--num_shards", + type=int, + default=3, + help="Number of shards to use when writing windowed elements to GCS.", + ) + parser.add_argument( + "--num_events", + type=int, + default=1000, + help="Number of events processed.", + ) + known_args, pipeline_args = parser.parse_known_args() + + run( + known_args.input_topic, + known_args.output_path, + known_args.window_size, + known_args.num_shards, + known_args.num_events, + pipeline_args, + ) diff --git a/rony/module_templates/gcp_dataflow/infrastructure/gcp/dataflow.tf b/rony/module_templates/gcp_dataflow/infrastructure/gcp/dataflow.tf new file mode 100644 index 0000000..dc27675 --- /dev/null +++ b/rony/module_templates/gcp_dataflow/infrastructure/gcp/dataflow.tf @@ -0,0 +1,11 @@ +resource "google_dataflow_job" "dataflow_job" { + name = var.dataflow_job_name + template_gcs_path = "gs://${var.bucket_dataflow}-${var.account}/templates/${var.dataflow_job_name}" + temp_gcs_location = "gs://${var.bucket_dataflow}-${var.account}/temp" + enable_streaming_engine = true + max_workers = 2 + on_delete = "cancel" + project = var.project_id + region = var.region_id + depends_on = [null_resource.dataflow_job] +} \ No newline at end of file diff --git a/rony/module_templates/gcp_dataflow/infrastructure/gcp/storage.tf b/rony/module_templates/gcp_dataflow/infrastructure/gcp/storage.tf new file mode 100644 index 0000000..d29be98 --- /dev/null +++ b/rony/module_templates/gcp_dataflow/infrastructure/gcp/storage.tf @@ -0,0 +1,30 @@ +resource "google_storage_bucket" "bucket_dataflow" { + name = "${var.bucket_dataflow}-${var.account}" + location = var.region_id + storage_class = "STANDARD" + force_destroy = true +} + +resource "null_resource" "dataflow_job" { + triggers = { + always_run = uuid() + } + + provisioner "local-exec" { + command = "chmod +x ../../dataflow/scripts/deploy_custom_dataflow_templates.sh" + } + + provisioner "local-exec" { + command = join(" ", [ + "../../dataflow/scripts/deploy_custom_dataflow_templates.sh", + var.project_id, + var.region_id, + "gs://${var.bucket_dataflow}-${var.account}/temp", + "gs://${var.bucket_dataflow}-${var.account}/templates/${var.dataflow_job_name}", + "projects/${var.project_id}/topics/${var.pubsub_topic_name}", + "gs://${var.bucket_names[0]}-${var.account}/pubsub/events/" + ]) + } + + depends_on = [google_storage_bucket.bucket_dataflow, google_pubsub_topic.rony_topic] +} \ No newline at end of file diff --git a/rony/module_templates/gcp_dataflow/infrastructure/gcp/variables.tf b/rony/module_templates/gcp_dataflow/infrastructure/gcp/variables.tf new file mode 100644 index 0000000..f43f298 --- /dev/null +++ b/rony/module_templates/gcp_dataflow/infrastructure/gcp/variables.tf @@ -0,0 +1,8 @@ +variable "bucket_dataflow" { + default = "" + description = "Create bucket for dataflow" +} + +variable "dataflow_job_name" { + default = "" +} \ No newline at end of file diff --git a/rony/module_templates/gcp_pubsub/infrastructure/gcp/pubsub.tf b/rony/module_templates/gcp_pubsub/infrastructure/gcp/pubsub.tf index 34ae23c..2983794 100644 --- a/rony/module_templates/gcp_pubsub/infrastructure/gcp/pubsub.tf +++ b/rony/module_templates/gcp_pubsub/infrastructure/gcp/pubsub.tf @@ -1,5 +1,5 @@ resource "google_pubsub_topic" "rony_topic" { - name = 
+os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "service-account.json"
 
 topic = ""
 publisher = pubsub_v1.PublisherClient()