# diff --git a/topic/serverless/azure-eventhub/Azure Function/cratedb_writer.py b/topic/serverless/azure-eventhub/Azure Function/cratedb_writer.py
# new file mode 100644
# index 00000000..0f36fbd4
# --- /dev/null
# +++ b/topic/serverless/azure-eventhub/Azure Function/cratedb_writer.py
import time
import logging
from crate import client
from crate.client.exceptions import ProgrammingError, IntegrityError


class CrateDBWriter:
    """
    Insert enriched readings and raw error events into CrateDB.

    ``tables`` maps the logical names ``"readings"`` and ``"errors"`` to the
    fully qualified target table names.
    """

    # size of the connection pool handed to the crate client
    CONNECTION_POOL_SIZE = 10

    def __init__(self, tables, host, db_user, db_pass):
        self._conn = None
        self._cursor = None
        self._tables = tables
        self._host = host
        self._db_user = db_user
        self._db_pass = db_pass
        # rows that failed to insert; drained by _move_reading_to_error()
        self._failed = []

    def insert_values(self, value_cache):
        """Write all readings and errors collected in ``value_cache``."""
        self._connect()

        if value_cache.readings:
            self._insert_operation(
                value_cache.readings,
                self._tables["readings"],
            )
            # readings that failed to insert are converted into error records
            # so the error insert below persists them as well
            self._move_reading_to_error(value_cache)

        if value_cache.errors:
            self._insert_operation(
                value_cache.errors,
                self._tables["errors"],
            )

    @staticmethod
    def _insert_ts():
        # insert timestamp as epoch milliseconds (CrateDB TIMESTAMP value)
        return int(time.time() * 1000)

    def _connect(self):
        # (Re-)open connection and cursor when either is missing or closed.
        # `_closed` is a private attribute of the crate client objects, so we
        # default to "closed" when it is absent — TODO confirm against the
        # installed crate client version.
        if (
            self._cursor is None
            or getattr(self._cursor, "_closed", True)
            or self._conn is None
            or getattr(self._conn, "_closed", True)
        ):
            self._conn = client.connect(
                self._host,
                username=self._db_user,
                password=self._db_pass,
                pool_size=self.CONNECTION_POOL_SIZE,
            )
            self._cursor = self._conn.cursor()

    def _insert_operation(self, value_list, table_name):
        """Bulk-insert ``value_list``; retry rejected rows individually."""
        if self._cursor is None:
            return
        # Prepare OUTSIDE the try block so `stmt`/`parameters` are always
        # bound when the exception handler below references them.
        stmt, parameters = self._prepare_insert_stmt(
            value_list, table_name, (0, len(value_list))
        )
        try:
            result = self._cursor.executemany(stmt, parameters)
        except (ProgrammingError, IntegrityError) as e:
            # the whole batch failed: record every row as failed
            for item in value_list:
                self._add_item_to_failed(
                    str(e), stmt, parameters, type(e).__name__, table_name, item
                )
            return

        for i, row in enumerate(result):
            # rowcount == -2 marks a row the bulk operation rejected; retry it
            # on its own to capture the precise per-row error
            if row["rowcount"] == -2:
                stmt, parameters = self._prepare_insert_stmt(
                    value_list, table_name, (i, i + 1)
                )
                try:
                    self._cursor.executemany(stmt, parameters)
                # IntegrityError is raised in case of PK violation (e.g. duplicated PK)
                except (ProgrammingError, IntegrityError) as e:
                    self._add_item_to_failed(
                        str(e),
                        stmt,
                        parameters,
                        type(e).__name__,
                        table_name,
                        value_list[i],
                    )

    def _add_item_to_failed(
        self, error, stmt, parameters, error_type, table_name, payload
    ):
        """Record one failed row for later transfer to the error table."""
        # lazy %-style args: only formatted when the log level is enabled
        logging.warning(
            "error: %s -- stmt: %s -- parameters: %s", error, stmt, parameters
        )
        self._failed.append(
            {
                "type": table_name,
                "error": error,
                "error_type": error_type,
                "payload": payload,
            }
        )

    def _move_reading_to_error(self, value_cache):
        # Drain failed rows into the cache's error list. Clearing afterwards
        # fixes duplicate error reporting: previously the list was never
        # emptied, so every later insert_values() call re-added old failures.
        for element in self._failed:
            value_cache.add_error(
                element["payload"], element["error"], element["error_type"]
            )
        self._failed.clear()

    def _prepare_insert_stmt(self, value_list, table_name, iteration_range):
        """
        Build the INSERT statement and parameter tuples for the rows of
        ``value_list`` inside the half-open ``iteration_range`` (start, stop).

        The column set is taken from the first row; every row is expected to
        share the same keys. ``insert_ts`` is always prepended.
        """
        keys = list(value_list[0].keys())
        columns = ", ".join(["insert_ts"] + keys)
        placeholders = ", ".join("?" * (len(keys) + 1))
        stmt = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

        parameter_list = [
            tuple([self._insert_ts()] + self._add_entries(value_list, keys, i))
            for i in range(iteration_range[0], iteration_range[1])
        ]
        return stmt, parameter_list

    @staticmethod
    def _add_entries(values, keys, index):
        # column values for row `index`, in the same order as `keys`
        return [values[index][key] for key in keys]


