Azure Function Python v2 (#762)
* Azure Function Python v2

This is an example of an enrichment function that consumes events from Event Hub, processes them, and inserts the results into CrateDB.

Co-authored-by: Niklas Schmidtmer <[email protected]>
karynzv and hammerhead authored Jan 6, 2025
1 parent 3695bbb commit d1900c6
Showing 6 changed files with 402 additions and 0 deletions.
136 changes: 136 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/cratedb_writer.py
@@ -0,0 +1,136 @@
import time
import logging
from crate import client
from crate.client.exceptions import ProgrammingError, IntegrityError


class CrateDBWriter:
    """
    The CrateDBWriter class is used to insert enriched and raw data into CrateDB.
    """

    CONNECTION_POOL_SIZE = 10

    def __init__(self, tables, host, db_user, db_pass):
        self._conn = None
        self._cursor = None
        self._tables = tables
        self._host = host
        self._db_user = db_user
        self._db_pass = db_pass
        self._failed = []

    def insert_values(self, value_cache):
        self._connect()

        if len(value_cache.readings) > 0:
            self._insert_operation(
                value_cache.readings,
                self._tables["readings"],
            )
            self._move_reading_to_error(value_cache)

        if len(value_cache.errors) > 0:
            self._insert_operation(
                value_cache.errors,
                self._tables["errors"],
            )

    @staticmethod
    def _insert_ts():
        return int(time.time() * 1000)

    def _connect(self):
        # (re)connect only if there is no open connection and cursor yet
        if (
            self._cursor is None
            or (self._cursor and getattr(self._cursor, "_closed", True))
            or self._conn is None
            or (self._conn and getattr(self._conn, "_closed", True))
        ):
            self._conn = client.connect(
                self._host,
                username=self._db_user,
                password=self._db_pass,
                pool_size=self.CONNECTION_POOL_SIZE,
            )
            self._cursor = self._conn.cursor()

    def _insert_operation(self, value_list, table_name):
        if self._cursor is None:
            return
        try:
            stmt, parameters = self._prepare_insert_stmt(
                value_list, table_name, (0, len(value_list))
            )
            result = self._cursor.executemany(stmt, parameters)
        except (ProgrammingError, IntegrityError) as e:
            for item in value_list:
                self._add_item_to_failed(
                    str(e), stmt, parameters, type(e).__name__, table_name, item
                )
            return

        # a rowcount of -2 in the bulk result marks a row that failed to insert;
        # retry it individually to capture the exact error for that row
        for i, row in enumerate(result):
            if row["rowcount"] == -2:
                stmt, parameters = self._prepare_insert_stmt(
                    value_list, table_name, (i, i + 1)
                )
                try:
                    self._cursor.executemany(stmt, parameters)
                # IntegrityError is raised in case of PK violation (e.g. duplicated PK)
                except (ProgrammingError, IntegrityError) as e:
                    self._add_item_to_failed(
                        str(e),
                        stmt,
                        parameters,
                        type(e).__name__,
                        table_name,
                        value_list[i],
                    )

    def _add_item_to_failed(
        self, error, stmt, parameters, error_type, table_name, payload
    ):
        logging.warning(f"error: {error} -- stmt: {stmt} -- parameters: {parameters}")
        self._failed.append(
            {
                "type": table_name,
                "error": error,
                "error_type": error_type,
                "payload": payload,
            }
        )

    def _move_reading_to_error(self, value_cache):
        for element in self._failed:
            value_cache.add_error(
                element["payload"], element["error"], element["error_type"]
            )

    def _prepare_insert_stmt(self, value_list, table_name, iteration_range):
        stmt = f"INSERT INTO {table_name} (insert_ts, "
        parameters = "?, "
        parameter_list = []
        keys = value_list[0].keys()

        for key in keys:
            stmt += f"{key}, "
            parameters += "?, "
        stmt = stmt.rstrip(", ")
        parameters = parameters.rstrip(", ")

        stmt += f") VALUES ({parameters})"

        for i in range(iteration_range[0], iteration_range[1]):
            parameter_entry = [self._insert_ts()]
            parameter_entry.extend(self._add_entries(value_list, keys, i))
            parameter_list.append(tuple(parameter_entry))

        return stmt, parameter_list

    @staticmethod
    def _add_entries(values, keys, index):
        entries = []
        for key in keys:
            entries.append(values[index][key])
        return entries
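
For illustration, a minimal usage sketch of the statement builder above, assuming the crate client from requirements.txt is installed; the table names, host, and sample reading are assumptions, and no database connection is opened:

# Hypothetical sketch: inspect the INSERT statement generated by
# _prepare_insert_stmt() without connecting to a database.
from cratedb_writer import CrateDBWriter

writer = CrateDBWriter(
    {"readings": "doc.readings", "errors": "doc.errors"},  # assumed table names
    "localhost:4200",  # assumed host; __init__() does not connect
    None,
    None,
)
sample_reading = {
    "location": "office",
    "sensor_id": "sensor-1",
    "reading_ts": 1736160000000,
    "reading": {"temperature": 21.5, "humidity": 42.0, "light": 300},
}
stmt, parameters = writer._prepare_insert_stmt([sample_reading], "doc.readings", (0, 1))
print(stmt)
# INSERT INTO doc.readings (insert_ts, location, sensor_id, reading_ts, reading) VALUES (?, ?, ?, ?, ?)
print(parameters)
# [(<current epoch millis>, 'office', 'sensor-1', 1736160000000, {'temperature': 21.5, ...})]
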
69 changes: 69 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/enrichment.py
@@ -0,0 +1,69 @@
import logging

KEY_MAPPING = {
    "ts": "reading_ts",
    "time": "reading_ts",
    "current_ts": "reading_ts",
    "timestamp": "reading_ts",
    "id": "sensor_id",
    "loc": "location",
}


def transform(raw_payload, value_cache):
    """
    This function takes a single event and transforms it, checking for errors.
    The result is saved in the value_cache object.
    Args:
        raw_payload: event from an Event Hub
        value_cache: ValueCache object to transfer values to the database writer
    """
    if raw_payload is None:
        return

    try:
        event_t = transform_payload(raw_payload)
        location = event_t.get("location")
        sensor_id = event_t.get("sensor_id")
        timestamp = event_t.get("reading_ts")
        payload = {
            "temperature": event_t.get("temperature"),
            "humidity": event_t.get("humidity"),
            "light": event_t.get("light"),
        }

        value_cache.add_reading(payload, location, timestamp, sensor_id)

    except (ValueError, KeyError) as e:
        logging.info(f"enrichment error: {e} -- payload: {raw_payload}")
        value_cache.add_error(raw_payload, str(e), type(e).__name__)


def transform_payload(event):
    # remove empty keys
    event = remove_empty_keys(event)
    # change column names
    event = rename_keys(event)
    # check for sensor_id, timestamp, location keys
    check_fields(event)
    return event


def remove_empty_keys(event):
    if "" in event:
        event.pop("")
    return event


def rename_keys(event):
    for key in list(event.keys()):
        if key in KEY_MAPPING:
            event[KEY_MAPPING[key]] = event.pop(key)

    return event


def check_fields(event):
    if not event.keys() >= {"location", "sensor_id", "reading_ts"}:
        raise KeyError("missing key in payload")
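
For illustration, a minimal sketch of the enrichment step applied to a hypothetical raw event; the sample field values are assumptions, and the keys on the left are examples of keys covered by KEY_MAPPING:

# Hypothetical sample event; values are illustrative only.
from enrichment import transform
from value_cache import ValueCache

sample_event = {
    "id": "sensor-1",       # renamed to "sensor_id"
    "time": 1736160000000,  # renamed to "reading_ts"
    "loc": "office",        # renamed to "location"
    "temperature": 21.5,
    "humidity": 42.0,
    "light": 300,
}

cache = ValueCache()
transform(sample_event, cache)
print(cache.readings[0])
# {'location': 'office', 'sensor_id': 'sensor-1', 'reading_ts': 1736160000000,
#  'reading': {'temperature': 21.5, 'humidity': 42.0, 'light': 300}}
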
46 changes: 46 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/function_app.py
@@ -0,0 +1,46 @@
import os
import sys
import json
import logging

import azure.functions as func

from enrichment import transform
from cratedb_writer import CrateDBWriter
from value_cache import ValueCache


app = func.FunctionApp()


@app.event_hub_message_trigger(
    arg_name="event",
    event_hub_name="demo-event-ce",
    connection="EVENT_HUB_CONNECTION_STRING",
)
def enrich_events(event: func.EventHubEvent):
    crate_db = CrateDBWriter(
        {
            "readings": os.getenv("READING_TABLE"),
            "errors": os.getenv("ERROR_TABLE"),
        },
        os.getenv("HOST"),
        os.getenv("DB_USER", None),
        os.getenv("DB_PASSWORD", None),
    )

    try:
        if event is None:
            return
        insert_value_cache = ValueCache()
        raw_events = json.loads(event.get_body().decode("utf-8"))

        for raw_event in raw_events:
            transform(raw_event, insert_value_cache)

        crate_db.insert_values(insert_value_cache)
    except Exception as e:
        # if any exception occurs, the function must exit unsuccessfully so the events are retried
        logging.error(f"error: {e}")
        sys.exit(1)
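
The function relies on the application settings READING_TABLE, ERROR_TABLE, HOST, DB_USER, DB_PASSWORD, and EVENT_HUB_CONNECTION_STRING, and expects each Event Hub message body to be a UTF-8 encoded JSON array of raw readings. A minimal sketch of such a body, with hypothetical field names and values:

# Hypothetical Event Hub message body as enrich_events() expects it:
# a UTF-8 encoded JSON array of raw readings.
import json

sample_body = json.dumps(
    [
        {"id": "sensor-1", "time": 1736160000000, "loc": "office",
         "temperature": 21.5, "humidity": 42.0, "light": 300},
        {"id": "sensor-2", "ts": 1736160001000, "loc": "warehouse",
         "temperature": 18.2, "humidity": 55.3, "light": 120},
    ]
).encode("utf-8")
# event.get_body() returns bytes like sample_body; each element is passed to transform().
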
7 changes: 7 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/requirements.txt
@@ -0,0 +1,7 @@
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues

azure-functions
azure-eventhub
crate==1.0.1
27 changes: 27 additions & 0 deletions topic/serverless/azure-eventhub/Azure Function/value_cache.py
@@ -0,0 +1,27 @@
class ValueCache:
    """
    The ValueCache class is used to structure enriched readings and errors for insertion.
    """

    def __init__(self):
        self.errors = []
        self.readings = []

    def add_error(self, payload, message, error_type):
        self.errors.append(
            {
                "payload": payload,
                "error": {"message": message, "type": error_type},
                "type": error_type,
            }
        )

    def add_reading(self, payload, location, timestamp, sensor_id):
        self.readings.append(
            {
                "location": location,
                "sensor_id": sensor_id,
                "reading_ts": timestamp,
                "reading": payload,
            }
        )
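
For illustration, a minimal sketch of how a rejected payload is recorded in the cache; the sample values are assumptions:

# Hypothetical usage: record a payload that failed enrichment.
from value_cache import ValueCache

cache = ValueCache()
cache.add_error({"id": "sensor-3"}, "missing key in payload", "KeyError")
print(cache.errors[0])
# {'payload': {'id': 'sensor-3'},
#  'error': {'message': 'missing key in payload', 'type': 'KeyError'},
#  'type': 'KeyError'}
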