Commit 8c1e1bc

Improved bigquery types and query

Gabriele Diener committed Apr 7, 2020
1 parent 3ef431e
Showing 5 changed files with 101 additions and 84 deletions.
23 changes: 22 additions & 1 deletion README.md
@@ -1,3 +1,24 @@
 # BigQuery logger handler for Airflow
 
-pip install airflow-bigquerylogger
+## Installation
+
+`pip install airflow-bigquerylogger`
+
+## Configuration
+
+```bash
+AIRFLOW__CORE__REMOTE_LOGGING='true'
+AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER='gs://bucket/path'
+AIRFLOW__CORE__REMOTE_LOG_CONN_ID='gcs_log'
+AIRFLOW__CORE__LOGGING_CONFIG_CLASS='bigquerylogger.config.LOGGING_CLASS'
+AIRFLOW__CORE__LOG_BIGQUERY_DATASET='dataset.table'
+AIRFLOW__CORE__LOG_BIGQUERY_LIMIT=50
+```
+
+### Google Cloud BigQuery
+
+Rows that were written to a table recently via streaming (using the tabledata.insertAll method) cannot be modified using UPDATE, DELETE, or MERGE statements. I recommend setting up table retention!
+
+## Credits
+
+Thanks to the Bluecore engineering team for [this useful article](https://medium.com/bluecore-engineering/kubernetes-pod-logging-in-the-airflow-ui-ed9ca6f37e9d).
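
One way to set up that retention is a default table expiration on the dataset that receives the exported log tables. A minimal sketch with the google-cloud-bigquery client (the project and dataset names are placeholders, not part of this package):

```python
from google.cloud import bigquery

# Streamed rows cannot be deleted right away, but expired tables are
# dropped automatically, which keeps the log dataset bounded.
client = bigquery.Client(project="my-project")
dataset = client.get_dataset("my-project.airflow_logs")
dataset.default_table_expiration_ms = 7 * 24 * 60 * 60 * 1000  # 7 days
client.update_dataset(dataset, ["default_table_expiration_ms"])
```
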
83 changes: 46 additions & 37 deletions bigquerylogger/BQTaskHandler.py
@@ -5,6 +5,7 @@
 from airflow.configuration import conf
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.gcs_task_handler import GCSTaskHandler
+from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
 
 
 class BQTaskHandler(GCSTaskHandler, LoggingMixin):
@@ -19,12 +20,11 @@ def __init__(self, base_log_folder, gcs_log_folder, filename_template,
         super(BQTaskHandler, self).__init__(base_log_folder, gcs_log_folder, filename_template)
         self._bq_cursor = None
 
-        self.dataset_name = dataset_name
-        self.query_limit = query_limit
+        self.dataset = dataset_name
 
-        self.where_statement = ""
-
-        self.closed = False
+        self.parameters = {
+            'limit': int(query_limit)
+        }
 
     @cached_property
     def bq_cursor(self):
@@ -34,7 +34,8 @@ def bq_cursor(self):

             return BigQueryHook(
                 bigquery_conn_id = remote_conn_id,
-                use_legacy_sql = False).get_conn().cursor()
+                use_legacy_sql = False
+            ).get_conn().cursor()
         except Exception as e:
             self.log.error(
                 'Could not create a BigQueryHook with connection id '
@@ -43,27 +44,13 @@ def set_context(self, ti):

     def set_context(self, ti):
         super(BQTaskHandler, self).set_context(ti)
+        self._set_parameters(ti)
 
-        self.where_statement = """
-            labels.k8s_pod_dag_id = '%s' AND
-            labels.k8s_pod_task_id = '%s' AND
-            labels.k8s_pod_execution_date = '%s' AND
-            labels.k8s_pod_try_number = '%d'
-        """ % (ti.dag_id, ti.task_id, ti.execution_date.isoformat(), ti.try_number)
-
-    def close(self):
-        if self.closed:
-            return
-
-        super(BQTaskHandler, self).close()
-
-        self.log.info("""*** Unable to delete logs from BigQuery {} because:\n\n
-            *** Rows that were written to a table recently via streaming
-            (using the tabledata.insertall method) cannot be modified using
-            UPDATE, DELETE, or MERGE statements.
-            I recommend setting up a table retention!\n""".format(self.dataset_name))
-
-        self.closed = True
+    def _set_parameters(self, ti, try_number=None):
+        self.parameters['dag_id'] = ti.dag_id
+        self.parameters['task_id'] = ti.task_id
+        self.parameters['try_number'] = str(try_number if try_number else ti.try_number)
+        self.parameters['execution_date'] = AirflowKubernetesScheduler._datetime_to_label_safe_datestring(ti.execution_date)
 
     def _bq_read(self, metadata=None):
         if not metadata:
@@ -72,13 +59,20 @@
         if 'offset' not in metadata:
             metadata['offset'] = 0
 
-        log = self._bq_query(metadata['offset'])
-        log_count = len(log)
+        try:
+            log = self._bq_query(metadata['offset'])
+            log_count = len(log)
+            log = "".join(log)
+        except Exception as e:
+            log = '*** Unable to read remote log on BigQuery from {}\n*** {}\n\n'.format(
+                self.dataset, str(e))
+            self.log.error(log)
+            log_count = 0
 
-        metadata['end_of_log'] = (log_count == 0)
+        metadata['end_of_log'] = (log_count == 0 or self.parameters['limit'] == 0)
         metadata['offset'] += log_count
 
-        return "".join(log), metadata
+        return log, metadata
 
     def _bq_query(self, offset):
         """
@@ -87,20 +81,30 @@ def _bq_query(self, offset):
         :param offset: the query offset
         :type offset: int
         """
 
         query = """
             SELECT timestamp, textPayload
             FROM `%s.std*`
-            WHERE %s
+        """ % self.dataset
+
+        query += """
+            WHERE
+                labels.k8s_pod_dag_id = %(dag_id)s AND
+                labels.k8s_pod_task_id = %(task_id)s AND
+                labels.k8s_pod_execution_date = %(execution_date)s AND
+                labels.k8s_pod_try_number = %(try_number)s
             ORDER BY timestamp ASC
-            OFFSET %d
         """
 
-        if (self.query_limit > 0):
-            query += 'LIMIT %d' % self.query_limit
+        if (self.parameters['limit'] > 0):
+            query += """
+                LIMIT %(limit)d
+                OFFSET %(offset)d
+            """
 
-        self.bq_cursor.execute(query, (self.dataset_name,
-                                       self.where_statement,
-                                       offset))
+        self.parameters['offset'] = int(offset)
+
+        self.bq_cursor.execute(query, self.parameters)
 
         return self.format_log(self.bq_cursor.fetchall())

@@ -115,6 +119,11 @@ def _read(self, ti, try_number, metadata=None):
         can be used for streaming log reading and auto-tailing.
         """
 
+        # Explicitly setting the query parameters is necessary, as the given
+        # task instance might be different from the task instance passed to
+        # the set_context method.
+        self._set_parameters(ti, try_number)
+
         log_relative_path = self._render_filename(ti, try_number)
         remote_loc = os.path.join(self.remote_base, log_relative_path)
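
A note on the new query parameters: the handler matches log rows by the Kubernetes pod labels that Airflow's Kubernetes executor puts on each task pod, and Kubernetes forbids ":" and "+" in label values. `_set_parameters` therefore runs the execution date through `AirflowKubernetesScheduler._datetime_to_label_safe_datestring`, the same helper the executor uses when labelling pods. A rough sketch of that transformation, assuming Airflow 1.10's implementation:

```python
from datetime import datetime, timezone

def label_safe_datestring(dt):
    # Assumed mirror of the Airflow 1.10 helper: rewrite the characters
    # that are invalid in Kubernetes label values.
    return dt.isoformat().replace(":", "_").replace("+", "_plus_")

ts = datetime(2020, 4, 7, 10, 30, tzinfo=timezone.utc)
print(label_safe_datestring(ts))  # 2020-04-07T10_30_00_plus_00_00
```
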
45 changes: 0 additions & 45 deletions bigquerylogger/__init__.py
@@ -1,45 +0,0 @@
-import os
-
-from airflow.configuration import conf
-from airflow.utils.module_loading import import_string
-
-
-def get_default_logging_config():
-    logging_class_path = 'airflow.config_templates.' \
-                         'airflow_local_settings.DEFAULT_LOGGING_CONFIG'
-    return import_string(logging_class_path)
-
-
-def set_bigquery_handler(default_logging_config):
-    remote_logging = conf.getboolean('core', 'remote_logging')
-    remote_base_log_folder = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-    if not (
-        remote_logging and remote_base_log_folder.startswith('gs://')
-    ): return default_logging_config
-
-    base_log_folder = conf.get('core', 'BASE_LOG_FOLDER')
-    filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
-    bigquery_dataset = conf.get('core', 'LOG_BIGQUERY_DATASET')
-    bigquery_limit = conf.get('core', 'LOG_BIGQUERY_LIMIT', fallback=0)
-
-    bigquery_remote_handlers = {
-        'task': {
-            'class': 'bigquerylogger.BQTaskHandler',
-            'formatter': 'airflow',
-            'base_log_folder': os.path.expanduser(base_log_folder),
-            'gcs_log_folder': remote_base_log_folder,
-            'filename_template': filename_template,
-            'dataset_name': bigquery_dataset,
-            'query_limit': bigquery_limit
-        }
-    }
-
-    default_logging_config['handlers'].update(bigquery_remote_handlers)
-
-    return default_logging_config
-
-
-default_logging_config = get_default_logging_config()
-
-CONFIG_CLASS=set_bigquery_handler(default_logging_config)
32 changes: 32 additions & 0 deletions bigquerylogger/config.py
@@ -0,0 +1,32 @@
+import os
+
+from airflow.configuration import conf
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG as LOGGING_CLASS
+
+
+if LOGGING_CLASS['handlers']['task']['class'] != 'bigquerylogger.BQTaskHandler':
+
+    remote_logging = conf.getboolean('core', 'remote_logging')
+    remote_base_log_folder = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+    if remote_logging and remote_base_log_folder.startswith('gs://'):
+
+        base_log_folder = conf.get('core', 'BASE_LOG_FOLDER')
+        filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+        bigquery_dataset = conf.get('core', 'LOG_BIGQUERY_DATASET')
+        bigquery_limit = conf.get('core', 'LOG_BIGQUERY_LIMIT', fallback=200)
+
+        bigquery_remote_handlers = {
+            'task': {
+                'class': 'bigquerylogger.BQTaskHandler.BQTaskHandler',
+                'formatter': 'airflow',
+                'base_log_folder': os.path.expanduser(base_log_folder),
+                'gcs_log_folder': remote_base_log_folder,
+                'filename_template': filename_template,
+                'dataset_name': bigquery_dataset,
+                'query_limit': bigquery_limit
+            }
+        }
+
+        LOGGING_CLASS['handlers'].update(bigquery_remote_handlers)
+
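Because `config.py` mutates Airflow's `DEFAULT_LOGGING_CONFIG` in place at import time, the override is easy to verify from a Python shell. A minimal check, assuming the environment variables from the README are set:

```python
# Importing the module applies the handler override as a side effect.
from bigquerylogger.config import LOGGING_CLASS

print(LOGGING_CLASS['handlers']['task']['class'])
# 'bigquerylogger.BQTaskHandler.BQTaskHandler' when GCS remote logging
# is enabled; otherwise Airflow's default task handler remains.
```
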
2 changes: 1 addition & 1 deletion setup.py
@@ -10,7 +10,7 @@

 setup(
     name='airflow-bigquerylogger',
-    version='0.1.0',
+    version='0.4.2',
     description='BigQuery logger handler for Airflow',
     long_description=long_description,
     long_description_content_type="text/markdown",