# diff --git a/topic/serverless/azure-eventhub/Azure Function/enrichment.py b/topic/serverless/azure-eventhub/Azure Function/enrichment.py
# new file mode 100644
# index 00000000..c6f4fc0c
# --- /dev/null
# +++ b/topic/serverless/azure-eventhub/Azure Function/enrichment.py
import logging
# Map source event keys to the canonical column names used in CrateDB.
# TARGET_MAP is kept as an alias for backwards compatibility.
KEY_MAPPING = TARGET_MAP = {
    "ts": "reading_ts",
    "time": "reading_ts",
    "current_ts": "reading_ts",
    "timestamp": "reading_ts",
    "id": "sensor_id",
    "loc": "location",
}


def transform(raw_payload, value_cache):
    """
    Transform a single event, checking for errors.

    The result is stored in ``value_cache``: valid events are added as
    readings, invalid ones as errors.

    Args:
        raw_payload: event from an Event Hub
        value_cache: ValueCache object to transfer values to the database writer
    """
    if raw_payload is None:
        return

    try:
        event_t = transform_payload(raw_payload)
        location = event_t.get("location")
        sensor_id = event_t.get("sensor_id")
        timestamp = event_t.get("reading_ts")
        payload = {
            "temperature": event_t.get("temperature"),
            "humidity": event_t.get("humidity"),
            "light": event_t.get("light"),
        }

        value_cache.add_reading(payload, location, timestamp, sensor_id)

    except (ValueError, KeyError) as e:
        # lazy %-style args; also fixes the missing separator space in the
        # original concatenated f-strings
        logging.info("enrichment error: %s -- payload: %s", e, raw_payload)
        value_cache.add_error(raw_payload, str(e), type(e).__name__)


def transform_payload(event):
    """Normalize a raw event in place and validate its required fields."""
    # remove empty keys
    event = remove_empty_keys(event)
    # change column names
    event = rename_keys(event)
    # check for sensor_id, timestamp, location keys
    check_fields(event)
    return event


def remove_empty_keys(event):
    """Drop the empty-string key, if present, and return the event."""
    event.pop("", None)
    return event


def rename_keys(event):
    """Rename known source keys to their canonical column names."""
    # iterate over a snapshot of the keys because the dict is mutated
    for key in list(event.keys()):
        if key in KEY_MAPPING:
            event[KEY_MAPPING[key]] = event.pop(key)

    return event


def check_fields(event):
    """Raise KeyError unless location, sensor_id and reading_ts are present."""
    if not event.keys() >= {"location", "sensor_id", "reading_ts"}:
        raise KeyError("missing key in payload")


