Dev 850 tags (#18)
dariakharlan authored Jan 3, 2020
1 parent d6e3c28 commit bb8982f
Showing 14 changed files with 164 additions and 34 deletions.
26 changes: 14 additions & 12 deletions agent/src/agent/pipeline/config_handlers/base.py
@@ -104,6 +104,16 @@ def convert_timestamp_to_unix(self, stage):
conf['value'][0]['expression'] = '${' + expression + '}'
return

+def get_tags(self) -> dict:
+    return {
+        'source': ['anodot-agent'],
+        'source_host_id': [self.client_config['destination']['host_id']],
+        'source_host_name': [HOSTNAME],
+        'pipeline_id': [self.get_pipeline_id()],
+        'pipeline_type': [self.get_pipeline_type()],
+        **self.client_config.get('tags', {})
+    }

def set_constant_properties(self, stage):
for conf in stage['configuration']:
if conf['name'] != 'expressionProcessorConfigs':
@@ -113,18 +123,10 @@ def set_constant_properties(self, stage):
conf['value'].append({'fieldToSet': '/properties/' + key, 'expression': val})

conf['value'].append({'fieldToSet': '/tags', 'expression': '${emptyMap()}'})
-conf['value'].append({'fieldToSet': '/tags/source', 'expression': '${emptyList()}'})
-conf['value'].append({'fieldToSet': '/tags/source_host_id', 'expression': '${emptyList()}'})
-conf['value'].append({'fieldToSet': '/tags/source_host_name', 'expression': '${emptyList()}'})
-conf['value'].append({'fieldToSet': '/tags/pipeline_id', 'expression': '${emptyList()}'})
-conf['value'].append({'fieldToSet': '/tags/pipeline_type', 'expression': '${emptyList()}'})
-conf['value'].append({'fieldToSet': '/tags/source[0]', 'expression': 'anodot-agent'})
-conf['value'].append({'fieldToSet': '/tags/source_host_id[0]',
-                      'expression': self.client_config['destination']['host_id']})
-conf['value'].append({'fieldToSet': '/tags/source_host_name[0]',
-                      'expression': HOSTNAME})
-conf['value'].append({'fieldToSet': '/tags/pipeline_id[0]', 'expression': self.get_pipeline_id()})
-conf['value'].append({'fieldToSet': '/tags/pipeline_type[0]', 'expression': self.get_pipeline_type()})
+for tag_name, tag_values in self.get_tags().items():
+    conf['value'].append({'fieldToSet': f'/tags/{tag_name}', 'expression': '${emptyList()}'})
+    for idx, val in enumerate(tag_values):
+        conf['value'].append({'fieldToSet': f'/tags/{tag_name}[{idx}]', 'expression': val})
return

def set_initial_offset(self, client_config=None):
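To see what these two changes do together, here is a minimal sketch (not the agent's actual code; the host and tag values are hypothetical) of how the dict returned by get_tags() is unrolled into the fieldToSet expressions that set_constant_properties appends:

# Hedged sketch: hypothetical values stand in for client_config,
# HOSTNAME and the pipeline getters.
tags = {
    'source': ['anodot-agent'],
    'source_host_id': ['host-42'],
    'source_host_name': ['agent-box'],
    'pipeline_id': ['cpu_sample2'],
    'pipeline_type': ['influx'],
    'region': ['us-east'],  # user-defined, merged in via client_config.get('tags', {})
}

conf_value = []
for tag_name, tag_values in tags.items():
    # One expression creates the empty list, then one entry per value fills it.
    conf_value.append({'fieldToSet': f'/tags/{tag_name}', 'expression': '${emptyList()}'})
    for idx, val in enumerate(tag_values):
        conf_value.append({'fieldToSet': f'/tags/{tag_name}[{idx}]', 'expression': val})

assert conf_value[:2] == [
    {'fieldToSet': '/tags/source', 'expression': '${emptyList()}'},
    {'fieldToSet': '/tags/source[0]', 'expression': 'anodot-agent'},
]

The loop replaces ten hand-written append calls and, unlike the old code, also covers any user-defined tags.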
@@ -1114,7 +1114,7 @@
"value" : "/*\nstate['MEASUREMENT_NAME'] = 'clicks';\nstate['REQUIRED_DIMENSIONS'] = ['AdType', 'Exchange'];\nstate['OPTIONAL_DIMENSIONS'] = ['ver', 'AdSize', 'Country'];\nstate['VALUES_COLUMNS'] = ['value'];\nstate['TARGET_TYPE'] = 'gauge';\nstate['VALUE_CONSTANT'] = 1\n*/\n\nstate['MEASUREMENT_NAME'] = 'cpu_sample2';\nstate['REQUIRED_DIMENSIONS'] = [];\nstate['OPTIONAL_DIMENSIONS'] = ['host', 'host2'];\nstate['VALUES_COLUMNS'] = ['usage_active', 'usage_idle'];\nstate['TARGET_TYPE'] = 'gauge';\nstate['VALUE_CONSTANT'] = 1"
}, {
"name" : "script",
"value" : "/**\n * Available constants: \n * They are to assign a type to a field with a value null.\n * NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG\n * NULL_FLOATNULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL\n * NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP\n *\n * Available Objects:\n * \n * records: an array of records to process, depending on the JavaScript processor\n * processing mode it may have 1 record or all the records in the batch.\n *\n * state: a dict that is preserved between invocations of this script. \n * Useful for caching bits of data e.g. counters.\n *\n * log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout.\n * loglevel is any log4j level: e.g. info, error, warn, trace.\n *\n * output.write(record): writes a record to processor output\n *\n * error.write(record, message): sends a record to error\n *\n * sdcFunctions.getFieldNull(Record, 'field path'): Receive a constant defined above\n * to check if the field is typed field with value null\n * sdcFunctions.createRecord(String recordId): Creates a new record.\n * Pass a recordId to uniquely identify the record and include enough information to track down the record source. \n * sdcFunctions.createMap(boolean listMap): Create a map for use as a field in a record.\n * Pass true to this function to create a list map (ordered map)\n *\n * sdcFunctions.createEvent(String type, int version): Creates a new event.\n * Create new empty event with standard headers.\n * sdcFunctions.toEvent(Record): Send event to event stream\n * Only events created with sdcFunctions.createEvent are supported.\n * sdcFunctions.isPreview(): Determine if pipeline is in preview mode.\n *\n * Available Record Header Variables:n *\n * record.attributes: a map of record header attributes.\n *\n * record.<header name>: get the value of 'header name'.\n */\n\n// Sample JavaScript code\ntags = {'source': ['anodot-agent'], 'source_host_id': [state['HOST_ID']], 'source_host_name': [state['HOST_NAME']], 'pipeline_id': [state['PIPELINE_ID']], 'pipeline_type': ['influx']}\nbatch_size = 1000\nbatch = []\nfor(var i = 0; i < records.length; i++) {\n try {\n\ttimestamp = records[i].value['time'] / 1e3\n // Create a new record with map field \n \n for (var p in records[i].value) {\n if (state['REQUIRED_DIMENSIONS'].indexOf(p) >= 0 || state['OPTIONAL_DIMENSIONS'].indexOf(p) >= 0 || p == 'time'){\n continue;\n }\n var properties = {'target_type': state['TARGET_TYPE']}\n for (var key in state['CONSTANT_PROPERTIES']) {\n if (state['CONSTANT_PROPERTIES'].hasOwnProperty(key)) properties[key] = state['CONSTANT_PROPERTIES'][key];\n }\n for (var k = 0; k < state['REQUIRED_DIMENSIONS'].length; k++) {\n properties[state['REQUIRED_DIMENSIONS'][k]] = records[i].value[state['REQUIRED_DIMENSIONS'][k]]\n }\n for (var k = 0; k < state['OPTIONAL_DIMENSIONS'].length; k++) {\n if (records[i].value[state['OPTIONAL_DIMENSIONS'][k]] !== null) {\n properties[state['OPTIONAL_DIMENSIONS'][k]] = records[i].value[state['OPTIONAL_DIMENSIONS'][k]]\n }\n }\n item = {'timestamp' : timestamp,\n 'properties' : properties,\n 'tags': tags,\n 'value': records[i].value[p]}\n\n item['properties']['measurement_category'] = state['MEASUREMENT_NAME'].replace(/[\\. 
]+/g, '_');\n item['properties']['what'] = p;\n\t if (batch.length == batch_size) {\n var newRecord = sdcFunctions.createRecord(records[i].sourceId + ':' + i);\n newRecord.value = batch\n output.write(newRecord);\n batch = []\n }\n \n batch.push(item);\n \n \n }\n\n // output.write(records[i]);\n } catch (e) {\n // Send record to error\n error.write(records[i], e);\n }\n}\n\nif (batch.length > 0) {\n var newRecord = sdcFunctions.createRecord(records[0].sourceId + ':' + i);\n newRecord.value = batch\n output.write(newRecord);\n}"
"value" : "/**\n * Available constants: \n * They are to assign a type to a field with a value null.\n * NULL_BOOLEAN, NULL_CHAR, NULL_BYTE, NULL_SHORT, NULL_INTEGER, NULL_LONG\n * NULL_FLOATNULL_DOUBLE, NULL_DATE, NULL_DATETIME, NULL_TIME, NULL_DECIMAL\n * NULL_BYTE_ARRAY, NULL_STRING, NULL_LIST, NULL_MAP\n *\n * Available Objects:\n * \n * records: an array of records to process, depending on the JavaScript processor\n * processing mode it may have 1 record or all the records in the batch.\n *\n * state: a dict that is preserved between invocations of this script. \n * Useful for caching bits of data e.g. counters.\n *\n * log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout.\n * loglevel is any log4j level: e.g. info, error, warn, trace.\n *\n * output.write(record): writes a record to processor output\n *\n * error.write(record, message): sends a record to error\n *\n * sdcFunctions.getFieldNull(Record, 'field path'): Receive a constant defined above\n * to check if the field is typed field with value null\n * sdcFunctions.createRecord(String recordId): Creates a new record.\n * Pass a recordId to uniquely identify the record and include enough information to track down the record source. \n * sdcFunctions.createMap(boolean listMap): Create a map for use as a field in a record.\n * Pass true to this function to create a list map (ordered map)\n *\n * sdcFunctions.createEvent(String type, int version): Creates a new event.\n * Create new empty event with standard headers.\n * sdcFunctions.toEvent(Record): Send event to event stream\n * Only events created with sdcFunctions.createEvent are supported.\n * sdcFunctions.isPreview(): Determine if pipeline is in preview mode.\n *\n * Available Record Header Variables:n *\n * record.attributes: a map of record header attributes.\n *\n * record.<header name>: get the value of 'header name'.\n */\n\n// Sample JavaScript code\nbatch_size = 1000\nbatch = []\nfor(var i = 0; i < records.length; i++) {\n try {\n\ttimestamp = records[i].value['time'] / 1e3\n // Create a new record with map field \n \n for (var p in records[i].value) {\n if (state['REQUIRED_DIMENSIONS'].indexOf(p) >= 0 || state['OPTIONAL_DIMENSIONS'].indexOf(p) >= 0 || p == 'time'){\n continue;\n }\n var properties = {'target_type': state['TARGET_TYPE']}\n for (var key in state['CONSTANT_PROPERTIES']) {\n if (state['CONSTANT_PROPERTIES'].hasOwnProperty(key)) properties[key] = state['CONSTANT_PROPERTIES'][key];\n }\n for (var k = 0; k < state['REQUIRED_DIMENSIONS'].length; k++) {\n properties[state['REQUIRED_DIMENSIONS'][k]] = records[i].value[state['REQUIRED_DIMENSIONS'][k]]\n }\n for (var k = 0; k < state['OPTIONAL_DIMENSIONS'].length; k++) {\n if (records[i].value[state['OPTIONAL_DIMENSIONS'][k]] !== null) {\n properties[state['OPTIONAL_DIMENSIONS'][k]] = records[i].value[state['OPTIONAL_DIMENSIONS'][k]]\n }\n }\n item = {'timestamp' : timestamp,\n 'properties' : properties,\n 'tags': state['TAGS'],\n 'value': records[i].value[p]}\n\n item['properties']['measurement_category'] = state['MEASUREMENT_NAME'].replace(/[\\. 
]+/g, '_');\n item['properties']['what'] = p;\n\t if (batch.length == batch_size) {\n var newRecord = sdcFunctions.createRecord(records[i].sourceId + ':' + i);\n newRecord.value = batch\n output.write(newRecord);\n batch = []\n }\n \n batch.push(item);\n \n \n }\n\n // output.write(records[i]);\n } catch (e) {\n // Send record to error\n error.write(records[i], e);\n }\n}\n\nif (batch.length > 0) {\n var newRecord = sdcFunctions.createRecord(records[0].sourceId + ':' + i);\n newRecord.value = batch\n output.write(newRecord);\n}"
}, {
"name" : "destroyScript",
"value" : "/**\n * Available Objects:\n * \n * state: a dict that is preserved between invocations of this script. \n * Useful for caching bits of data e.g. counters and long-lived resources.\n *\n * log.<loglevel>(msg, obj...): use instead of print to send log messages to the log4j log instead of stdout.\n * loglevel is any log4j level: e.g. info, error, warn, trace.\n * sdcFunctions.getFieldNull(Record, 'field path'): Receive a constant defined above \n * to check if the field is typed field with value null\n * sdcFunctions.createMap(boolean listMap): Create a map for use as a field in a record. \n * Pass true to this function to create a list map (ordered map)\n * sdcFunctions.createEvent(String type, int version): Creates a new event.\n * Create new empty event with standard headers.\n * sdcFunctions.toEvent(Record): Send event to event stream\n * Only events created with sdcFunctions.createEvent are supported.\n */\n\n// state['connection'].close();"
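The only functional change between the two script values above is where tags come from: the old script built a tags map inline from state['HOST_ID'], state['HOST_NAME'] and state['PIPELINE_ID'], while the new one attaches state['TAGS'], injected by the Python config handler. Expressed in Python, each item the script batches has roughly this shape (a sketch; the field values are made up):

item = {
    'timestamp': 1578009600.0,  # the record's 'time' field divided by 1e3
    'properties': {
        'target_type': 'gauge',
        'what': 'usage_active',                # the value column being emitted
        'measurement_category': 'cpu_sample2',
        'host': 'srv-1',                       # an optional dimension that was present
    },
    'tags': {'source': ['anodot-agent'], 'pipeline_type': ['influx']},  # now read from state['TAGS']
    'value': 12.5,
}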
6 changes: 4 additions & 2 deletions agent/src/agent/pipeline/config_handlers/influx.py
@@ -1,4 +1,4 @@
-import os
+import json

from .base import BaseConfigHandler, ConfigHandlerException
from ...logger import get_logger
@@ -29,6 +29,7 @@ class InfluxConfigHandler(BaseConfigHandler):
state['HOST_ID'] = '{host_id}'
state['HOST_NAME'] = '{host_name}'
state['PIPELINE_ID'] = '{pipeline_id}'
+state['TAGS'] = {tags}
"""

QUERY_GET_DATA = "SELECT+{dimensions}+FROM+%22{metric}%22+WHERE+%28%22time%22+%3E%3D+${{record:value('/last_timestamp')}}+AND+%22time%22+%3C+${{record:value('/last_timestamp')}}%2B{interval}+AND+%22time%22+%3C+now%28%29-{delay}%29+{where}"
@@ -114,7 +115,8 @@ def override_stages(self):
constant_properties=str(self.client_config.get('properties', {})),
host_id=self.client_config['destination']['host_id'],
host_name=HOSTNAME,
-pipeline_id=self.get_pipeline_id()
+pipeline_id=self.get_pipeline_id(),
+tags=json.dumps(self.get_tags())
)

if conf['name'] == 'stageRecordPreconditions':
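json.dumps is what makes this substitution safe: it renders the tags dict as a JSON object literal, which is also a valid JavaScript expression inside the generated init script. A minimal sketch of the mechanism, assuming a reduced tags dict:

import json

tags = {'source': ['anodot-agent'], 'pipeline_id': ['cpu_sample2']}  # hypothetical subset

template = "state['TAGS'] = {tags}"
print(template.format(tags=json.dumps(tags)))
# state['TAGS'] = {"source": ["anodot-agent"], "pipeline_id": ["cpu_sample2"]}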
@@ -15,6 +15,7 @@
}}
]},
"properties": {"type": "object"},
"tags": {"type": "object"},
"delay": {"type": "string"},
"interval": {"type": "integer"}
},
1 change: 1 addition & 0 deletions agent/src/agent/pipeline/json_schema_definitions/jdbc.json
@@ -10,6 +10,7 @@
"type": {"type": "string", "enum": ["datetime", "unix", "unix_ms"]}
}, "required": ["name", "type"]},
"properties": {"type": "object"},
"tags": {"type": "object"},
"offset_column": {"type": "string", "minLength": 1},
"initial_offset": {"oneOf": [{"type": "string", "minLength": 1}, {"type": "integer", "minimum": 0}]},
"limit": {"type": "integer", "minimum": 1}
3 changes: 2 additions & 1 deletion agent/src/agent/pipeline/json_schema_definitions/kafka.json
@@ -17,7 +17,8 @@
"type": {"type": "string", "enum": ["string", "unix", "unix_ms"]},
"format": {"type": "string", "minLength": 1}
}, "required": ["name", "type"]},
"properties": {"type": "object"}
"properties": {"type": "object"},
"tags": {"type": "object"}
},
"required": ["values", "measurement_names", "dimensions", "timestamp"]
}
3 changes: 2 additions & 1 deletion agent/src/agent/pipeline/json_schema_definitions/mongo.json
@@ -22,7 +22,8 @@
"type": {"type": "string", "enum": ["string", "datetime", "unix", "unix_ms"]},
"format": {"type": "string", "minLength": 1}
}, "required": ["name", "type"]},
"properties": {"type": "object"}
"properties": {"type": "object"},
"tags": {"type": "object"}
},
"required": ["measurement_name", "value", "timestamp", "dimensions"]
}
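Each of the four source schemas gains the same optional field, and because "tags" is not added to any "required" list, existing pipeline configs keep validating. A sketch of what the addition permits, using the third-party jsonschema package (not part of this commit):

from jsonschema import validate  # pip install jsonschema

# Reduced stand-in for one of the updated schemas.
schema = {
    "type": "object",
    "properties": {
        "properties": {"type": "object"},
        "tags": {"type": "object"},
    },
}

validate({"properties": {"env": "prod"}, "tags": {"team": ["data"]}}, schema)  # passes silently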
34 changes: 27 additions & 7 deletions agent/src/agent/pipeline/prompt.py
@@ -1,6 +1,4 @@
import click
-import os
-import csv

from agent.pipeline.config_handlers import expression_parser
from agent.tools import infinite_retry, if_validation_enabled, dict_get_nested
@@ -59,6 +57,24 @@ def set_dimensions(self):
value_proc=lambda x: x.split(),
default=self.config['dimensions'].get('optional', []))

+@infinite_retry
+def prompt_tags(self):
+    self.config['tags'] = self.default_config.get('tags', {})
+
+    tags_str = ''
+    if self.config['tags']:
+        # Tag values are single-element lists, so join on the first element.
+        tags_str = ' '.join([key + ':' + val[0] for key, val in self.config['tags'].items()])
+
+    self.config['tags'] = {}
+
+    tags_str = click.prompt('Tags', type=click.STRING, default=tags_str)
+    for i in tags_str.split():
+        pair = i.split(':')
+        if len(pair) != 2:
+            raise click.UsageError('Wrong format, correct example - key:val key2:val2')
+
+        self.config['tags'][pair[0]] = [pair[1]]

@infinite_retry
def prompt_object(self, property_name, prompt_text):
self.config[property_name] = self.default_config.get(property_name, {})
@@ -81,6 +97,10 @@ def set_static_properties(self):
if self.advanced:
self.prompt_object('properties', 'Additional properties')

+def set_tags(self):
+    if self.advanced:
+        self.prompt_tags()

def set_measurement_name(self):
self.config['measurement_name'] = click.prompt('Measurement name', type=click.STRING,
default=self.default_config.get('measurement_name'))
@@ -118,6 +138,7 @@ def set_config(self):
self.set_timestamp()
self.set_dimensions()
self.set_static_properties()
+self.set_tags()

@infinite_retry
def prompt_value(self):
@@ -145,6 +166,7 @@ def set_config(self):
self.set_timestamp()
self.set_dimensions()
self.set_static_properties()
+self.set_tags()
self.filter()
self.transform()

@@ -240,9 +262,9 @@ def set_config(self):
self.data_preview()
self.set_value()
self.set_target_type()
-self.set_timestamp()
self.set_dimensions()
self.set_static_properties()
+self.set_tags()
self.set_delay()
self.set_filtering()

@@ -256,9 +278,6 @@ def set_delay(self):
self.config['interval'] = click.prompt('Interval, seconds', type=click.INT,
default=self.default_config.get('interval', 60))

-def set_timestamp(self):
-    pass

@infinite_retry
def set_value(self):
self.config['value'] = self.default_config.get('value', {'constant': 1, 'values': []})
@@ -288,7 +307,7 @@ def set_dimensions(self):
def set_filtering(self):
if self.advanced or self.config.get('filtering', ''):
self.config['filtering'] = click.prompt('Filtering condition', type=click.STRING,
-default=self.default_config.get('filtering')).strip()
+default=self.default_config.get('filtering', '')).strip()


class PromptConfigJDBC(PromptConfig):
@@ -303,6 +322,7 @@ def set_config(self):
self.set_timestamp()
self.set_dimensions()
self.set_static_properties()
+self.set_tags()
self.set_condition()

@infinite_retry
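prompt_tags accepts a space-separated list of key:value pairs and stores each value as a single-element list, which is the shape get_tags() merges in base.py. A standalone sketch of that parsing, with a hypothetical input string:

def parse_tags(tags_str: str) -> dict:
    # Mirrors the prompt's parsing: each token must be exactly key:value.
    tags = {}
    for token in tags_str.split():
        pair = token.split(':')
        if len(pair) != 2:
            raise ValueError('Wrong format, correct example - key:val key2:val2')
        tags[pair[0]] = [pair[1]]  # single-element list, matching get_tags()
    return tags

print(parse_tags('team:data region:us-east'))
# {'team': ['data'], 'region': ['us-east']}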