id: auditlogs-to-bigquery
namespace: company.team
tasks:
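  # Read up to 500 audit-log records per run from the kestra_auditlogs topic.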
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: local:9092
    topic: kestra_auditlogs
    valueDeserializer: JSON
    maxRecords: 500
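  # Flatten each record: copy top-level fields, serialize nested objects to
  # JSON strings, and promote detail fields to dedicated columns.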
  - id: transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.consume.uri }}"
    script: |
      var jacksonMapper = Java.type('io.kestra.core.serializers.JacksonMapper');
      // FileTransform exposes each record as a mutable 'row'; drop the Kafka headers.
      delete row['headers'];
      var value = row['value'];
      row['id'] = value['id'];
      row['type'] = value['type'];
      row['date'] = value['date'];
      row['deleted'] = value['deleted'];
      // Serialize the nested objects to JSON strings to match the all-string Avro schema.
      row['detail'] = jacksonMapper.ofJson().writeValueAsString(value['detail']);
      row['value'] = jacksonMapper.ofJson().writeValueAsString(value);
      // Promote frequently queried detail fields to top-level columns.
      row['detail_type'] = value['detail']['type'];
      row['detail_cls'] = value['detail']['cls'];
      row['detail_permission'] = value['detail']['permission'];
      row['detail_id'] = value['detail']['id'];
      row['detail_namespace'] = value['detail']['namespace'];
      row['detail_flowId'] = value['detail']['flowId'];
      row['detail_executionId'] = value['detail']['executionId'];
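  # Every column is declared as a nullable string, matching the flattened rows.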
  - id: avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.transform.uri }}"
    description: Convert the file from Kestra internal storage to Avro.
    schema: |
      {
        "type": "record",
        "name": "Root",
        "fields": [
          { "name": "id", "type": ["null", "string"] },
          { "name": "type", "type": ["null", "string"] },
          { "name": "detail", "type": ["null", "string"] },
          { "name": "date", "type": ["null", "string"] },
          { "name": "deleted", "type": ["null", "string"] },
          { "name": "value", "type": ["null", "string"] },
          { "name": "detail_type", "type": ["null", "string"] },
          { "name": "detail_cls", "type": ["null", "string"] },
          { "name": "detail_permission", "type": ["null", "string"] },
          { "name": "detail_id", "type": ["null", "string"] },
          { "name": "detail_namespace", "type": ["null", "string"] },
          { "name": "detail_flowId", "type": ["null", "string"] },
          { "name": "detail_executionId", "type": ["null", "string"] }
        ]
      }
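  # WRITE_TRUNCATE replaces the table on every run; use WRITE_APPEND to keep history.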
  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    avroOptions:
      useAvroLogicalTypes: true
    destinationTable: your_gcp_project.dwh.auditlogs
    format: AVRO
    from: "{{ outputs.avro.uri }}"
    writeDisposition: WRITE_TRUNCATE
    serviceAccount: "{{ secret('GCP_CREDS') }}"
    projectId: your_gcp_project
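
# Run the flow daily at 10:00 UTC.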
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 * * *"

extend:
  title: >-
    Stream Kestra audit logs from a Kafka topic to BigQuery for analytics and
    troubleshooting
  description: >-
    This flow shows how to stream Kestra audit logs to BigQuery. The audit logs
    are stored in the `kestra_auditlogs` Kafka topic. The flow consumes the
    audit logs from the Kafka topic, transforms the data, and loads it into
    BigQuery.

    The flow is triggered every day at 10 AM UTC. You can customize the trigger
    by changing the cron expression, timezone, and more. For more information
    about cron expressions, see the [Schedule trigger
    documentation](https://kestra.io/docs/developer-guide/triggers/schedule).
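
    For example, here is a minimal sketch of a customized trigger that runs at
    9 AM Paris time instead (the cron and `timezone` values are illustrative):

        triggers:
          - id: schedule
            type: io.kestra.plugin.core.trigger.Schedule
            cron: "0 9 * * *"
            timezone: Europe/Paris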
  tags:
    - Ingest
    - BigQuery
    - Trigger
    - Outputs
  ee: false
  demo: false
  meta_description: This flow shows how to stream Kestra audit logs to BigQuery.