Added dataflow job #57

Open · wants to merge 4 commits into base: dev
3 changes: 3 additions & 0 deletions rony/cli_aux.py
@@ -70,6 +70,7 @@ def get_modules_to_add(command, opts, ctx):
"gcp_bigquery",
"gcp_cloud_function",
"gcp_pubsub",
"gcp_dataflow",
]
else:
if click.confirm("Add CLOUD_STORAGE module?", default=True):
@@ -80,6 +81,8 @@ def get_modules_to_add(command, opts, ctx):
all_modules.append("gcp_cloud_function")
if click.confirm("Add PUBSUB module?", default=True):
all_modules.append("gcp_pubsub")
if click.confirm("Add DATAFLOW module?", default=True):
all_modules.append("gcp_dataflow")

for plugin in plugins:
if hasattr(plugin, "cli_aux") and hasattr(plugin.cli_aux, "get_modules_to_add"):
@@ -1,2 +1,3 @@
google-cloud-bigquery==2.23.2
google-cloud-storage==1.41.1
google-cloud-storage==1.41.1
pytz==2021.1
@@ -2,6 +2,7 @@ resource "google_storage_bucket" "bucket_functions" {
name = "${var.bucket_functions}-${var.account}"
location = var.region_id
storage_class = "STANDARD"
force_destroy = true
}

resource "null_resource" "fn_write_file_gcs_bgq" {
@@ -3,6 +3,7 @@ resource "google_storage_bucket" "bucket_datalake" {
name = "${var.bucket_names[count.index]}-${var.account}"
location = var.region_id
storage_class = "STANDARD"
force_destroy = true
}

# folder inside landing-zone
16 changes: 16 additions & 0 deletions rony/module_templates/gcp_dataflow.json
@@ -0,0 +1,16 @@
{
"info": "Module for deploying dataflow",
"instructions": [
""
],
"developers": [
"[email protected]"
],
"input_info": [],
"version": "0.0.1",
"dependencies": [
"__GCP_BASE__",
"gcp_pubsub",
"gcp_cloud_storage"
]
}
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

echo "$GOOGLE_CREDENTIALS" > service-account.json  # quoted to keep the JSON intact

export GOOGLE_APPLICATION_CREDENTIALS="$(pwd)/service-account.json"

# deploy custom dataflow template
python3 ../../dataflow/templates/pubsub_to_gcs.py \
--project="$1" \
--region="$2" \
--runner=DataflowRunner \
--temp_location="$3" \
--template_location="$4" \
--input_topic="$5" \
--output_path="$6"
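The null_resource in storage.tf later in this diff calls this script with six positional arguments: project, region, temp location, template location, input topic, and output path. Below is a minimal sketch of a manual run, assuming apache-beam[gcp] is installed, the working directory is infrastructure/gcp (so the ../../dataflow paths resolve), and GOOGLE_CREDENTIALS holds the service-account JSON; every project, bucket, and topic value is a placeholder, not something defined by this PR.

# Hedged manual run mirroring the Terraform local-exec; all values are placeholders.
export GOOGLE_CREDENTIALS="$(cat /path/to/service-account.json)"
bash ../../dataflow/scripts/deploy_custom_dataflow_templates.sh \
  my-gcp-project \
  us-central1 \
  "gs://my-dataflow-bucket-dev/temp" \
  "gs://my-dataflow-bucket-dev/templates/my-dataflow-job" \
  "projects/my-gcp-project/topics/my-topic" \
  "gs://my-datalake-bucket-dev/pubsub/events/"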
142 changes: 142 additions & 0 deletions rony/module_templates/gcp_dataflow/dataflow/templates/pubsub_to_gcs.py
@@ -0,0 +1,142 @@
# Reference
# https://cloud.google.com/pubsub/docs/pubsub-dataflow?hl=pt-br
# https://github.com/GoogleCloudPlatform/DataflowTemplates
# https://jtaras.medium.com/building-a-simple-google-cloud-dataflow-pipeline-pubsub-to-google-cloud-storage-9bbf170e8bad

import argparse
import logging
import random
import json
from datetime import datetime

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import Repeatedly, AfterAny, AfterCount, AccumulationMode, AfterProcessingTime


class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""

def __init__(self, window_size, num_shards=5, num_events=1000):
# Convert the window size from minutes to seconds.
self.window_size = int(window_size * 60)
self.num_events = int(num_events)
self.num_shards = num_shards

def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| "Window into fixed intervals"
>> WindowInto(FixedWindows(self.window_size), # Time
trigger=Repeatedly(AfterAny(AfterCount(self.num_events), # events processed
AfterProcessingTime(self.window_size))),
accumulation_mode=AccumulationMode.DISCARDING)
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# Assign a random key to each windowed element based on the number of shards.
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# Group windowed elements by key. All the elements in the same window must fit
# in memory for this. If not, you need to use `beam.util.BatchElements`.
| "Group by key" >> GroupByKey()
)


class AddTimestamp(DoFn):

def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)


class WriteToGCS(DoFn):

def __init__(self, output_path):
self.output_path = output_path

def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""

ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
json_message_body = json.loads(message_body)
json_message_body['publish_ts'] = publish_time
f.write(f"{json_message_body}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, num_events=1000, pipeline_args=None):
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
pipeline = Pipeline(options=pipeline_options)

streaming = (
pipeline
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards, num_events)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
pipeline.run()


if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=3,
help="Number of shards to use when writing windowed elements to GCS.",
)
parser.add_argument(
"--num_events",
type=int,
default=1000,
help="Number of events processed.",
)
known_args, pipeline_args = parser.parse_known_args()

run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
known_args.num_events,
pipeline_args,
)
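Before the template is built, the same pipeline can be smoke-tested against an existing topic and bucket. The sketch below is an assumption-laden example rather than part of the PR: it presumes apache-beam[gcp] is installed, application-default credentials are configured, and it swaps DirectRunner in for the DataflowRunner used by the deploy script; every name is a placeholder.

# Hedged local smoke test of the streaming pipeline; placeholders only, stop with Ctrl+C.
python3 pubsub_to_gcs.py \
  --project=my-gcp-project \
  --runner=DirectRunner \
  --input_topic="projects/my-gcp-project/topics/my-topic" \
  --output_path="gs://my-bucket/pubsub/events/" \
  --window_size=1 \
  --num_shards=1 \
  --num_events=10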
11 changes: 11 additions & 0 deletions rony/module_templates/gcp_dataflow/infrastructure/gcp/dataflow.tf
@@ -0,0 +1,11 @@
resource "google_dataflow_job" "dataflow_job" {
name = var.dataflow_job_name
template_gcs_path = "gs://${var.bucket_dataflow}-${var.account}/templates/${var.dataflow_job_name}"
temp_gcs_location = "gs://${var.bucket_dataflow}-${var.account}/temp"
enable_streaming_engine = true
max_workers = 2
on_delete = "cancel"
project = var.project_id
region = var.region_id
depends_on = [null_resource.dataflow_job]
}
30 changes: 30 additions & 0 deletions rony/module_templates/gcp_dataflow/infrastructure/gcp/storage.tf
@@ -0,0 +1,30 @@
resource "google_storage_bucket" "bucket_dataflow" {
name = "${var.bucket_dataflow}-${var.account}"
location = var.region_id
storage_class = "STANDARD"
force_destroy = true
}

resource "null_resource" "dataflow_job" {
triggers = {
always_run = uuid()
}

provisioner "local-exec" {
command = "chmod +x ../../dataflow/scripts/deploy_custom_dataflow_templates.sh"
}

provisioner "local-exec" {
command = join(" ", [
"../../dataflow/scripts/deploy_custom_dataflow_templates.sh",
var.project_id,
var.region_id,
"gs://${var.bucket_dataflow}-${var.account}/temp",
"gs://${var.bucket_dataflow}-${var.account}/templates/${var.dataflow_job_name}",
"projects/${var.project_id}/topics/${var.pubsub_topic_name}",
"gs://${var.bucket_names[0]}-${var.account}/pubsub/events/"
])
}

depends_on = [google_storage_bucket.bucket_dataflow, google_pubsub_topic.rony_topic]
}
@@ -0,0 +1,8 @@
variable "bucket_dataflow" {
default = ""
description = "Create bucket for dataflow"
}

variable "dataflow_job_name" {
default = ""
}
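Both variables default to empty strings, so concrete values have to be supplied at apply time, together with the base-module variables (project_id, region_id, account, pubsub_topic_name, bucket_names) referenced by the other .tf files in this PR. A hedged sketch with placeholder values only; a terraform.tfvars file works equally well, and list variables such as bucket_names are omitted here for brevity.

# Hedged example of supplying the new variables; all values are placeholders.
terraform apply \
  -var="project_id=my-gcp-project" \
  -var="region_id=us-central1" \
  -var="account=dev" \
  -var="bucket_dataflow=my-dataflow-bucket" \
  -var="dataflow_job_name=my-dataflow-job" \
  -var="pubsub_topic_name=my-topic"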
@@ -1,5 +1,5 @@
resource "google_pubsub_topic" "rony_topic" {
name = var.pubsub_topic
name = var.pubsub_topic_name
}

resource "google_pubsub_subscription" "rony_sub" {
@@ -1,4 +1,4 @@
variable "pubsub_topic" {
variable "pubsub_topic_name" {
default = ""
}

@@ -3,8 +3,7 @@
from google.cloud import pubsub_v1
from fake_web_events import Simulation

path = os.path.dirname(os.path.dirname(__file__))
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = f"{path}/config/service-account.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = f"service-account.json"

topic = ""
publisher = pubsub_v1.PublisherClient()