# diff --git a/topic/serverless/azure-eventhub/Azure Function/function_app.py b/topic/serverless/azure-eventhub/Azure Function/function_app.py
# new file mode 100644
# index 00000000..730a821a
# --- /dev/null
# +++ b/topic/serverless/azure-eventhub/Azure Function/function_app.py
import os
import sys
+import json +import logging + +import azure.functions as func + +from enrichment import transform +from cratedb_writer import CrateDBWriter +from value_cache import ValueCache + + +app = func.FunctionApp() + + +@app.event_hub_message_trigger( + arg_name="event", + event_hub_name="demo-event-ce", + connection="EVENT_HUB_CONNECTION_STRING", +) +def enrich_events(event: func.EventHubEvent): + crate_db = CrateDBWriter( + { + "readings": os.getenv("READING_TABLE"), + "errors": os.getenv("ERROR_TABLE"), + }, + os.getenv("HOST"), + os.getenv("DB_USER", None), + os.getenv("DB_PASSWORD", None), + ) + + try: + if event is None: + return + insert_value_cache = ValueCache() + raw_events = json.loads(event.get_body().decode("utf-8")) + + for event_ in raw_events: + raw_event = event_ + transform(raw_event, insert_value_cache) + + crate_db.insert_values(insert_value_cache) + except Exception as e: + # when any exception occurred, the function must exit unsuccessfully for events to be retried + logging.error(f"error: {e}") + sys.exit(1) diff --git a/topic/serverless/azure-eventhub/Azure Function/requirements.txt b/topic/serverless/azure-eventhub/Azure Function/requirements.txt new file mode 100644 index 00000000..d3fc8748 --- /dev/null +++ b/topic/serverless/azure-eventhub/Azure Function/requirements.txt @@ -0,0 +1,7 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azure-eventhub +crate==1.0.1 diff --git a/topic/serverless/azure-eventhub/Azure Function/value_cache.py b/topic/serverless/azure-eventhub/Azure Function/value_cache.py new file mode 100644 index 00000000..04db58a7 --- /dev/null +++ b/topic/serverless/azure-eventhub/Azure Function/value_cache.py @@ -0,0 +1,27 @@ +class ValueCache: + """ + ValueCache class is used to structure enriched data for insert. 
+ """ + + def __init__(self): + self.errors = [] + self.readings = [] + + def add_error(self, payload, message, type): + self.errors.append( + { + "payload": payload, + "error": {"message": message, "type": type}, + "type": type, + } + ) + + def add_reading(self, payload, location, timestamp, sensor_id): + self.readings.append( + { + "location": location, + "sensor_id": sensor_id, + "reading_ts": timestamp, + "reading": payload, + } + ) diff --git a/topic/serverless/azure-eventhub/README.md b/topic/serverless/azure-eventhub/README.md new file mode 100644 index 00000000..a16f671a --- /dev/null +++ b/topic/serverless/azure-eventhub/README.md @@ -0,0 +1,117 @@ +# Azure Function - Event hub triggered + +This is a sample Azure Function in Python programming model v2 consuming Event hub batches, enriching them and inserting into CrateDB. The processed data will end up in one of the following tables: reading or error. + +## Set up the Azure Function example + +### Clone Azure function folder +Clone the "Azure Function" folder locally and open VS Code with the Azure plugin installed. + +### Create Azure resources +Once you have this function locally, you should set up your Azure account with the required resources: + +- Azure Function App +- Event Hub + +### Set Azure Function variables +You have to configure the Environment variables in the Function app to ensure it connects with CrateDB with the right table references and connects with the Event Hub as well. It can be done in Function App > Settings > Environment variables. +```json +{ + "EVENT_HUB_CONNECTION_STRING": "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=", + "READING_TABLE": "enrichment.reading", + "ERROR_TABLE": "enrichment.error", + "HOST": ":4200", + "DB_USER": "azure_demo", + "DB_PASSWORD": "" +} + ``` +You can find the connection string in Event Hub namespace > Shared access policies. + +### Create Tables +You should create the following tables in your CrateDB instance: + +
+ enrichment.error + + ```sql + +CREATE TABLE IF NOT EXISTS "enrichment"."error" ( + "error" OBJECT(IGNORED), + "payload" OBJECT(IGNORED), + "insert_ts" TIMESTAMP WITH TIME ZONE, + "type" TEXT +) + ``` +
+
+ enrichment.reading + + ```sql + +CREATE TABLE IF NOT EXISTS "enrichment"."reading" ( + "location" TEXT, + "sensor_id" TEXT, + "reading_ts" TIMESTAMP WITHOUT TIME ZONE, + "reading" OBJECT(DYNAMIC), + "insert_ts" TIMESTAMP WITHOUT TIME ZONE +) + ``` +
+ +### Deploy the Azure Function in VS Code +In the Azure plugin tab in the VS Code, there is an option to deploy the function to your already-created Function App. You can find the details [here](https://learn.microsoft.com/en-us/azure/azure-functions/functions-develop-vs-code?tabs=node-v4%2Cpython-v2%2Cisolated-process%2Cquick-create&pivots=programming-language-python#republish-project-files). This should send your function to your Azure account. + +### Manually trigger event in Event Hub +The Event hub is not connected to any source, so to test your function after it is configured and deployed, go to Event Hubs Instance > Data Explorer > Send events. In the payload text box, write the following json: +```json +[ + { + "id":"ABC001", + "location": "BR", + "ts":"1735231892159", + "temperature":1.23, + "humidity":72.3, + "light":"high" + } +] + ``` +This should result in a new record in the `enrichment.reading` table but no errors. +If you want to trigger an error, run the following payload: +```json +[ + { + "id":"ABC002", + "location": "BR", + "ts":"ABC", + "temperature":1.27, + "humidity":72.7, + "light":"high" + } +] + ``` +This second payload has a string value instead of a timestamp. With that, you should see one new record in the `enrichment.error` table. Finally, test with more than one event at the same time. + +```json +[ + { + "id":"ABC552", + "location": "US", + "ts":"ABC", + "temperature":1.27, + "humidity":52.7, + "light":"high" + }, + { + "id":"ABC762", + "location": "PT", + "ts":"1735232089882", + "temperature":15.7, + "humidity":82.7, + "light":"high" + } +] + ``` +The batch above should result in two new records: one in `enrichment.reading` and the other in `enrichment.error`, because of the string value for the timestamp column again. + +## Wrap-up +It is simple to set up an Azure Function to consume the Event Hub events. 
To achieve that, use the [crate client](https://cratedb.com/docs/python/en/latest/index.html) to connect to your CrateDB cluster, as shown in the `cratedb_writer.py` script.