diff --git a/.gitignore b/.gitignore index 7e7e896..3daa555 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ build/ *.egg *.pyc debug* +demo* + diff --git a/MANIFEST.in b/MANIFEST.in index 563ae49..bb3ec5f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1 @@ include README.md -include 中文说明.md diff --git a/README.md b/README.md index dcacdc2..2245733 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,14 @@ Export CSV To Influx ==================== -**Export CSV To Influx**: Process CSV data, and export the data to influx db +**Export CSV To Influx**: Process CSV data, and write the data to influx db + +## Support: + +- Influx 0.x, 1.x +- influx 2.x: Start supporting from 2.0.0 + +> Important Note: Influx 2.x has build-in csv write feature, it is more powerful: [https://docs.influxdata.com/influxdb/v2.1/write-data/developer-tools/csv/](https://docs.influxdata.com/influxdb/v2.1/write-data/developer-tools/csv/) ## Install @@ -29,74 +36,49 @@ pip install ExportCsvToInflux You could use `export_csv_to_influx -h` to see the help guide. -`-c, --csv`: Input CSV file path, or the folder path. `Mandatory` - -`-db, --dbname`: InfluxDB Database name. `Mandatory` - -`-m, --measurement`: Measurement name. `Mandatory` - -`-fc, --field_columns`: List of csv columns to use as fields, separated by comma. `Mandatory` - -`-d, --delimiter`: CSV delimiter. Default: ','. - -`-lt, --lineterminator`: CSV lineterminator. Default: '\n'. - -`-s, --server`: InfluxDB Server address. Default: localhost:8086. - -`-u, --user`: InfluxDB User name. Default: admin - -`-p, --password`: InfluxDB Password. Default: admin - -`-t, --time_column`: Timestamp column name. Default column name: timestamp. - -If no timestamp column, the timestamp is set to the last file modify time for whole csv rows. - -> Note: Also support the pure timestamp, like: 1517587275. Auto detected. - -`-tf, --time_format`: Timestamp format. Default: '%Y-%m-%d %H:%M:%S' e.g.: 1970-01-01 00:00:00. - -`-tz, --time_zone`: Timezone of supplied data. Default: UTC. - -`-tc, --tag_columns`: List of csv columns to use as tags, separated by comma. Default: None - -`-b, --batch_size`: Batch size when inserting data to influx. Default: 500. - -`-lslc, --limit_string_length_columns`: Limit string length column, separated by comma. Default: None. - -`-ls, --limit_length`: Limit length. Default: 20. - -`-dd, --drop_database`: Drop database before inserting data. Default: False. - -`-dm, --drop_measurement`: Drop measurement before inserting data. Default: False. - -`-mc, --match_columns`: Match the data you want to get for certain columns, separated by comma. Match Rule: All matches, then match. Default: None. - -`-mbs, --match_by_string`: Match by string, separated by comma. Default: None. - -`-mbr, --match_by_regex`: Match by regex, separated by comma. Default: None. - -`-fic, --filter_columns`: Filter the data you want to filter for certain columns, separated by comma. Filter Rule: Any one filter success, the filter. Default: None. - -`-fibs, --filter_by_string`: Filter by string, separated by comma. Default: None. - -`-fibr, --filter_by_regex`: Filter by regex, separated by comma. Default: None. - -`-ecm, --enable_count_measurement`: Enable count measurement. Default: False. - -`-fi, --force_insert_even_csv_no_update`: Force insert data to influx, even csv no update. Default: False. - -`-fsc, --force_string_columns`: Force columns as string type, seperated as comma. 
Default: None
-
-`-fintc, --force_int_columns`: Force columns as int type, seperated as comma. Default: None
-
-`-ffc, --force_float_columns`: Force columns as float type, seperated as comma. Default: None
-
-
 > **Note:**
 > 1. You could pass `*` to --field_columns to match all the fields: `--field_columns=*`, `--field_columns '*'`
-> 2. CSV data won't insert into influx again if no update. Use to force insert: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`
+> 2. CSV data won't be inserted into influx again if it has no update. Force insert is enabled by default: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`
 > 3. If some csv cells have no value, auto fill the influx db based on column data type: `int: -999`, `float: -999.0`, `string: -`
+
+| # | Option | Mandatory | Default | Description |
+|:--:|--------|:---------:|:-------:|-------------|
+| 1 | `-c, --csv` | Yes | | CSV file path, or the folder path |
+| 2 | `-db, --dbname` | For 0.x, 1.x only: Yes | | InfluxDB Database name |
+| 3 | `-u, --user` | For 0.x, 1.x only: No | admin | InfluxDB User name |
+| 4 | `-p, --password` | For 0.x, 1.x only: No | admin | InfluxDB Password |
+| 5 | `-org, --org` | For 2.x only: No | my-org | For 2.x only, the org name |
+| 6 | `-bucket, --bucket` | For 2.x only: No | my-bucket | For 2.x only, the bucket name |
+| 7 | `-http_schema, --http_schema` | For 2.x only: No | http | For 2.x only, influxdb http schema, could be http or https |
+| 8 | `-token, --token` | For 2.x only: Yes | | For 2.x only, the access token |
+| 9 | `-m, --measurement` | Yes | | Measurement name |
+| 10 | `-fc, --field_columns` | Yes | | List of csv columns to use as fields, separated by comma |
+| 11 | `-tc, --tag_columns` | No | None | List of csv columns to use as tags, separated by comma |
+| 12 | `-d, --delimiter` | No | , | CSV delimiter |
+| 13 | `-lt, --lineterminator` | No | \n | CSV lineterminator |
+| 14 | `-s, --server` | No | localhost:8086 | InfluxDB Server address |
+| 15 | `-t, --time_column` | No | timestamp | Timestamp column name. If there is no timestamp column, the timestamp is set to the file's last modify time for all csv rows. `Note: pure timestamps like 1517587275 are also supported and auto detected` |
+| 16 | `-tf, --time_format` | No | %Y-%m-%d %H:%M:%S | Timestamp format, see more: https://strftime.org/ |
+| 17 | `-tz, --time_zone` | No | UTC | Timezone of supplied data |
+| 18 | `-b, --batch_size` | No | 500 | Batch size when inserting data to influx |
+| 19 | `-lslc, --limit_string_length_columns` | No | None | Columns with limited string length, separated by comma |
+| 20 | `-ls, --limit_length` | No | 20 | Limit length |
+| 21 | `-dd, --drop_database` | No (also works for 2.x) | False | Drop database or bucket before inserting data |
+| 22 | `-dm, --drop_measurement` | No | False | Drop measurement before inserting data |
+| 23 | `-mc, --match_columns` | No | None | Match the data you want to get for certain columns, separated by comma. Match Rule: all columns match, then the row matches |
+| 24 | `-mbs, --match_by_string` | No | None | Match by string, separated by comma |
+| 25 | `-mbr, --match_by_regex` | No | None | Match by regex, separated by comma |
+| 26 | `-fic, --filter_columns` | No | None | Filter the data you want to drop for certain columns, separated by comma. Filter Rule: any one column matches, then the row is filtered out |
+| 27 | `-fibs, --filter_by_string` | No | None | Filter by string, separated by comma |
+| 28 | `-fibr, --filter_by_regex` | No | None | Filter by regex, separated by comma |
+| 29 | `-ecm, --enable_count_measurement` | No | False | Enable count measurement |
+| 30 | `-fi, --force_insert_even_csv_no_update` | No | True | Force insert data to influx, even if csv has no update |
+| 31 | `-fsc, --force_string_columns` | No | None | Force columns as string type, separated by comma |
+| 32 | `-fintc, --force_int_columns` | No | None | Force columns as int type, separated by comma |
+| 33 | `-ffc, --force_float_columns` | No | None | Force columns as float type, separated by comma |
+| 34 | `-uniq, --unique` | No | False | Allow writing duplicated points by tagging each with a unique id |
+| 35 | `--csv_charset` | No | None | The csv charset. Default: None, which will auto detect |
+
 ## Programmatically
 
 Also, we could run the exporter programmatically.
@@ -113,118 +95,108 @@ print(exporter.export_csv_to_influx.__doc__)
 
 ## Sample
 
-Here is the **demo.csv**.
+1. Here is the **demo.csv**
 
 ```
 timestamp,url,response_time
-2019-07-11 02:04:05,https://jmeter.apache.org/,1.434
-2019-07-11 02:04:06,https://jmeter.apache.org/,2.434
-2019-07-11 02:04:07,https://jmeter.apache.org/,1.200
-2019-07-11 02:04:08,https://jmeter.apache.org/,1.675
-2019-07-11 02:04:09,https://jmeter.apache.org/,2.265
-2019-07-11 02:04:10,https://sample-demo.org/,1.430
-2019-07-12 08:54:13,https://sample-show.org/,1.300
-2019-07-12 14:06:00,https://sample-7.org/,1.289
-2019-07-12 18:45:34,https://sample-8.org/,2.876
+2022-03-08 02:04:05,https://jmeter.apache.org/,1.434
+2022-03-08 02:04:06,https://jmeter.apache.org/,2.434
+2022-03-08 02:04:07,https://jmeter.apache.org/,1.200
+2022-03-08 02:04:08,https://jmeter.apache.org/,1.675
+2022-03-08 02:04:09,https://jmeter.apache.org/,2.265
+2022-03-08 02:04:10,https://sample-demo.org/,1.430
+2022-03-08 03:54:13,https://sample-show.org/,1.300
+2022-03-07 04:06:00,https://sample-7.org/,1.289
+2022-03-07 05:45:34,https://sample-8.org/,2.876
 ```
 
-1. Command to export whole data into influx:
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --force_insert_even_csv_no_update True \
-    --server 127.0.0.1:8086
-    ```
-
-2. Command to export whole data into influx, **but: drop database**
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --force_insert_even_csv_no_update True \
-    --drop_database=True
-    ```
-
-3. 
Command to export part of data: **timestamp matches 2019-07-12 and url matches sample-\d+** - - ``` - export_csv_to_influx \ - --csv demo.csv \ - --dbname demo \ - --measurement demo \ - --tag_columns url \ - --field_columns response_time \ - --user admin \ - --password admin \ - --server 127.0.0.1:8086 \ - --drop_database=True \ - --force_insert_even_csv_no_update True \ - --match_columns=timestamp,url \ - --match_by_reg='2019-07-12,sample-\d+' - ``` - -4. Filter part of data, and the export into influx: **url filter sample** - - ``` - export_csv_to_influx \ - --csv demo.csv \ - --dbname demo \ - --measurement demo \ - --tag_columns url \ - --field_columns response_time \ - --user admin \ - --password admin \ - --server 127.0.0.1:8086 \ - --drop_database True \ - --force_insert_even_csv_no_update True \ - --filter_columns timestamp,url \ - --filter_by_reg 'sample' - ``` - -5. Enable count measurement. A new measurement named: **demo.count** generated, with match: **timestamp matches 2019-07-12 and url matches sample-\d+** - - ``` - export_csv_to_influx \ - --csv demo.csv \ - --dbname demo \ - --measurement demo \ - --tag_columns url \ - --field_columns response_time \ - --user admin \ - --password admin \ - --server 127.0.0.1:8086 \ - --drop_database True \ - --force_insert_even_csv_no_update True \ - --match_columns timestamp,url \ - --match_by_reg '2019-07-12,sample-\d+' \ - --enable_count_measurement True - ``` - - The count measurement is: +2. Command samples + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
<table>
<thead>
<tr>
<th>#</th>
<th>Description</th>
<th>Influx 0.x, 1.x</th>
<th>Influx 2.x</th>
</tr>
</thead>
<tbody>
<tr>
<td>1</td>
<td>Write whole data into influx</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--dbname demo \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--user admin \
--password admin \
--server 127.0.0.1:8086
</pre>
</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--org my-org \
--bucket my-bucket \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--token YourToken \
--server 127.0.0.1:8086
</pre>
</td>
</tr>
<tr>
<td>2</td>
<td>Write whole data into influx, but drop the database or bucket first</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--dbname demo \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--user admin \
--password admin \
--server 127.0.0.1:8086 \
--drop_database=True
</pre>
</td>
<td>
<pre>
# The Read/Write API token cannot create a database/bucket. Before using
# --drop_database, make sure your token has the required access.
# See the bug here: https://github.com/influxdata/influxdb/issues/23170
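# (Assumed example, not from the original docs: with the influx 2.x CLI,
# an all-access token can be created via
#   influx auth create --org my-org --all-access
# Verify the exact flags against your CLI version.)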
export_csv_to_influx \
--csv demo.csv \
--org my-org \
--bucket my-bucket \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--token YourToken \
--server 127.0.0.1:8086 \
--drop_database=True
</pre>
</td>
</tr>
<tr>
<td>3</td>
<td>Write part of data: timestamp matches 2022-03-07 and url matches sample-\d+</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--dbname demo \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--user admin \
--password admin \
--server 127.0.0.1:8086 \
--drop_database=True \
--match_columns=timestamp,url \
--match_by_reg='2022-03-07,sample-\d+'
</pre>
</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--org my-org \
--bucket my-bucket \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--token YourToken \
--server 127.0.0.1:8086 \
--drop_measurement=True \
--match_columns=timestamp,url \
--match_by_reg='2022-03-07,sample-\d+'
</pre>
</td>
</tr>
<tr>
<td>4</td>
<td>Filter out rows whose url matches sample, and write the rest into influx</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--dbname demo \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--user admin \
--password admin \
--server 127.0.0.1:8086 \
--drop_database True \
--filter_columns url \
--filter_by_reg 'sample'
</pre>
</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--org my-org \
--bucket my-bucket \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--token YourToken \
--server 127.0.0.1:8086 \
--drop_measurement=True \
--filter_columns url \
--filter_by_reg 'sample'
</pre>
</td>
</tr>
<tr>
<td>5</td>
<td>Enable the count measurement. A new measurement named demo.count is generated, counting rows where timestamp matches 2022-03-07 and url matches sample-\d+</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--dbname demo \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--user admin \
--password admin \
--server 127.0.0.1:8086 \
--drop_database True \
--match_columns timestamp,url \
--match_by_reg '2022-03-07,sample-\d+' \
--enable_count_measurement True
</pre>
</td>
<td>
<pre>
export_csv_to_influx \
--csv demo.csv \
--org my-org \
--bucket my-bucket \
--measurement demo \
--tag_columns url \
--field_columns response_time \
--token YourToken \
--server 127.0.0.1:8086 \
--drop_measurement=True \
--match_columns=timestamp,url \
--match_by_reg='2022-03-07,sample-\d+' \
--enable_count_measurement True
</pre>
</td>
</tr>
</tbody>
</table>
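
As a cross-check, command sample 1 can also be run programmatically. A minimal sketch: the keyword arguments mirror the `export_csv_to_influx` docstring, and the import path follows the package layout shown in this diff.

```python
from ExportCsvToInflux.exporter_object import ExporterObject

# Equivalent of command sample 1 against InfluxDB 0.x/1.x
exporter = ExporterObject()
exporter.export_csv_to_influx(csv_file='demo.csv',
                              db_server_name='127.0.0.1:8086',
                              db_name='demo',
                              db_measurement='demo',
                              tag_columns='url',
                              field_columns='response_time',
                              db_user='admin',
                              db_password='admin')
```

For 2.x, pass `org_name`, `bucket_name` and `token` instead of `db_name`/`db_user`/`db_password`, as listed in the docstring.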
+
+3. If the count measurement is enabled, the generated count measurement looks like:
 
    ```text
+    // Influx 0.x, 1.x
    select * from "demo.count"
    name: demo.count
    time                match_timestamp match_url total
    ----                --------------- --------- -----
    1562957134000000000 3               2         9
+
+    // Influx 2.x: For more info about Flux, see https://docs.influxdata.com/influxdb/v2.1/query-data/flux/
+    influx query 'from(bucket:"my-bucket") |> range(start:-100h) |> filter(fn: (r) => r._measurement == "demo.count")' --raw
+
+    #group,false,false,true,true,false,false,true,true
+    #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string
+    #default,_result,,,,,,,
+    ,result,table,_start,_stop,_time,_value,_field,_measurement
+    ,,2,2022-03-04T09:51:49.7425566Z,2022-03-08T13:51:49.7425566Z,2022-03-07T05:45:34Z,2,match_timestamp,demo.count
+    ,,3,2022-03-04T09:51:49.7425566Z,2022-03-08T13:51:49.7425566Z,2022-03-07T05:45:34Z,2,match_url,demo.count
+    ,,4,2022-03-04T09:51:49.7425566Z,2022-03-08T13:51:49.7425566Z,2022-03-07T05:45:34Z,9,total,demo.count
    ```
 
 ## Special Thanks
 
 The lib is inspired by: [https://github.com/fabio-miranda/csv-to-influxdb](https://github.com/fabio-miranda/csv-to-influxdb)
 
diff --git a/setup.py b/setup.py
index bdca255..89a31c3 100644
--- a/setup.py
+++ b/setup.py
@@ -1,17 +1,21 @@
 from setuptools import setup, find_packages
+import sys
 import os
 import re
+import io
+
+version_info = sys.version_info
 
 CURDIR = os.path.dirname(os.path.abspath(__file__))
 url = 'https://github.com/Bugazelle/export-csv-to-influx'
-with open(os.path.join(CURDIR, 'src', 'ExportCsvToInflux', '__version__.py')) as f:
+with io.open(os.path.join(CURDIR, 'src', 'ExportCsvToInflux', '__version__.py'), encoding='utf-8') as f:
     VERSION = re.search("__version__ = '(.*)'", f.read()).group(1)
 download_url = '{0}/archive/v{1}.tar.gz'.format(url, VERSION)
 
 
 def readme():
-    with open('README.md') as f:
+    with io.open('README.md', encoding='utf-8') as f:
         long_description = f.read()
 
     return long_description
@@ -31,12 +35,13 @@ def readme():
     author_email='463407426@qq.com',
     keywords=['python', 'csv', 'influx'],
     install_requires=[
-        'influxdb>=5.2.2',
-        'python-dateutil>=2.8.0'
-    ],
+        'influxdb>=5.3.1',
+        'influxdb-client[ciso]>=1.25.0' if version_info >= (3, 6) else '',
+        'python-dateutil>=2.8.0',
+    ],
     download_url=download_url,
     url=url,
-    classifiers=(
+    classifiers=[
     'Development Status :: 5 - Production/Stable',
     'Intended Audience :: Developers',
     'Natural Language :: English',
@@ -46,10 +51,38 @@
     'Programming Language :: Python :: 3.5',
     'Programming Language :: Python :: 3.6',
     'Programming Language :: Python :: 3.7',
-    ),
+    'Programming Language :: Python :: 3.8',
+    'Programming Language :: Python :: 3.9',
+    ],
     entry_points={
         'console_scripts': [
-            'export_csv_to_influx = ExportCsvToInflux.exporter_object:export_csv_to_influx',
+            'export_csv_to_influx = ExportCsvToInflux.command_object:export_csv_to_influx',
         ],
     },
 )
+
+
+class bcolors:
+    HEADER = '\033[95m'
+    OKBLUE = '\033[94m'
+    OKCYAN = '\033[96m'
+    OKGREEN = '\033[92m'
+    WARNING = '\033[93m'
+    FAIL = '\033[91m'
+    ENDC = '\033[0m'
+    BOLD = '\033[1m'
+    UNDERLINE = '\033[4m'
+
+
+if version_info < (3, 6):
+    print(bcolors.WARNING +
+          'WARNING: Your Python version is {0}.{1} < 3.6, '
+          'which only supports InfluxDB 0.x, 1.x.'.format(version_info[0], version_info[1])
+          + bcolors.ENDC)
+    print(bcolors.WARNING +
+          'WARNING: If you would like the lib to support InfluxDB 2.x, please upgrade to Python >= 3.6.'
+ + bcolors.ENDC) + print(bcolors.WARNING + + 'WARNING: Alternatively, influxdb 2.x has build-in csv write feature, ' + 'it is more powerful: https://docs.influxdata.com/influxdb/v2.1/write-data/developer-tools/csv/' + + bcolors.ENDC) diff --git a/src/ExportCsvToInflux/__version__.py b/src/ExportCsvToInflux/__version__.py index a96d65a..7fd229a 100644 --- a/src/ExportCsvToInflux/__version__.py +++ b/src/ExportCsvToInflux/__version__.py @@ -1 +1 @@ -__version__ = '0.1.25' +__version__ = '0.2.0' diff --git a/src/ExportCsvToInflux/base_object.py b/src/ExportCsvToInflux/base_object.py index fa6dd2d..8511e89 100644 --- a/src/ExportCsvToInflux/base_object.py +++ b/src/ExportCsvToInflux/base_object.py @@ -20,13 +20,17 @@ def convert_boole(target): return target - def validate_str(self, target, ignore_exception=False): + def validate_str(self, target, ignore_exception=False, target_name=None): """Function: validate_string - :param target: the string + :param target: the target value :param ignore_exception: the True or False + :param target_name: the target name """ + if target is None or str(target).lower() == 'none': + return + get_type = type(target) ignore_exception = self.convert_boole(ignore_exception) try: @@ -34,7 +38,10 @@ def validate_str(self, target, ignore_exception=False): except NameError: string_type = get_type is str if not string_type and ignore_exception is False: - error_message = 'Error: The {0} is not string type. Please check.'.format(target) + if target_name: + error_message = 'Error: The {0} - {1} is not string type. Please check.'.format(target_name, target) + else: + error_message = 'Error: The {0} is not string type. Please check.'.format(target) sys.exit(error_message) return string_type @@ -48,6 +55,9 @@ def str_to_list(self, string, delimiter=',', lower=False): :return """ + if string is None or str(string).lower() == 'none': + return [] + get_type = type(string) error_message = 'Error: The string should be list or string, use comma to separate. ' \ 'Current is: type-{0}, {1}'.format(get_type, string) diff --git a/src/ExportCsvToInflux/command_object.py b/src/ExportCsvToInflux/command_object.py new file mode 100644 index 0000000..4fd6538 --- /dev/null +++ b/src/ExportCsvToInflux/command_object.py @@ -0,0 +1,143 @@ +from .exporter_object import ExporterObject +from .influx_object import InfluxObject +from .__version__ import __version__ +import argparse + + +class UserNamespace(object): + pass + + +def export_csv_to_influx(): + parser = argparse.ArgumentParser(description='CSV to InfluxDB.') + + # Parse: Parse the server name, and judge the influx version + parser.add_argument('-s', '--server', nargs='?', default='localhost:8086', const='localhost:8086', + help='InfluxDB Server address. 
Default: localhost:8086') + user_namespace = UserNamespace() + parser.parse_known_args(namespace=user_namespace) + influx_object = InfluxObject(db_server_name=user_namespace.server) + influx_version = influx_object.get_influxdb_version() + print('Info: The influxdb version is {influx_version}'.format(influx_version=influx_version)) + + # influxdb 0.x, 1.x + parser.add_argument('-db', '--dbname', + required=True if influx_version.startswith('0') or influx_version.startswith('1') else False, + help='For 0.x, 1.x only, InfluxDB Database name.') + parser.add_argument('-u', '--user', nargs='?', default='admin', const='admin', + help='For 0.x, 1.x only, InfluxDB User name.') + parser.add_argument('-p', '--password', nargs='?', default='admin', const='admin', + help='For 0.x, 1.x only, InfluxDB Password.') + + # influxdb 2.x + parser.add_argument('-http_schema', '--http_schema', nargs='?', default='http', const='http', + help='For 2.x only, the influxdb http schema, could be http or https. Default: http.') + parser.add_argument('-org', '--org', nargs='?', default='my-org', const='my-org', + help='For 2.x only, the org. Default: my-org.') + parser.add_argument('-bucket', '--bucket', nargs='?', default='my-bucket', const='my-bucket', + help='For 2.x only, the bucket. Default: my-bucket.') + parser.add_argument('-token', '--token', + required=True if influx_version.startswith('2') else False, + help='For 2.x only, the access token') + + # Parse: Parse the others + parser.add_argument('-c', '--csv', required=True, + help='Input CSV file.') + parser.add_argument('-d', '--delimiter', nargs='?', default=',', const=',', + help='CSV delimiter. Default: \',\'.') + parser.add_argument('-lt', '--lineterminator', nargs='?', default='\n', const='\n', + help='CSV lineterminator. Default: \'\\n\'.') + parser.add_argument('-m', '--measurement', required=True, + help='Measurement name.') + parser.add_argument('-t', '--time_column', nargs='?', default='timestamp', const='timestamp', + help='Timestamp column name. Default: timestamp. ' + 'If no timestamp column, ' + 'the timestamp is set to the last file modify time for whole csv rows') + parser.add_argument('-tf', '--time_format', nargs='?', default='%Y-%m-%d %H:%M:%S', const='%Y-%m-%d %H:%M:%S', + help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00') + parser.add_argument('-tz', '--time_zone', nargs='?', default='UTC', const='UTC', + help='Timezone of supplied data. Default: UTC') + parser.add_argument('-fc', '--field_columns', required=True, + help='List of csv columns to use as fields, separated by comma') + parser.add_argument('-tc', '--tag_columns', nargs='?', default=None, const=None, + help='List of csv columns to use as tags, separated by comma. Default: None') + parser.add_argument('-b', '--batch_size', nargs='?', default=500, const=500, + help='Batch size when inserting data to influx. Default: 500.') + parser.add_argument('-lslc', '--limit_string_length_columns', nargs='?', default=None, const=None, + help='Limit string length columns, separated by comma. Default: None.') + parser.add_argument('-ls', '--limit_length', nargs='?', default=20, const=20, + help='Limit length. Default: 20.') + parser.add_argument('-dd', '--drop_database', nargs='?', default=False, const=False, + help='Drop database before inserting data. Default: False') + parser.add_argument('-dm', '--drop_measurement', nargs='?', default=False, const=False, + help='Drop measurement before inserting data. 
Default: False') + parser.add_argument('-mc', '--match_columns', nargs='?', default=None, const=None, + help='Match the data you want to get for certain columns, separated by comma. ' + 'Match Rule: All matches, then match. Default: None') + parser.add_argument('-mbs', '--match_by_string', nargs='?', default=None, const=None, + help='Match by string, separated by comma. Default: None') + parser.add_argument('-mbr', '--match_by_regex', nargs='?', default=None, const=None, + help='Match by regex, separated by comma. Default: None') + parser.add_argument('-fic', '--filter_columns', nargs='?', default=None, const=None, + help='Filter the data you want to filter for certain columns, separated by comma. ' + 'Filter Rule: Any one matches, then match. Default: None') + parser.add_argument('-fibs', '--filter_by_string', nargs='?', default=None, const=None, + help='Filter by string, separated by comma. Default: None') + parser.add_argument('-fibr', '--filter_by_regex', nargs='?', default=None, const=None, + help='Filter by regex, separated by comma. Default: None') + parser.add_argument('-ecm', '--enable_count_measurement', nargs='?', default=False, const=False, + help='Enable count measurement. Default: False') + parser.add_argument('-fi', '--force_insert_even_csv_no_update', nargs='?', default=True, const=True, + help='Force insert data to influx, even csv no update. Default: False') + parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None, + help='Force columns as string type, separated by comma. Default: None.') + parser.add_argument('-fintc', '--force_int_columns', nargs='?', default=None, const=None, + help='Force columns as int type, separated by comma. Default: None.') + parser.add_argument('-ffc', '--force_float_columns', nargs='?', default=None, const=None, + help='Force columns as float type, separated by comma. Default: None.') + parser.add_argument('-uniq', '--unique', nargs='?', default=False, const=False, + help='Write duplicated points. Default: False.') + parser.add_argument('--csv_charset', '--csv_charset', nargs='?', default=None, const=None, + help='The csv charset. 
Default: None, which will auto detect') + parser.add_argument('-v', '--version', action="version", version=__version__) + + args = parser.parse_args(namespace=user_namespace) + exporter = ExporterObject() + input_data = { + 'csv_file': args.csv, + 'db_server_name': user_namespace.server, + 'db_user': args.user, + 'db_password': args.password, + 'db_name': 'None' if args.dbname is None else args.dbname, + 'db_measurement': args.measurement, + 'time_column': args.time_column, + 'time_format': args.time_format, + 'time_zone': args.time_zone, + 'field_columns': args.field_columns, + 'tag_columns': args.tag_columns, + 'batch_size': args.batch_size, + 'delimiter': args.delimiter, + 'lineterminator': args.lineterminator, + 'limit_string_length_columns': args.limit_string_length_columns, + 'limit_length': args.limit_length, + 'drop_database': args.drop_database, + 'drop_measurement': args.drop_measurement, + 'match_columns': args.match_columns, + 'match_by_string': args.match_by_string, + 'match_by_regex': args.match_by_regex, + 'filter_columns': args.filter_columns, + 'filter_by_string': args.filter_by_string, + 'filter_by_regex': args.filter_by_regex, + 'enable_count_measurement': args.enable_count_measurement, + 'force_insert_even_csv_no_update': args.force_insert_even_csv_no_update, + 'force_string_columns': args.force_string_columns, + 'force_int_columns': args.force_int_columns, + 'force_float_columns': args.force_float_columns, + 'http_schema': args.http_schema, + 'org_name': args.org, + 'bucket_name': args.bucket, + 'token': 'None' if args.token is None else args.token, + 'unique': args.unique, + 'csv_charset': args.csv_charset + } + exporter.export_csv_to_influx(**input_data) diff --git a/src/ExportCsvToInflux/config_object.py b/src/ExportCsvToInflux/config_object.py new file mode 100644 index 0000000..f87fe2e --- /dev/null +++ b/src/ExportCsvToInflux/config_object.py @@ -0,0 +1,139 @@ +from .base_object import BaseObject +import collections +import sys +import os + + +class Configuration(object): + """Configuration""" + + def __init__(self, **kwargs): + # Init conf + self.csv_file = kwargs.get('csv_file', None) + self.db_name = kwargs.get('db_name', None) + self.db_measurement = kwargs.get('db_measurement', None) + self.field_columns = kwargs.get('field_columns', None) + self.tag_columns = kwargs.get('tag_columns', None) + self.db_server_name = kwargs.get('db_server_name', 'localhost:8086') + self.db_user = kwargs.get('db_user', 'admin') + self.db_password = kwargs.get('db_password', 'admin') + self.time_column = kwargs.get('time_column', 'timestamp') + self.time_format = kwargs.get('time_format', '%Y-%m-%d %H:%M:%S') + self.delimiter = kwargs.get('delimiter', ',') + self.lineterminator = kwargs.get('lineterminator', '\n') + self.time_zone = kwargs.get('time_zone', 'UTC') + self.batch_size = kwargs.get('batch_size', 500) + self.limit_string_length_columns = kwargs.get('limit_string_length_columns', None) + self.limit_length = kwargs.get('limit_length', 20) + self.drop_database = kwargs.get('drop_database', False) + self.drop_measurement = kwargs.get('drop_measurement', False) + self.match_columns = kwargs.get('match_columns', None) + self.match_by_string = kwargs.get('match_by_string', None) + self.match_by_regex = kwargs.get('match_by_regex', None) + self.filter_columns = kwargs.get('filter_columns', None) + self.filter_by_string = kwargs.get('filter_by_string', None) + self.filter_by_regex = kwargs.get('filter_by_regex', None) + self.enable_count_measurement = 
kwargs.get('enable_count_measurement', False) + self.force_insert_even_csv_no_update = kwargs.get('force_insert_even_csv_no_update', True) + self.force_string_columns = kwargs.get('force_string_columns', None) + self.force_int_columns = kwargs.get('force_int_columns', None) + self.force_float_columns = kwargs.get('force_float_columns', None) + self.http_schema = kwargs.get('http_schema', 'http') + self.org_name = kwargs.get('org_name', 'my-org') + self.bucket_name = kwargs.get('bucket_name', 'my-bucket') + self.token = kwargs.get('token', None) + self.unique = kwargs.get('unique', False) + self.csv_charset = kwargs.get('csv_charset', None) + + # Validate conf + base_object = BaseObject() + base_object.validate_str(self.csv_file, target_name='csv_file') + base_object.validate_str(self.db_name, target_name='db_name') + base_object.validate_str(self.db_measurement, target_name='db_measurement') + base_object.validate_str(self.time_format, target_name='time_format') + base_object.validate_str(self.delimiter, target_name='delimiter') + base_object.validate_str(self.lineterminator, target_name='lineterminater') + base_object.validate_str(self.time_zone, target_name='time_zone') + self.tag_columns = base_object.str_to_list(self.tag_columns) + self.field_columns = base_object.str_to_list(self.field_columns) + self.limit_string_length_columns = base_object.str_to_list(self.limit_string_length_columns) + self.match_columns = base_object.str_to_list(self.match_columns) + self.match_by_string = base_object.str_to_list(self.match_by_string) + self.match_by_regex = base_object.str_to_list(self.match_by_regex, lower=False) + self.filter_columns = base_object.str_to_list(self.filter_columns) + self.filter_by_string = base_object.str_to_list(self.filter_by_string) + self.filter_by_regex = base_object.str_to_list(self.filter_by_regex) + self.drop_database = self.__validate_bool_string(self.drop_database) + self.drop_measurement = self.__validate_bool_string(self.drop_measurement) + self.enable_count_measurement = self.__validate_bool_string(self.enable_count_measurement) + self.force_insert_even_csv_no_update = self.__validate_bool_string(self.force_insert_even_csv_no_update) + self.force_string_columns = base_object.str_to_list(self.force_string_columns) + self.force_int_columns = base_object.str_to_list(self.force_int_columns) + self.force_float_columns = base_object.str_to_list(self.force_float_columns) + base_object.validate_str(self.http_schema, target_name='http_schema') + base_object.validate_str(self.org_name, target_name='org_name') + base_object.validate_str(self.bucket_name, target_name='bucket_name') + base_object.validate_str(self.token, target_name='token') + self.unique = self.__validate_bool_string(self.unique) + base_object.validate_str(self.csv_charset, target_name='csv_charset') + + # Fields should not duplicate in force_string_columns, force_int_columns, force_float_columns + all_force_columns = self.force_string_columns + self.force_int_columns + self.force_float_columns + duplicates = [item for item, count in collections.Counter(all_force_columns).items() if count > 1] + if duplicates: + error_message = 'Error: Find duplicate items: {0} in: \n' \ + ' force_string_columns: {1} \n' \ + ' force_int_columns: {2} \n' \ + ' force_float_columns: {3}'.format(duplicates, + self.force_string_columns, + self.force_int_columns, + self.force_float_columns) + sys.exit(error_message) + + # Fields should not duplicate in match_columns, filter_columns + all_force_columns = self.match_columns + 
self.filter_columns + duplicates = [item for item, count in collections.Counter(all_force_columns).items() if count > 1] + if duplicates: + error_message = 'Error: Find duplicate items: {0} in: \n' \ + ' match_columns: {1} \n' \ + ' filter_columns: {2} '.format(duplicates, + self.match_columns, + self.filter_columns) + sys.exit(error_message) + + # Validate: batch_size + try: + self.batch_size = int(self.batch_size) + except ValueError: + error_message = 'Error: The batch_size should be int, current is: {0}'.format(self.batch_size) + sys.exit(error_message) + + # Validate: limit_length + try: + self.limit_length = int(self.limit_length) + except ValueError: + error_message = 'Error: The limit_length should be int, current is: {0}'.format(self.limit_length) + sys.exit(error_message) + + # Validate csv + current_dir = os.path.curdir + csv_file = os.path.join(current_dir, self.csv_file) + csv_file_exists = os.path.exists(csv_file) + if csv_file_exists is False: + error_message = 'Error: CSV file not found, exiting...' + sys.exit(error_message) + + @staticmethod + def __validate_bool_string(target, alias=''): + """Private Function: Validate bool string + + :param target: the target + """ + + target = str(target).lower() + expected = ['true', 'false'] + if target not in expected: + error_message = 'Error: The input {0} should be True or False, current is {1}'.format(alias, target) + sys.exit(error_message) + + return True if target == 'true' else False diff --git a/src/ExportCsvToInflux/csv_object.py b/src/ExportCsvToInflux/csv_object.py index f4551b3..ff52746 100755 --- a/src/ExportCsvToInflux/csv_object.py +++ b/src/ExportCsvToInflux/csv_object.py @@ -1,8 +1,10 @@ +from chardet.universaldetector import UniversalDetector from collections import defaultdict from .base_object import BaseObject from itertools import tee from glob import glob import hashlib +import codecs import types import time import json @@ -11,12 +13,99 @@ import os +class UTF8Recoder: + """ + Python2.7: Iterator that reads an encoded stream and reencodes the input to UTF-8 + """ + + def __init__(self, f, encoding): + self.reader = codecs.getreader(encoding)(f) + + def __iter__(self): + return self + + def next(self): + return self.reader.next().encode("utf-8") + + +class UnicodeDictReader: + """ + Python2.7: A CSV reader which will iterate over lines in the CSV file "f", + which is encoded in the given encoding. 
+ """ + + def __init__(self, f, encoding="utf-8", **kwargs): + f = UTF8Recoder(f, encoding) + self.reader = csv.reader(f, **kwargs) + self.header = self.reader.next() + + def next(self): + row = self.reader.next() + vals = [unicode(s, "utf-8") for s in row] + return dict((self.header[x], vals[x]) for x in range(len(self.header))) + + def __iter__(self): + return self + + class CSVObject(object): """CSV Object""" - def __init__(self, delimiter=',', lineterminator='\n'): + python_version = sys.version_info.major + + def __init__(self, delimiter=',', lineterminator='\n', csv_charset=None): self.delimiter = delimiter self.lineterminator = lineterminator + self.csv_charset = csv_charset + + @classmethod + def detect_csv_charset(cls, file_name, **csv_object): + """Function: detect_csv_charset + + :param file_name: the file name + :return return csv charset + """ + + detector = UniversalDetector() + detector.reset() + + with open(file_name, 'rb') as f: + for row in f: + detector.feed(row) + if detector.done: + break + detector.close() + encoding = detector.result.get('encoding') + csv_object['csv_charset'] = encoding + + return cls(**csv_object) + + def compatible_dict_reader(self, f, encoding, **kwargs): + """Function: compatible_dict_reader + + :param f: the file object from compatible_open + :param encoding: the encoding charset + :return return csvReader + """ + + if self.python_version == 2: + return UnicodeDictReader(f, encoding=encoding, **kwargs) + else: + return csv.DictReader(f, **kwargs) + + def compatible_open(self, file_name, encoding, mode='r'): + """Function: compatible_open + + :param file_name: the file name + :param mode: the open mode + :param encoding: the encoding charset + :return return file object + """ + + if self.python_version == 2: + return open(file_name, mode=mode) + else: + return open(file_name, mode=mode, encoding=encoding) def get_csv_header(self, file_name): """Function: get_csv_header. 
@@ -28,15 +117,21 @@ def get_csv_header(self, file_name): self.valid_file_exist(file_name) - with open(file_name) as f: + with self.compatible_open(file_name, encoding=self.csv_charset) as f: sniffer = csv.Sniffer() try: - has_header = sniffer.has_header(f.read(40960)) + has_header = sniffer.has_header(f.read(40960)) # python3.x + except (UnicodeEncodeError, UnicodeDecodeError): + f.seek(0) + has_header = sniffer.has_header(f.read(40960).encode(self.csv_charset)) # python2.x except csv.Error: has_header = False + f.seek(0) - csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) - headers = csv_reader.fieldnames + csv_reader = self.compatible_dict_reader(f, encoding=self.csv_charset, delimiter=self.delimiter, + lineterminator=self.lineterminator) + for row in csv_reader: + headers = list(row.keys()) is_header = not any(field.isdigit() for field in headers) headers = headers if has_header or is_header else [] @@ -134,16 +229,16 @@ def get_csv_lines_count(self, file_name): """ has_header = self.get_csv_header(file_name) - - with open(file_name) as f: - csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) + with self.compatible_open(file_name, encoding=self.csv_charset) as f: count = 0 if has_header else 1 + csv_reader = self.compatible_dict_reader(f, encoding=self.csv_charset, delimiter=self.delimiter, + lineterminator=self.lineterminator) for row in csv_reader: count += 1 return count - def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None): + def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None, ignore_filed=None): """Function: convert_csv_data_to_int_float :param file_name: the file name (default None) @@ -157,6 +252,7 @@ def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None): {'csv_header_1': 'value', 'csv_header_2': 'value', 'csv_header_3': 'value', ...}, ... 
] + :param ignore_filed: ignore the certain column, case sensitive """ # init @@ -181,9 +277,10 @@ def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None): f = None if file_name: has_header = self.get_csv_header(file_name) - f = open(file_name) - csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) - csv_reader, csv_reader_bk = tee(csv_reader) + with self.compatible_open(file_name, encoding=self.csv_charset) as f: + csv_reader = self.compatible_dict_reader(f, encoding=self.csv_charset, delimiter=self.delimiter, + lineterminator=self.lineterminator) + csv_reader, csv_reader_bk = tee(csv_reader) # Process for row in csv_reader: @@ -196,6 +293,11 @@ def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None): int_type[key].append(False) float_type[key].append(False) continue + # Continue if ignore_filed is provided + if ignore_filed is not None and ignore_filed == key: + int_type[key].append(False) + float_type[key].append(False) + continue # Valid Int Type try: if float(value).is_integer(): @@ -314,11 +416,12 @@ def add_columns_to_csv(self, target_writer = None target_file = None if save_csv_file: - target_file = open(target, 'w+') + target_file = self.compatible_open(target, mode='w+', encoding=self.csv_charset) target_writer = csv.writer(target_file, delimiter=self.delimiter, lineterminator=self.lineterminator) - with open(file_name) as f: - source_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) + with self.compatible_open(file_name, encoding=self.csv_charset) as f: + source_reader = self.compatible_dict_reader(f, encoding=self.csv_charset, delimiter=self.delimiter, + lineterminator=self.lineterminator) new_headers = [list(x.keys())[0] for x in data] row_id = 0 for row in source_reader: @@ -342,7 +445,11 @@ def add_columns_to_csv(self, values += new_values row_id += 1 if save_csv_file: - target_writer.writerow(values) + try: + target_writer.writerow(values) + except (UnicodeEncodeError, UnicodeDecodeError): + values = [v.encode('utf-8') for v in values] + target_writer.writerow(values) yield dict(zip(headers, values)) diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index d494dae..5287453 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -1,15 +1,13 @@ from pytz.exceptions import UnknownTimeZoneError -from influxdb.exceptions import InfluxDBClientError +from .config_object import Configuration from .influx_object import InfluxObject +from decimal import InvalidOperation from collections import defaultdict -from .base_object import BaseObject -from .__version__ import __version__ from .csv_object import CSVObject +from decimal import Decimal from pytz import timezone -import collections -import argparse import datetime -import csv +import uuid import sys import os import re @@ -21,17 +19,19 @@ class ExporterObject(object): def __init__(self): self.match_count = defaultdict(int) self.filter_count = defaultdict(int) + self._write_response = None + + def __error_cb(self, details, data, exception): + """Private Function: error callback for write api""" + self._write_response = False @staticmethod def __process_tags_fields(columns, row, - limit_length, int_type, float_type, - limit_string_length_columns, - force_string_columns, - force_int_columns, - force_float_columns): + conf, + encoding): """Private function: __process_tags_fields""" results = dict() @@ -39,23 +39,27 @@ def 
__process_tags_fields(columns, v = 0 if column in row: v = row[column] - if limit_string_length_columns and column in limit_string_length_columns: - v = str(v)[:limit_length + 1] - if force_string_columns and column in force_string_columns: + if conf.limit_string_length_columns and column in conf.limit_string_length_columns: + v = str(v)[:conf.limit_length + 1] + if conf.force_string_columns and column in conf.force_string_columns: v = str(v) - if force_int_columns and column in force_int_columns: + if conf.force_int_columns and column in conf.force_int_columns: try: v = int(v) except ValueError: print('Warning: Failed to force "{0}" to int, skip...'.format(v)) - if force_float_columns and column in force_float_columns: + if conf.force_float_columns and column in conf.force_float_columns: try: v = float(v) except ValueError: print('Warning: Failed to force "{0}" to float, skip...'.format(v)) # If field is empty - if len(str(v)) == 0: + try: + len_v = len(str(v)) + except (UnicodeDecodeError, UnicodeEncodeError): + len_v = 1 # Python2.7: If error happens, that means it has at least one character + if len_v == 0: # Process the empty if int_type[column] is True: v = -999 @@ -65,14 +69,18 @@ def __process_tags_fields(columns, v = '-' # Process the force - if force_string_columns and column in force_string_columns: + if conf.force_string_columns and column in conf.force_string_columns: v = '-' - if force_int_columns and column in force_int_columns: + if conf.force_int_columns and column in conf.force_int_columns: v = -999 - if force_float_columns and column in force_float_columns: + if conf.force_float_columns and column in conf.force_float_columns: v = -999.0 results[column] = v + + if conf.unique: + results['uniq'] = 'uniq-{0}'.format(str(uuid.uuid4())[:8]) + return results def __check_match_and_filter(self, @@ -101,8 +109,12 @@ def __check_match_and_filter(self, self.filter_count[k] = csv_file_length # Check string, regex + try: + v = str(v) + except (UnicodeDecodeError, UnicodeEncodeError): + v = v.encode('utf-8') check_by_string_status = v in check_by_string - check_by_regex_status = any(re.search(the_match, str(v), re.IGNORECASE) for the_match in check_by_regex) + check_by_regex_status = any(re.search(the_match, v, re.IGNORECASE) for the_match in check_by_regex) if key_status and (check_by_string_status or check_by_regex_status): check_status[k] = True if check_type == 'match': @@ -147,215 +159,240 @@ def __validate_columns(csv_headers, check_columns): return check_columns @staticmethod - def __validate_bool_string(target, alias=''): - """Private Function: Validate bool string + def __unix_time_millis(dt): + """Private Function: unix_time_millis""" - :param target: the target - """ + epoch_naive = datetime.datetime.utcfromtimestamp(0) + epoch = timezone('UTC').localize(epoch_naive) + return int((dt - epoch).total_seconds() * 1000) + + def __influx2_write(self, client, data_points, conf): + """Private function: __influx2_write""" + + from influxdb_client import WriteOptions - target = str(target).lower() - expected = ['true', 'false'] - if target not in expected: - error_message = 'Error: The input {0} should be True or False, current is {1}'.format(alias, target) + with client.write_api(write_options=WriteOptions(batch_size=conf.batch_size), + error_callback=self.__error_cb) as write_client: + write_client.write(conf.bucket_name, conf.org_name, data_points) + + def __process_timestamp(self, row, conf): + """Private function: __process_timestamp""" + + try: + # raise if not posix-time-like 
+ timestamp_decimal = Decimal(row[conf.time_column]) + timestamp_str = str(timestamp_decimal) + timestamp_remove_decimal = int( + str(timestamp_str).replace('.', '') + ) + # add zeros to convert to nanoseconds: influxdb time is 19 digital length + timestamp_influx = '{:<019d}'.format(timestamp_remove_decimal) + timestamp_influx = timestamp_influx[:19] # deal with length > 19 timestamp + timestamp = int(timestamp_influx) + except (ValueError, InvalidOperation): + try: + datetime_naive = datetime.datetime.strptime(row[conf.time_column], conf.time_format) + datetime_local = timezone(conf.time_zone).localize(datetime_naive) + timestamp = self.__unix_time_millis(datetime_local) * 1000000 + except (TypeError, ValueError): + error_message = 'Error: Unexpected time with format: {0}, {1}'.format(row[conf.time_column], + conf.time_format) + sys.exit(error_message) + + return timestamp + + def __write_points(self, count, csv_file_item, data_points_len, influx_version, client, data_points, conf, + influx_object): + """Private function: __write_points""" + + print('Info: Read {0} lines from {1}'.format(count, csv_file_item)) + print('Info: Inserting {0} data_points...'.format(data_points_len)) + try: + if influx_version.startswith('0') or influx_version.startswith('1'): + self._write_response = client.write_points(data_points) + else: + self.__influx2_write(client, data_points, conf) + except influx_object.influxdb_client_error as e: + error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \ + ' Please double check the csv data. \n' \ + ' If would like to force data type to target data type, use: \n' \ + ' --force_string_columns \n' \ + ' --force_int_columns \n' \ + ' --force_float_columns \n' \ + ' Error Details: {0}'.format(e) + sys.exit(error_message) + + if self._write_response is False: + error_message = 'Info: Problem inserting points, exiting...' sys.exit(error_message) - return True if target == 'true' else False + print('Info: Wrote {0} points'.format(data_points_len)) + + def __write_count_measurement(self, conf, csv_file_length, influx_version, client, timestamp): + """Private function: __write_count_measurement""" + + if conf.enable_count_measurement: + fields = dict() + fields['total'] = csv_file_length + for k, v in self.match_count.items(): + k = 'match_{0}'.format(k) + fields[k] = v + for k, v in self.filter_count.items(): + k = 'filter_{0}'.format(k) + fields[k] = v + count_point = [{'measurement': conf.count_measurement, 'time': timestamp, 'fields': fields, 'tags': {}}] + if influx_version.startswith('0') or influx_version.startswith('1'): + self._write_response = client.write_points(count_point) + else: + self.__influx2_write(client, count_point, conf) + + if self._write_response is False: + error_message = 'Error: Problem inserting points, exiting...' 
+ sys.exit(error_message) + + self.match_count = defaultdict(int) + self.filter_count = defaultdict(int) + print('Info: Wrote count measurement {0} points'.format(count_point)) @staticmethod - def __unix_time_millis(dt): - """Private Function: unix_time_millis""" + def __no_new_data_check(csv_file_item, csv_object, conf, csv_file_md5): + """Private function: __no_new_data_check""" + + new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', '')) + new_csv_file_exists = os.path.exists(new_csv_file) + no_new_data_status = False + if new_csv_file_exists: + with csv_object.compatible_open(new_csv_file, encoding=csv_object.csv_charset) as f: + csv_reader = csv_object.compatible_dict_reader(f, + encoding=csv_object.csv_charset, + delimiter=conf.delimiter, + lineterminator=conf.lineterminator) + for row in csv_reader: + try: + new_csv_file_md5 = row['md5'] + except KeyError: + break + if new_csv_file_md5 == csv_file_md5 and conf.force_insert_even_csv_no_update is False: + warning_message = 'Warning: No new data found, ' \ + 'writer stop/jump for {0}...'.format(csv_file_item) + print(warning_message) + no_new_data_status = True + # sys.exit(warning_message) + break - epoch_naive = datetime.datetime.utcfromtimestamp(0) - epoch = timezone('UTC').localize(epoch_naive) - return int((dt - epoch).total_seconds() * 1000) + return no_new_data_status, new_csv_file - def export_csv_to_influx(self, - csv_file, - db_name, - db_measurement, - field_columns, - tag_columns=None, - db_server_name='localhost:8086', - db_user='admin', - db_password='admin', - time_column='timestamp', - time_format='%Y-%m-%d %H:%M:%S', - delimiter=',', - lineterminator='\n', - time_zone='UTC', - batch_size=500, - limit_string_length_columns=None, - limit_length=20, - drop_database=False, - drop_measurement=False, - match_columns=None, - match_by_string=None, - match_by_regex=None, - filter_columns=None, - filter_by_string=None, - filter_by_regex=None, - enable_count_measurement=False, - force_insert_even_csv_no_update=False, - force_string_columns=None, - force_int_columns=None, - force_float_columns=None): + def export_csv_to_influx(self, **kwargs): """Function: export_csv_to_influx - :param csv_file: the csv file path/folder - :param db_server_name: the influx server (default localhost:8086) - :param db_user: the influx db user (default admin) - :param db_password: the influx db password (default admin) - :param db_name: the influx db name - :param db_measurement: the measurement in a db - :param time_column: the time columns (default timestamp) - :param tag_columns: the tag columns (default None) - :param time_format: the time format (default %Y-%m-%d %H:%M:%S) - :param field_columns: the filed columns - :param delimiter: the csv delimiter (default comma) - :param lineterminator: the csv line terminator (default comma) - :param batch_size: how many rows insert every time (default 500) - :param time_zone: the data time zone (default UTC) - :param limit_string_length_columns: limit the string length columns (default None) - :param limit_length: limited length (default 20) - :param drop_database: drop database (default False) - :param drop_measurement: drop measurement (default False) - :param match_columns: the columns need to be matched (default None) - :param match_by_string: match columns by string (default None) - :param match_by_regex: match columns by regex (default None) - :param filter_columns: the columns need to be filter (default None) - :param filter_by_string: filter columns by string (default None) - 
:param filter_by_regex: filter columns by regex (default None) - :param enable_count_measurement: create the measurement with only count info (default False) - :param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False) - :param force_string_columns: force the columns as string (default None) - :param force_int_columns: force the columns as int (default None) - :param force_float_columns: force the columns as float (default None) + :key str csv_file: the csv file path/folder + :key str db_server_name: the influx server (default localhost:8086) + :key str db_user: for 0.x, 1.x only, the influx db user (default admin) + :key str db_password: for 0.x, 1.x only, the influx db password (default admin) + :key str db_name: for 0.x, 1.x only, the influx db name + :key str db_measurement: the measurement in a db + :key sry time_column: the time column (default timestamp) + :key str tag_columns: the tag columns, separated by comma (default None) + :key str time_format: the time format (default %Y-%m-%d %H:%M:%S) + :key str field_columns: the filed columns, separated by comma + :key str delimiter: the csv delimiter (default comma) + :key str lineterminator: the csv line terminator (default comma) + :key int batch_size: how many rows insert every time (default 500) + :key str time_zone: the data time zone (default UTC) + :key str limit_string_length_columns: limited the string length columns, separated by comma (default None) + :key int limit_length: limited length (default 20) + :key bool drop_database: drop database (default False) + :key bool drop_measurement: drop measurement (default False) + :key str match_columns: the columns need to be matched, separated by comma (default None) + :key str match_by_string: match columns by string, separated by comma (default None) + :key str match_by_regex: match columns by regex, separated by comma (default None) + :key str filter_columns: the columns need to be filter, separated by comma (default None) + :key str filter_by_string: filter columns by string, separated by comma (default None) + :key str filter_by_regex: filter columns by regex, separated by comma (default None) + :key bool enable_count_measurement: create the measurement with only count info (default False) + :key bool force_insert_even_csv_no_update: force insert data to influx even csv data no update (default True) + :key str force_string_columns: force the columns as string, separated by comma (default None) + :key str force_int_columns: force the columns as int, separated by comma (default None) + :key str force_float_columns: force the columns as float, separated by comma (default None) + :key str http_schema: for 2.x only, influxdb http schema, could be http or https (default http) + :key str org_name: for 2.x only, my org (default my-org) + :key str bucket_name: for 2.x only, my bucket (default my-bucket) + :key str token: for 2.x only, token (default None) + :key bool unique: insert the duplicated data (default False) + :key str csv_charset: the csv charset (default None, which will auto detect) """ + # Init the conf + conf = Configuration(**kwargs) + # Init: object - csv_object = CSVObject(delimiter=delimiter, lineterminator=lineterminator) - influx_object = InfluxObject(db_server_name=db_server_name, db_user=db_user, db_password=db_password) - base_object = BaseObject() - - # Init: Arguments - base_object.validate_str(csv_file) - base_object.validate_str(db_name) - base_object.validate_str(db_measurement) - 
-        base_object.validate_str(db_server_name)
-        base_object.validate_str(db_user)
-        base_object.validate_str(db_password)
-        base_object.validate_str(time_format)
-        base_object.validate_str(delimiter)
-        base_object.validate_str(lineterminator)
-        base_object.validate_str(time_zone)
-        tag_columns = base_object.str_to_list(tag_columns)
-        field_columns = base_object.str_to_list(field_columns)
-        limit_string_length_columns = [] if str(limit_string_length_columns).lower() == 'none' \
-            else limit_string_length_columns
-        limit_string_length_columns = base_object.str_to_list(limit_string_length_columns)
-        match_columns = [] if str(match_columns).lower() == 'none' else match_columns
-        match_columns = base_object.str_to_list(match_columns)
-        match_by_string = [] if str(match_by_string).lower() == 'none' else match_by_string
-        match_by_string = base_object.str_to_list(match_by_string)
-        match_by_regex = [] if str(match_by_regex).lower() == 'none' else match_by_regex
-        match_by_regex = base_object.str_to_list(match_by_regex, lower=False)
-        filter_columns = [] if str(filter_columns).lower() == 'none' else filter_columns
-        filter_columns = base_object.str_to_list(filter_columns)
-        filter_by_string = [] if str(filter_by_string).lower() == 'none' else filter_by_string
-        filter_by_string = base_object.str_to_list(filter_by_string)
-        filter_by_regex = [] if str(filter_by_regex).lower() == 'none' else filter_by_regex
-        filter_by_regex = base_object.str_to_list(filter_by_regex)
-        drop_database = self.__validate_bool_string(drop_database)
-        drop_measurement = self.__validate_bool_string(drop_measurement)
-        enable_count_measurement = self.__validate_bool_string(enable_count_measurement)
-        force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update)
-        force_string_columns = [] if str(force_string_columns).lower() == 'none' else force_string_columns
-        force_string_columns = base_object.str_to_list(force_string_columns)
-        force_int_columns = [] if str(force_int_columns).lower() == 'none' else force_int_columns
-        force_int_columns = base_object.str_to_list(force_int_columns)
-        force_float_columns = [] if str(force_float_columns).lower() == 'none' else force_float_columns
-        force_float_columns = base_object.str_to_list(force_float_columns)
-
-        # Fields should not duplicate in force_string_columns, force_int_columns, force_float_columns
-        all_force_columns = force_string_columns + force_int_columns + force_float_columns
-        duplicates = [item for item, count in collections.Counter(all_force_columns).items() if count > 1]
-        if duplicates:
-            error_message = 'Error: Find duplicate items: {0} in: \n' \
-                            '       force_string_columns: {1} \n' \
-                            '       force_int_columns: {2} \n' \
-                            '       force_float_columns: {3}'.format(duplicates,
-                                                                     force_string_columns,
-                                                                     force_int_columns,
-                                                                     force_float_columns)
-            sys.exit(error_message)
+        csv_object = CSVObject(delimiter=conf.delimiter,
+                               lineterminator=conf.lineterminator,
+                               csv_charset=conf.csv_charset)
+        influx_object = InfluxObject(db_server_name=conf.db_server_name,
+                                     db_user=conf.db_user,
+                                     db_password=conf.db_password,
+                                     http_schema=conf.http_schema,
+                                     token=conf.token)
+        client = influx_object.connect_influx_db(db_name=conf.db_name, org_name=conf.org_name)
+        influx_version = influx_object.influxdb_version
 
         # Init: database behavior
-        drop_database = base_object.convert_boole(drop_database)
-        drop_measurement = base_object.convert_boole(drop_measurement)
-        enable_count_measurement = base_object.convert_boole(enable_count_measurement)
-        force_insert_even_csv_no_update = base_object.convert_boole(force_insert_even_csv_no_update)
-        count_measurement = '{0}.count'.format(db_measurement)
-        if drop_measurement:
-            influx_object.drop_measurement(db_name, db_measurement)
-            influx_object.drop_measurement(db_name, count_measurement)
-        if drop_database:
-            influx_object.drop_database(db_name)
-        client = influx_object.create_influx_db_if_not_exists(db_name)
-        client.switch_user(db_user, db_password)
-
-        # Init: batch_size
-        try:
-            batch_size = int(batch_size)
-        except ValueError:
-            error_message = 'Error: The batch_size should be int, current is: {0}'.format(batch_size)
-            sys.exit(error_message)
-
-        # Init: limit_length
-        try:
-            limit_length = int(limit_length)
-        except ValueError:
-            error_message = 'Error: The limit_length should be int, current is: {0}'.format(limit_length)
-            sys.exit(error_message)
+        conf.count_measurement = '{0}.count'.format(conf.db_measurement)
+        if conf.drop_measurement:
+            influx_object.drop_measurement(conf.db_name, conf.db_measurement, conf.bucket_name, conf.org_name, client)
+            influx_object.drop_measurement(conf.db_name, conf.count_measurement, conf.bucket_name, conf.org_name,
+                                           client)
+        if conf.drop_database:
+            if influx_version.startswith('0') or influx_version.startswith('1'):
+                influx_object.drop_database(conf.db_name, client)
+                influx_object.create_influx_db_if_not_exists(conf.db_name, client)
+            else:
+                influx_object.drop_bucket(org_name=conf.org_name, bucket_name=conf.bucket_name)
+                influx_object.create_influx_bucket_if_not_exists(org_name=conf.org_name, bucket_name=conf.bucket_name)
+        if influx_version.startswith('0') or influx_version.startswith('1'):
+            client.switch_user(conf.db_user, conf.db_password)
 
         # Process csv_file
-        current_dir = os.path.curdir
-        csv_file = os.path.join(current_dir, csv_file)
-        csv_file_exists = os.path.exists(csv_file)
-        if csv_file_exists is False:
-            error_message = 'Error: CSV file not found, exiting...'
-            sys.exit(error_message)
-        csv_file_generator = csv_object.search_files_in_dir(csv_file)
+        csv_file_generator = csv_object.search_files_in_dir(conf.csv_file)
         for csv_file_item in csv_file_generator:
+            if conf.csv_charset is None:
+                csv_object = csv_object.detect_csv_charset(file_name=csv_file_item, **csv_object.__dict__)
             csv_file_length = csv_object.get_csv_lines_count(csv_file_item)
             csv_file_md5 = csv_object.get_file_md5(csv_file_item)
             csv_headers = csv_object.get_csv_header(csv_file_item)
 
             # Validate csv_headers
             if not csv_headers:
-                print('Error: The csv file {0} has no header detected. Exporter stopping...'.format(csv_file_item))
+                print('Error: The csv file has no header detected. Writer stopping for {0}...'.format(csv_file_item))
                 continue
 
             # Validate field_columns, tag_columns, match_columns, filter_columns
-            field_columns = self.__validate_columns(csv_headers, field_columns)
-            tag_columns = self.__validate_columns(csv_headers, tag_columns)
+            field_columns = self.__validate_columns(csv_headers, conf.field_columns)
+            tag_columns = self.__validate_columns(csv_headers, conf.tag_columns)
             if not field_columns:
                 print('Error: The input --field_columns does not expected. '
-                      'Please check the fields are in csv headers or not. Exporter stopping...')
+                      'Please check the fields are in csv headers or not. '
+                      'Writer stopping for {0}...'.format(csv_file_item))
                 continue
             if not tag_columns:
                 print('Warning: The input --tag_columns does not expected or leaves None. '
-                      'Please check the fields are in csv headers or not. Continue checking...')
+                      'Please check the fields are in csv headers or not. '
+                      'No tag will be added into influx for {0}...'.format(csv_file_item))
                 # continue
-            match_columns = self.__validate_columns(csv_headers, match_columns)
-            filter_columns = self.__validate_columns(csv_headers, filter_columns)
+            match_columns = self.__validate_columns(csv_headers, conf.match_columns)
+            filter_columns = self.__validate_columns(csv_headers, conf.filter_columns)
 
             # Validate time_column
-            with open(csv_file_item) as f:
-                csv_reader = csv.DictReader(f, delimiter=delimiter, lineterminator=lineterminator)
+            with csv_object.compatible_open(csv_file_item, encoding=csv_object.csv_charset) as f:
+                csv_reader = csv_object.compatible_dict_reader(f,
+                                                               encoding=csv_object.csv_charset,
+                                                               delimiter=conf.delimiter,
+                                                               lineterminator=conf.lineterminator)
                 time_column_exists = True
                 for row in csv_reader:
                     try:
-                        time_column_exists = row[time_column]
+                        time_column_exists = row[conf.time_column]
                    except KeyError:
                         print('Warning: The time column does not exists. '
                               'We will use the csv last modified time as time column')
@@ -363,24 +400,7 @@ def export_csv_to_influx(self,
                         break
 
             # Check the timestamp, and generate the csv with checksum
-            new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', ''))
-            new_csv_file_exists = os.path.exists(new_csv_file)
-            no_new_data_status = False
-            if new_csv_file_exists:
-                with open(new_csv_file) as f:
-                    csv_reader = csv.DictReader(f, delimiter=delimiter, lineterminator=lineterminator)
-                    for row in csv_reader:
-                        try:
-                            new_csv_file_md5 = row['md5']
-                        except KeyError:
-                            break
-                        if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False:
-                            warning_message = 'Warning: No new data found, ' \
-                                              'exporter stop/jump for {0}...'.format(csv_file_item)
-                            print(warning_message)
-                            no_new_data_status = True
-                            # sys.exit(warning_message)
-                            break
+            no_new_data_status, new_csv_file = self.__no_new_data_check(csv_file_item, csv_object, conf, csv_file_md5)
             if no_new_data_status:
                 continue
             data = [{'md5': [csv_file_md5] * csv_file_length}]
@@ -388,28 +408,29 @@
                 modified_time = csv_object.get_file_modify_time(csv_file_item)
                 field_columns.append('timestamp')
                 tag_columns.append('timestamp')
-                data.append({time_column: [modified_time] * csv_file_length})
+                data.append({conf.time_column: [modified_time] * csv_file_length})
             csv_reader_data = csv_object.add_columns_to_csv(file_name=csv_file_item,
                                                             target=new_csv_file,
                                                             data=data,
-                                                            save_csv_file=not force_insert_even_csv_no_update)
+                                                            save_csv_file=not conf.force_insert_even_csv_no_update)
 
             # Process influx csv
             data_points = list()
             count = 0
             timestamp = 0
-            convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(csv_reader=csv_reader_data)
+            convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(csv_reader=csv_reader_data,
+                                                                                     ignore_filed=conf.time_column)
            for row, int_type, float_type in convert_csv_data_to_int_float:
                 # Process Match & Filter: If match_columns exists and filter_columns not exists
                 match_status = self.__check_match_and_filter(row,
                                                              match_columns,
-                                                             match_by_string,
-                                                             match_by_regex,
+                                                             conf.match_by_string,
+                                                             conf.match_by_regex,
                                                              check_type='match')
                 filter_status = self.__check_match_and_filter(row,
                                                               filter_columns,
-                                                              filter_by_string,
-                                                              filter_by_regex,
+                                                              conf.filter_by_string,
+                                                              conf.filter_by_regex,
                                                               check_type='filter',
                                                               csv_file_length=csv_file_length)
                 if match_columns and not filter_columns:
@@ -427,217 +448,43 @@
                     continue
 
                 # Process Time
-                try:
-                    # raise if not posix-time-like
-                    timestamp_str = str(float(row[time_column]))
-                    timestamp_magnitude = len(timestamp_str.split('.')[0])
-                    timestamp_remove_decimal = int(
-                        str(timestamp_str).replace('.', '')
-                    )
-                    # add zeros to convert to nanoseconds
-                    timestamp_influx = (
-                        '{:<0'+str(9+timestamp_magnitude)+'d}'
-                    ).format(timestamp_remove_decimal)
-                    timestamp = int(timestamp_influx)
-                except ValueError:
-                    try:
-                        datetime_naive = datetime.datetime.strptime(row[time_column], time_format)
-                        datetime_local = timezone(time_zone).localize(datetime_naive)
-                        timestamp = self.__unix_time_millis(datetime_local) * 1000000
-                    except (TypeError, ValueError):
-                        error_message = 'Error: Unexpected time with format: {0}, {1}'.format(row[time_column],
-                                                                                              time_format)
-                        sys.exit(error_message)
+                timestamp = self.__process_timestamp(row, conf)
 
                 # Process tags
                 tags = self.__process_tags_fields(columns=tag_columns,
                                                   row=row,
-                                                  limit_length=limit_length,
                                                   int_type=int_type,
                                                   float_type=float_type,
-                                                  limit_string_length_columns=limit_string_length_columns,
-                                                  force_string_columns=force_string_columns,
-                                                  force_int_columns=force_int_columns,
-                                                  force_float_columns=force_float_columns)
+                                                  conf=conf,
+                                                  encoding=csv_object.csv_charset)
 
                 # Process fields
                 fields = self.__process_tags_fields(columns=field_columns,
                                                     row=row,
-                                                    limit_length=limit_length,
                                                     int_type=int_type,
                                                     float_type=float_type,
-                                                    limit_string_length_columns=limit_string_length_columns,
-                                                    force_string_columns=force_string_columns,
-                                                    force_int_columns=force_int_columns,
-                                                    force_float_columns=force_float_columns)
+                                                    conf=conf,
+                                                    encoding=csv_object.csv_charset)
 
-                point = {'measurement': db_measurement, 'time': timestamp, 'fields': fields, 'tags': tags}
+                point = {'measurement': conf.db_measurement, 'time': timestamp, 'fields': fields, 'tags': tags}
                 data_points.append(point)
-                count += 1
 
                 # Write points
                 data_points_len = len(data_points)
-                if data_points_len % batch_size == 0:
-                    print('Info: Read {0} lines from {1}'.format(count, csv_file_item))
-                    print('Info: Inserting {0} data_points...'.format(data_points_len))
-                    try:
-                        response = client.write_points(data_points)
-                    except InfluxDBClientError as e:
-                        error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
-                                        '       Please double check the csv data. \n' \
-                                        '       If would like to force data type to target data type, use: \n' \
-                                        '       --force_string_columns \n' \
-                                        '       --force_int_columns \n' \
-                                        '       --force_float_columns \n' \
-                                        '       Error Details: {0}'.format(e)
-                        sys.exit(error_message)
-                    if response is False:
-                        error_message = 'Info: Problem inserting points, exiting...'
-                        sys.exit(error_message)
-                    print('Info: Wrote {0} lines, response: {1}'.format(data_points_len, response))
-
+                if data_points_len % conf.batch_size == 0:
+                    self.__write_points(count, csv_file_item, data_points_len, influx_version, client,
+                                        data_points, conf, influx_object)
                     data_points = list()
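The write path above flushes whenever `len(data_points)` reaches a multiple of `batch_size`, then flushes whatever remains once the loop ends. The same accumulate-and-flush pattern in isolation, with a stubbed `write` callback standing in for the private `__write_points` helper:

```python
def batched_write(rows, batch_size=500, write=print):
    """Collect rows into batches; flush every full batch, then the remainder."""
    data_points = []
    for row in rows:
        data_points.append(row)
        if len(data_points) % batch_size == 0:
            write(data_points)   # full batch
            data_points = []
    if data_points:
        write(data_points)       # remaining partial batch


batched_write(range(1200), batch_size=500)  # flushes 500, 500, then 200 rows
```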
 
             # Write rest points
             data_points_len = len(data_points)
             if data_points_len > 0:
-                print('Info: Read {0} lines from {1}'.format(count, csv_file_item))
-                print('Info: Inserting {0} data_points...'.format(data_points_len))
-                try:
-                    response = client.write_points(data_points)
-                except InfluxDBClientError as e:
-                    error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
-                                    '       Please double check the csv data. \n' \
-                                    '       If would like to force data type to target data type, use: \n' \
-                                    '       --force_string_columns \n' \
-                                    '       --force_int_columns \n' \
-                                    '       --force_float_columns \n' \
-                                    '       Error Details: {0}'.format(e)
-                    sys.exit(error_message)
-                if response is False:
-                    error_message = 'Error: Problem inserting points, exiting...'
-                    sys.exit(error_message)
-                print('Info: Wrote {0}, response: {1}'.format(data_points_len, response))
+                self.__write_points(count, csv_file_item, data_points_len, influx_version, client,
+                                    data_points, conf, influx_object)
 
             # Write count measurement
-            if enable_count_measurement:
-                fields = dict()
-                fields['total'] = csv_file_length
-                for k, v in self.match_count.items():
-                    k = 'match_{0}'.format(k)
-                    fields[k] = v
-                for k, v in self.filter_count.items():
-                    k = 'filter_{0}'.format(k)
-                    fields[k] = v
-                count_point = [{'measurement': count_measurement, 'time': timestamp, 'fields': fields, 'tags': None}]
-                response = client.write_points(count_point)
-                if response is False:
-                    error_message = 'Error: Problem inserting points, exiting...'
-                    sys.exit(error_message)
-                print('Info: Wrote count measurement {0}, response: {1}'.format(count_point, response))
-
-                self.match_count = defaultdict(int)
-                self.filter_count = defaultdict(int)
+            self.__write_count_measurement(conf, csv_file_length, influx_version, client, timestamp)
 
             print('Info: Done')
             print('')
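With the signature collapsed to `**kwargs`, a call against InfluxDB 2.x might look like the sketch below (the server, org, bucket and token values are placeholders; for 0.x/1.x the `db_name`/`db_user`/`db_password` keys apply instead):

```python
from ExportCsvToInflux import ExporterObject

exporter = ExporterObject()
exporter.export_csv_to_influx(csv_file='demo.csv',
                              db_server_name='localhost:8086',
                              db_measurement='demo',
                              field_columns='response_time',
                              tag_columns='url',
                              org_name='my-org',        # 2.x only
                              bucket_name='my-bucket',  # 2.x only
                              token='my-token')         # 2.x only, placeholder
```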
 
-
-def export_csv_to_influx():
-    parser = argparse.ArgumentParser(description='CSV to InfluxDB.')
-    parser.add_argument('-c', '--csv', required=True,
-                        help='Input CSV file.')
-    parser.add_argument('-d', '--delimiter', nargs='?', default=',', const=',',
-                        help='CSV delimiter. Default: \',\'.')
-    parser.add_argument('-lt', '--lineterminator', nargs='?', default='\n', const='\n',
-                        help='CSV lineterminator. Default: \'\\n\'.')
-    parser.add_argument('-s', '--server', nargs='?', default='localhost:8086', const='localhost:8086',
-                        help='InfluxDB Server address. Default: localhost:8086')
-    parser.add_argument('-u', '--user', nargs='?', default='admin', const='admin',
-                        help='InfluxDB User name.')
-    parser.add_argument('-p', '--password', nargs='?', default='admin', const='admin',
-                        help='InfluxDB Password.')
-    parser.add_argument('-db', '--dbname', required=True,
-                        help='InfluxDB Database name.')
-    parser.add_argument('-m', '--measurement', required=True,
-                        help='Measurement name.')
-    parser.add_argument('-t', '--time_column', nargs='?', default='timestamp', const='timestamp',
-                        help='Timestamp column name. Default: timestamp. '
-                             'If no timestamp column, '
-                             'the timestamp is set to the last file modify time for whole csv rows')
-    parser.add_argument('-tf', '--time_format', nargs='?', default='%Y-%m-%d %H:%M:%S', const='%Y-%m-%d %H:%M:%S',
-                        help='Timestamp format. Default: \'%%Y-%%m-%%d %%H:%%M:%%S\' e.g.: 1970-01-01 00:00:00')
-    parser.add_argument('-tz', '--time_zone', nargs='?', default='UTC', const='UTC',
-                        help='Timezone of supplied data. Default: UTC')
-    parser.add_argument('-fc', '--field_columns', required=True,
-                        help='List of csv columns to use as fields, separated by comma')
-    parser.add_argument('-tc', '--tag_columns', nargs='?', default=None, const=None,
-                        help='List of csv columns to use as tags, separated by comma. Default: None')
-    parser.add_argument('-b', '--batch_size', nargs='?', default=500, const=500,
-                        help='Batch size when inserting data to influx. Default: 500.')
-    parser.add_argument('-lslc', '--limit_string_length_columns', nargs='?', default=None, const=None,
-                        help='Limit string length columns, separated by comma. Default: None.')
-    parser.add_argument('-ls', '--limit_length', nargs='?', default=20, const=20,
-                        help='Limit length. Default: 20.')
-    parser.add_argument('-dd', '--drop_database', nargs='?', default=False, const=False,
-                        help='Drop database before inserting data. Default: False')
-    parser.add_argument('-dm', '--drop_measurement', nargs='?', default=False, const=False,
-                        help='Drop measurement before inserting data. Default: False')
-    parser.add_argument('-mc', '--match_columns', nargs='?', default=None, const=None,
-                        help='Match the data you want to get for certain columns, separated by comma. '
-                             'Match Rule: All matches, then match. Default: None')
-    parser.add_argument('-mbs', '--match_by_string', nargs='?', default=None, const=None,
-                        help='Match by string, separated by comma. Default: None')
-    parser.add_argument('-mbr', '--match_by_regex', nargs='?', default=None, const=None,
-                        help='Match by regex, separated by comma. Default: None')
-    parser.add_argument('-fic', '--filter_columns', nargs='?', default=None, const=None,
-                        help='Filter the data you want to filter for certain columns, separated by comma. '
-                             'Filter Rule: Any one matches, then match. Default: None')
-    parser.add_argument('-fibs', '--filter_by_string', nargs='?', default=None, const=None,
-                        help='Filter by string, separated by comma. Default: None')
-    parser.add_argument('-fibr', '--filter_by_regex', nargs='?', default=None, const=None,
-                        help='Filter by regex, separated by comma. Default: None')
-    parser.add_argument('-ecm', '--enable_count_measurement', nargs='?', default=False, const=False,
-                        help='Enable count measurement. Default: False')
-    parser.add_argument('-fi', '--force_insert_even_csv_no_update', nargs='?', default=False, const=False,
-                        help='Force insert data to influx, even csv no update. Default: False')
-    parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None,
-                        help='Force columns as string type, separated by comma. Default: None.')
-    parser.add_argument('-fintc', '--force_int_columns', nargs='?', default=None, const=None,
-                        help='Force columns as int type, separated by comma. Default: None.')
-    parser.add_argument('-ffc', '--force_float_columns', nargs='?', default=None, const=None,
-                        help='Force columns as float type, separated by comma. Default: None.')
-    parser.add_argument('-v', '--version', action="version", version=__version__)
-
-    args = parser.parse_args()
-    exporter = ExporterObject()
-    exporter.export_csv_to_influx(csv_file=args.csv,
-                                  db_server_name=args.server,
-                                  db_user=args.user,
-                                  db_password=args.password,
-                                  db_name=args.dbname,
-                                  db_measurement=args.measurement,
-                                  time_column=args.time_column,
-                                  time_format=args.time_format,
-                                  time_zone=args.time_zone,
-                                  field_columns=args.field_columns,
-                                  tag_columns=args.tag_columns,
-                                  batch_size=args.batch_size,
-                                  delimiter=args.delimiter,
-                                  lineterminator=args.lineterminator,
-                                  limit_string_length_columns=args.limit_string_length_columns,
-                                  limit_length=args.limit_length,
-                                  drop_database=args.drop_database,
-                                  drop_measurement=args.drop_measurement,
-                                  match_columns=args.match_columns,
-                                  match_by_string=args.match_by_string,
-                                  match_by_regex=args.match_by_regex,
-                                  filter_columns=args.filter_columns,
-                                  filter_by_string=args.filter_by_string,
-                                  filter_by_regex=args.filter_by_regex,
-                                  enable_count_measurement=args.enable_count_measurement,
-                                  force_insert_even_csv_no_update=args.force_insert_even_csv_no_update,
-                                  force_string_columns=args.force_string_columns,
-                                  force_int_columns=args.force_int_columns,
-                                  force_float_columns=args.force_float_columns)
diff --git a/src/ExportCsvToInflux/influx_object.py b/src/ExportCsvToInflux/influx_object.py
index 677a139..8b1a948 100644
--- a/src/ExportCsvToInflux/influx_object.py
+++ b/src/ExportCsvToInflux/influx_object.py
@@ -1,85 +1,323 @@
-from influxdb import InfluxDBClient
+from requests import ConnectionError
+import requests
+import datetime
+import sys
 
 
 class InfluxObject(object):
     """InfluxObject"""
 
-    def __init__(self, db_server_name='127.0.0.1:8086', db_user='admin', db_password='admin'):
-        self.db_server_name = db_server_name
+    def __init__(self,
+                 db_server_name='127.0.0.1:8086',
+                 db_user='admin',
+                 db_password='admin',
+                 http_schema='http',
+                 token=None):
+        self.db_server_name = db_server_name.lower()
         self.db_user = db_user
         self.db_password = db_password
+        self.token = token
+        self.host = self.db_server_name[0:self.db_server_name.rfind(':')]
+        self.host = self.host.lower().replace('http://', '').replace('https://', '')
+        try:
+            self.port = int(self.db_server_name[self.db_server_name.rfind(':') + 1:])
+        except ValueError:
+            if 'https' in self.db_server_name.lower():
+                self.port = 443
+            else:
+                self.port = 80
+        self.http_schema = http_schema
+        if 'https' in self.db_server_name.lower():
+            self.http_schema = 'https'
+        self.influxdb_url = '{http_schema}://{host}:{port}/'.format(http_schema=self.http_schema,
+                                                                    host=self.host,
+                                                                    port=self.port)
+
+        self.influxdb_version = self.get_influxdb_version()
+
+    @property
+    def influxdb_client_type(self):
+        """Function: influxdb_client_type
+
+        :return influxdb client object
+        """
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            from influxdb.client import InfluxDBClient
+        else:
+            from influxdb_client.client.influxdb_client import InfluxDBClient
+        return InfluxDBClient
+
+    @property
+    def influxdb_client_error(self):
+        """Function: influxdb_client_error
+
+        :return influxdb client error
+        """
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            from influxdb.exceptions import InfluxDBClientError as InfluxDBError
+        else:
+            from influxdb_client.client.exceptions import InfluxDBError
+        return InfluxDBError
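Both 1.x and 2.x servers report their version in the `X-Influxdb-Version` response header, which is what `get_influxdb_version` below keys off. The same probe in isolation (the server URL is a placeholder):

```python
import requests

influxdb_url = 'http://localhost:8086/'  # placeholder server
response = requests.get(influxdb_url, verify=False)
version = response.headers.get('X-Influxdb-Version', 'unknown')
if version.startswith(('0', '1')):
    print('Detected influx 0.x/1.x: {0}'.format(version))
else:
    print('Detected influx 2.x: {0}'.format(version))
```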
+    def get_influxdb_version(self):
+        """Function: get_influxdb_version
+
+        :return influxdb version
+        """
+
+        try:
+            req_influxdb = requests.get(self.influxdb_url, verify=False)
+            response_headers = req_influxdb.headers
+            influxdb_version = response_headers['X-Influxdb-Version']
+            return influxdb_version
+        except ConnectionError:
+            sys.exit('Error: Failed to connect to influx {influxdb_url}'.format(influxdb_url=self.influxdb_url))
+        except KeyError:
+            sys.exit('Error: This is not a valid influx {influxdb_url}'.format(influxdb_url=self.influxdb_url))
+        except requests.exceptions.SSLError:
+            sys.exit('Error: SSL error when connecting to influx {influxdb_url}'.format(influxdb_url=self.influxdb_url))
 
-    def connect_influx_db(self, db_name):
+    def connect_influx_db(self, db_name='', org_name=''):
         """Function: connect_influx_db
 
-        :param db_name: the influx db name
+        :param db_name: for influx 0.x, 1.x, the influx db name
+        :param org_name: for influx 2.x, the influx org name
         :return client
         """
-        host = self.db_server_name[0:self.db_server_name.rfind(':')]
-        try:
-            port = int(self.db_server_name[self.db_server_name.rfind(':') + 1:])
-        except ValueError:
-            if 'https' in self.db_server_name.lower():
-                port = 433
+
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            # influxdb 0.x, 1.x
+            from influxdb import InfluxDBClient
+            try:
+                client = InfluxDBClient(self.host, self.port, self.db_user, self.db_password, db_name, timeout=120)
+                client.get_list_database()
+                return client
+            except ConnectionError:
+                sys.exit('Error: Failed to connect to influx {host}:{port}'.format(host=self.host, port=self.port))
+        else:
+            # influxdb 2.x
+            from influxdb_client import InfluxDBClient
+            try:
+                client = InfluxDBClient(url=self.influxdb_url, token=self.token, org=org_name)
+                client.buckets_api().find_buckets()
+                return client
+            except (ConnectionError, Exception):
+                sys.exit('Error: Failed to connect to influx, please check the provided server name - {0}, '
+                         'org name - {1} and token - {2}'.format(self.db_server_name, org_name, self.token))
+
+    def create_influx_org_if_not_exists(self, org_name, client=None):
+        """Function: create_influx_org_if_not_exists
+
+        :param org_name: for influx 2.x, the org name
+        :param client: the influxdb client
+        :return client
+        """
+
+        # Connect DB
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(org_name=org_name)
+
+        if self.influxdb_version.startswith('2'):
+            # influxdb 2.x
+            try:
+                orgs = client.organizations_api().find_organizations(org=org_name)
+            except self.influxdb_client_error:
+                orgs = None
+            if orgs:
+                print('Info: Org {0} already exists'.format(org_name))
             else:
-                port = 80
-        client = InfluxDBClient(host, port, self.db_user, self.db_password, db_name, timeout=120)
+                print('Info: Org {0} not found, trying to create it'.format(org_name))
+                try:
+                    client.organizations_api().create_organization(name=org_name)
+                    print('Info: Org {0} created'.format(org_name))
+                except self.influxdb_client_error as e:
+                    sys.exit('Error: Failed to create org with the error {0}'.format(e))
+        else:
+            # influxdb 0.x, 1.x
+            print('Warning: The influx {0} does not have org, skip creating'.format(self.influxdb_version))
 
         return client
 
-    def create_influx_db_if_not_exists(self, db_name):
+    def drop_org(self, org_name, client=None):
+        """Function: drop_org
+
+        :param org_name: for influx 2.x, the org name
+        :param client: the influxdb client
+        :return client
+        """
+
+        # Connect DB
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(org_name=org_name)
+
+        # Drop DB or Bucket
+        if self.influxdb_version.startswith('2'):
+            # influxdb 2.x
+            try:
+                orgs = client.organizations_api().find_organizations(org=org_name)
+            except self.influxdb_client_error:
+                orgs = None
+            if orgs:
+                org_id = orgs[0].id
+                client.organizations_api().delete_organization(org_id)
+                print('Info: Org {0} dropped successfully'.format(org_name))
+            else:
+                sys.exit("Error: You don't have access to drop the org, or the org does not exist")
+        else:
+            # influxdb 0.x, 1.x
+            print('Warning: The influx {0} does not have org, skip dropping'.format(self.influxdb_version))
+
+        return client
+
+    def create_influx_bucket_if_not_exists(self, org_name, bucket_name, client=None):
+        """Function: create_influx_bucket_if_not_exists
+
+        :param org_name: for influx 2.x, the org name
+        :param bucket_name: for influx 2.x, the bucket name
+        :param client: the influxdb client
+        :return client
+        """
+
+        # Connect DB
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(org_name=org_name)
+
+        if self.influxdb_version.startswith('2'):
+            # influxdb 2.x
+            try:
+                bucket = client.buckets_api().find_buckets(org=org_name, name=bucket_name)
+            except self.influxdb_client_error:
+                bucket = None
+            if bucket:
+                print('Info: Bucket {0} already exists'.format(bucket_name))
+            else:
+                print('Info: Bucket {0} not found, trying to create it'.format(bucket_name))
+                try:
+                    client.buckets_api().create_bucket(bucket_name=bucket_name, org=org_name)
+                    print('Info: Bucket {0} created'.format(bucket_name))
+                except self.influxdb_client_error as e:
+                    sys.exit('Error: Failed to create bucket with the error {0}'.format(e))
+        else:
+            # influxdb 0.x, 1.x
+            print('Warning: The influx {0} does not have bucket, skip creating'.format(self.influxdb_version))
+
+        return client
+
+    def drop_bucket(self, org_name, bucket_name, client=None):
+        """Function: drop_bucket
+
+        :param org_name: for influx 2.x, the org name
+        :param bucket_name: for influx 2.x, the bucket name
+        :param client: the influxdb client
+        :return client
+        """
+
+        # Connect DB
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(org_name=org_name)
+
+        # Drop DB or Bucket
+        if self.influxdb_version.startswith('2'):
+            # influxdb 2.x
+            try:
+                bucket = client.buckets_api().find_buckets(org=org_name, name=bucket_name)
+                bucket = bucket.buckets[0]
+            except self.influxdb_client_error:
+                bucket = None
+            if bucket:
+                client.buckets_api().delete_bucket(bucket)
+                print('Info: Bucket {0} dropped successfully'.format(bucket_name))
+            else:
+                sys.exit("Error: You don't have access to drop the bucket, or the bucket does not exist")
+        else:
+            # influxdb 0.x, 1.x
+            print('Warning: The influx {0} does not have bucket, skip dropping'.format(self.influxdb_version))
+
+        return client
 
+    def create_influx_db_if_not_exists(self, db_name, client=None):
         """Function: create_influx_db_if_not_exists
 
-        :param db_name: the influx db name
+        :param db_name: for influx 0.x, 1.x, the influx db name
+        :param client: the influxdb client
         :return client
         """
 
         # Connect DB
-        client = self.connect_influx_db(db_name)
-
-        # Check & Create DB
-        databases = client.get_list_database()
-        db_found = False
-        for db in databases:
-            if db['name'] == db_name:
-                db_found = True
-                print('Info: Database {0} already exists'.format(db_name))
-                break
-        if db_found is False:
-            print('Info: Database {0} not found, trying to create it'.format(db_name))
-            client.create_database(db_name)
-            print('Info: Database {0} created'.format(db_name))
-        client.switch_database(db_name)
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(db_name=db_name)
+
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            # influxdb 0.x, 1.x
+            # Check & Create DB
+            databases = client.get_list_database()
+            db_found = False
+            for db in databases:
+                if db['name'] == db_name:
+                    db_found = True
+                    print('Info: Database {0} already exists'.format(db_name))
+                    break
+            if db_found is False:
+                print('Info: Database {0} not found, trying to create it'.format(db_name))
+                client.create_database(db_name)
+                print('Info: Database {0} created'.format(db_name))
+            client.switch_database(db_name)
+        else:
+            # influxdb 2.x
+            print('Warning: The influx is {0}, skip database creating'.format(self.influxdb_version))
 
         return client
 
-    def drop_database(self, db_name):
-        """Function: drop_database
+    def drop_database(self, db_name, client=None):
+        """Function: drop_database
 
-        :param db_name: the influx db name
-        :return client
-        """
+        :param db_name: for influx 0.x, 1.x, the influx db name
+        :param client: the influxdb client
+        :return client
+        """
 
         # Connect DB
-        client = self.connect_influx_db(db_name)
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(db_name=db_name)
 
-        # Drop DB
-        client.drop_database(db_name)
+        # Drop DB or Bucket
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            # influxdb 0.x, 1.x
+            client.drop_database(db_name)
+            print('Info: Database {0} dropped successfully'.format(db_name))
+        else:
+            # influxdb 2.x
+            print('Warning: The influx is {0}, skip database dropping'.format(self.influxdb_version))
 
         return client
 
-    def drop_measurement(self, db_name, measurement):
-        """Function: drop_measurement
+    def drop_measurement(self, db_name, measurement, bucket='', org='', client=None):
+        """Function: drop_measurement
 
-        :param db_name: the influx db name
-        :param measurement: the measurement
+        :param db_name: for influx 0.x, 1.x, the influx db name
+        :param measurement: for influx 0.x, 1.x, the measurement
+        :param client: the influxdb client
+        :param bucket: for influx 2.x, the bucket name or id
+        :param org: for influx 2.x, the org name or id
         :return client
         """
 
         # Connect DB
-        client = self.connect_influx_db(db_name)
+        if type(client) is not self.influxdb_client_type:
+            client = self.connect_influx_db(db_name=db_name)
 
         # Drop Measurement
-        client.drop_measurement(measurement)
+        if self.influxdb_version.startswith('0') or self.influxdb_version.startswith('1'):
+            # influxdb 0.x, 1.x
+            client.drop_measurement(measurement)
+        else:
+            # influxdb 2.x
+            start = "1970-01-01T00:00:00Z"
+            stop = "{0}Z".format(datetime.datetime.now().replace(microsecond=0).isoformat())
+            client.delete_api().delete(start,
+                                       stop,
+                                       '_measurement="{0}"'.format(measurement),
+                                       bucket=bucket,
+                                       org=org)
+        print('Info: Measurement {0} dropped successfully'.format(measurement))
 
         return client
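InfluxDB 2.x has no `DROP MEASUREMENT`, so the 2.x branch of `drop_measurement` above clears a measurement through the delete API with a full time range plus a `_measurement` predicate. Roughly the same call issued directly, with placeholder url/token/org/bucket values:

```python
import datetime

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org')
start = '1970-01-01T00:00:00Z'
stop = '{0}Z'.format(datetime.datetime.now().replace(microsecond=0).isoformat())
client.delete_api().delete(start, stop,
                           '_measurement="demo"',
                           bucket='my-bucket', org='my-org')
```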
diff --git "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" "b/\344\270\255\346\226\207\350\257\264\346\230\216.md"
deleted file mode 100644
index c0f452b..0000000
--- "a/\344\270\255\346\226\207\350\257\264\346\230\216.md"
+++ /dev/null
@@ -1,230 +0,0 @@
-Export CSV To Influx
-====================
-
-**Export CSV To Influx**: 处理CSV中的数据,并且将数据导入到Influx中
-
-## 安装
-通过pip命令安装即可。
-
-安装完成后,便可以在命令行中使用**export_csv_to_influx**了。
-
-```
-pip install ExportCsvToInflux
-```
-
-## 功能
-
-1. **[亮点 :star2::tada::heart_eyes:]** 允许在命令行中使用 **export_csv_to_influx**
-2. **[亮点 :star2::tada::heart_eyes:]** 允许处理文件夹中所有的csv
-3. **[亮点 :star2::tada::heart_eyes::confetti_ball::four_leaf_clover::balloon:]** 自动将csv中的数据处理为int/float/string类型
-4. **[亮点 :star2::tada::heart_eyes:]** 允许通过字符串,正则表达式匹配/过滤csv中的数据
-5. **[亮点 :star2::tada::heart_eyes:]** 允许在influx中生成数据统计表
-6. 允许设置influx中数据的长度
-7. 允许判断csv是否存在更新
-8. 允许使用文件最新的修改日期作为influx中的timestamp
-9. 自动判断是否创建influx数据库
-10. 允许在influx中插入数据前,删除数据库
-11. 允许在influx中插入数据前,删除数据表
-
-## 命令细节
-
-通过使用 `export_csv_to_influx -h` 查看命令细节
-
-`-c, --csv`: 输入csv的文件名/文件夹地址. **强制**
-
-`-db, --dbname`: influx数据库名 **强制**
-
-`-m, --measurement`: influx数据表名 **强制**
-
-`-fc, --field_columns`: field列, 如果多列, 使用英文逗号 ',' 作为分隔符. **强制**
-
-`-d, --delimiter`: csv分隔符. 默认英文的逗号: ','
-
-`-lt, --lineterminator`: csv换行符. 默认: '\n'
-
-`-s, --server`: influx数据库地址. 默认: localhost:8086
-
-`-u, --user`: influx用户名, 默认: admin
-
-`-p, --password`: influx密码, 默认: admin
-
-`-t, --time_column`: 时间戳列. 默认列名: timestamp.
-
-如果不存在时间戳列, 自动使用文件最新的修改日期作为时间戳
-
-> 注意: 同时也支持纯的timestamp, 比如: 1517587275. 系统自动判断.
-
-`-tf, --time_format`: 时间戳格式. 默认: '%Y-%m-%d %H:%M:%S' 比如: 1970-01-01 00:00:00.
-
-`-tz, --time_zone`: 时区. 默认: UTC
-
-`-tc, --tag_columns`: tag列, 如果多列, 使用英文逗号 ',' 作为分隔符, 默认: None
-
-`-b, --batch_size`: 批量插入数据库大小. 默认: 500
-
-`-lslc, --limit_string_length_columns`: 设置某列或多列中数据的长度, 如果多列, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-ls, --limit_length`: 长度大小. 默认: 20
-
-`-dd, --drop_database`: 在influx中插入数据前, 删除数据库. 默认: False
-
-`-dm, --drop_measurement`: 在influx中插入数据前, 删除数据表. 默认: False
-
-`-mc, --match_columns`: 用于匹配的列, 使用英文逗号 ',' 作为分隔符. 匹配规则: 所有匹配成功, 才保留该行. 默认: None
-
-`-mbs, --match_by_string`: 用于匹配的字符串, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-mbr, --match_by_regex`: 用于匹配的正则表达式, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-fic, --filter_columns`: 用于过滤的列, 使用英文逗号 ',' 作为分隔符. 匹配规则: 任意一个过滤成功, 便过滤该行. 默认: None
-
-`-fibs, --filter_by_string`: 用于过滤的字符串, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-fibr, --filter_by_regex`: 用于匹配的正则表达式, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-ecm, --enable_count_measurement`: 生成统计表. 默认: False
-
-`-fi, --force_insert_even_csv_no_update`: 强制往influx中插入数据, 即使csv不存在更新. 默认: False
-
-`-fsc, --force_string_columns`: 强制将列的类型转换成string类型, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-fintc, --force_int_columns`: 强制将列的类型转换成int类型, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-`-ffc, --force_float_columns`: 强制将列的类型转换成float类型, 使用英文逗号 ',' 作为分隔符. 默认: None
-
-> **注意:**
-> 1. --field_columns 可以使用通配符 `*`,来匹配所有的列: `--field_columns=*`, `--field_columns '*'`
-> 2. 如果检查到csv文件没有更新, 数据不会重复插入到数据库. 可以使用强制插入: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`
-> 3. 如果csv中某些单元格的数据为空, 则自动根据列的数据类型,添加下面数据到Influx: `int: -999`, `float: -999.0`, `string: -`
-
-## 使用代码调用
-
-可以使用代码直接调用
-
-```
-from ExportCsvToInflux import ExporterObject
-
-exporter = ExporterObject()
-exporter.export_csv_to_influx(...)
-
-# 更多关于export_csv_to_influx的参数细节:
-print(exporter.export_csv_to_influx.__doc__)
-```
-
-## 示例
-
-下面是操作样本 **demo.csv**.
-
-```
-timestamp,url,response_time
-2019-07-11 02:04:05,https://jmeter.apache.org/,1.434
-2019-07-11 02:04:06,https://jmeter.apache.org/,2.434
-2019-07-11 02:04:07,https://jmeter.apache.org/,1.200
-2019-07-11 02:04:08,https://jmeter.apache.org/,1.675
-2019-07-11 02:04:09,https://jmeter.apache.org/,2.265
-2019-07-11 02:04:10,https://sample-demo.org/,1.430
-2019-07-12 08:54:13,https://sample-show.org/,1.300
-2019-07-12 14:06:00,https://sample-7.org/,1.289
-2019-07-12 18:45:34,https://sample-8.org/,2.876
-```
-
-1. 将样本中所有数据插入到influx
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --force_insert_even_csv_no_update True
-    ```
-
-2. 将样本中所有数据插入到influx, **但是: 清除之前的老数据**
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --force_insert_even_csv_no_update True \
-    --drop_database True
-    ```
-
-3. 匹配csv中部分数据,并将数据导入到influx: **timestamp 匹配 2019-07-12 and url 匹配 sample-\d+**
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --drop_database True \
-    --force_insert_even_csv_no_update True \
-    --match_columns timestamp,url \
-    --match_by_reg '2019-07-12,sample-\d+'
-    ```
-
-4. 过滤csv中部分数据,并将数据导入到influx: **url 过滤 sample**
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --drop_database True \
-    --force_insert_even_csv_no_update True \
-    --filter_columns timestamp,url \
-    --filter_by_reg 'sample'
-    ```
-
-5. 激活数据表: **demo.count**, 并且匹配: **timestamp 匹配 2019-07-12 and url 匹配 sample-\d+**
-
-    ```
-    export_csv_to_influx \
-    --csv demo.csv \
-    --dbname demo \
-    --measurement demo \
-    --tag_columns url \
-    --field_columns response_time \
-    --user admin \
-    --password admin \
-    --server 127.0.0.1:8086 \
-    --drop_database True \
-    --force_insert_even_csv_no_update True \
-    --match_columns timestamp,url \
-    --match_by_reg '2019-07-12,sample-\d+' \
-    --enable_count_measurement True
-    ```
-
-    生成的数据表为:
-
-    ```text
-    select * from "demo.count"
-
-    name: demo.count
-    time                match_timestamp match_url total
-    ----                --------------- --------- -----
-    1562957134000000000 3               2         9
-    ```
-
-## 特殊感谢
-
-灵感来自: [https://github.com/fabio-miranda/csv-to-influxdb](https://github.com/fabio-miranda/csv-to-influxdb)