From c1bb353ea66114c54724721baad7224291aeca88 Mon Sep 17 00:00:00 2001 From: Bugazelle Date: Wed, 18 Mar 2020 16:52:52 +0800 Subject: [PATCH] [Feat] Support --force_int_columns and --force_float_columns --- README.md | 4 + setup.py | 10 -- src/ExportCsvToInflux/__version__.py | 2 +- src/ExportCsvToInflux/exporter_object.py | 154 +++++++++++++----- ...55\346\226\207\350\257\264\346\230\216.md" | 4 + 5 files changed, 120 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index c44bd68..dcacdc2 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,10 @@ If no timestamp column, the timestamp is set to the last file modify time for wh `-fsc, --force_string_columns`: Force columns as string type, seperated as comma. Default: None +`-fintc, --force_int_columns`: Force columns as int type, separated by comma. Default: None + +`-ffc, --force_float_columns`: Force columns as float type, separated by comma. Default: None + > **Note:** > 1. You could pass `*` to --field_columns to match all the fields: `--field_columns=*`, `--field_columns '*'` diff --git a/setup.py b/setup.py index 7543662..bdca255 100644 --- a/setup.py +++ b/setup.py @@ -13,16 +13,6 @@ def readme(): with open('README.md') as f: long_description = f.read() - index = long_description.find('```\n\n> **Note:**') - long_description = long_description[:index] - long_description = long_description.replace('## Install', '**Install**') - long_description = long_description.replace('## Features', '**Features**') - long_description = long_description.replace('## Command Arguments', '**Command Arguments**') - long_description = long_description.replace('```bash', '') - long_description = long_description.replace('\n-', '\n\n-') - long_description = long_description.replace('\n-c', '-c') - long_description += '\n\nFor more info, please refer to the {0}'.format(url) - return long_description diff --git a/src/ExportCsvToInflux/__version__.py b/src/ExportCsvToInflux/__version__.py index b5fb6c4..cdf16bc 100644
--- a/src/ExportCsvToInflux/__version__.py +++ b/src/ExportCsvToInflux/__version__.py @@ -1 +1 @@ -__version__ = '0.1.23' +__version__ = '0.1.24' diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index 4551509..906a135 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -6,6 +6,7 @@ from .__version__ import __version__ from .csv_object import CSVObject from pytz import timezone +import collections import argparse import datetime import csv @@ -21,6 +22,59 @@ def __init__(self): self.match_count = defaultdict(int) self.filter_count = defaultdict(int) + @staticmethod + def __process_tags_fields(columns, + row, + limit_length, + int_type, + float_type, + limit_string_length_columns, + force_string_columns, + force_int_columns, + force_float_columns): + """Build the tag/field dict for one row: apply length limits, forced types and empty-value defaults""" + + results = dict() + for column in columns: + v = 0 + if column in row: + v = row[column] + if limit_string_length_columns and column in limit_string_length_columns: + v = str(v)[:limit_length + 1] + if force_string_columns and column in force_string_columns: + v = str(v) + if force_int_columns and column in force_int_columns: + try: + v = int(v) + except ValueError: + print('Warning: Failed to force "{0}" to int, skip...'.format(v)) + if force_float_columns and column in force_float_columns: + try: + v = float(v) + except ValueError: + print('Warning: Failed to force "{0}" to float, skip...'.format(v)) + + # If field is empty + if len(str(v)) == 0: + # Process the empty + if int_type[column] is True: + v = -999 + elif float_type[column] is True: + v = -999.0 + else: + v = '-' + + # Process the force + if force_string_columns and column in force_string_columns: + v = '-' + if force_int_columns and column in force_int_columns: + v = -999 + if force_float_columns and column in force_float_columns: + v = -999.0 + + results[column] = v + return results + def
__check_match_and_filter(self, row, check_columns, @@ -142,7 +196,9 @@ def export_csv_to_influx(self, filter_by_regex=None, enable_count_measurement=False, force_insert_even_csv_no_update=False, - force_string_columns=None): + force_string_columns=None, + force_int_columns=None, + force_float_columns=None): """Function: export_csv_to_influx :param csv_file: the csv file path/folder @@ -172,6 +228,8 @@ def export_csv_to_influx(self, :param enable_count_measurement: create the measurement with only count info (default False) :param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False) :param force_string_columns: force the columns as string (default None) + :param force_int_columns: force the columns as int (default None) + :param force_float_columns: force the columns as float (default None) """ # Init: object @@ -213,6 +271,23 @@ def export_csv_to_influx(self, force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update) force_string_columns = [] if str(force_string_columns).lower() == 'none' else force_string_columns force_string_columns = base_object.str_to_list(force_string_columns) + force_int_columns = [] if str(force_int_columns).lower() == 'none' else force_int_columns + force_int_columns = base_object.str_to_list(force_int_columns) + force_float_columns = [] if str(force_float_columns).lower() == 'none' else force_float_columns + force_float_columns = base_object.str_to_list(force_float_columns) + + # Fields should not duplicate in force_string_columns, force_int_columns, force_float_columns + all_force_columns = force_string_columns + force_int_columns + force_float_columns + duplicates = [item for item, count in collections.Counter(all_force_columns).items() if count > 1] + if duplicates: + error_message = 'Error: Find duplicate items: {0} in: \n' \ + ' force_string_columns: {1} \n' \ + ' force_int_columns: {2} \n' \ + ' force_float_columns: {3}'.format(duplicates, + 
force_string_columns, + force_int_columns, + force_float_columns) + sys.exit(error_message) # Init: database behavior drop_database = base_object.convert_boole(drop_database) @@ -368,45 +443,26 @@ def export_csv_to_influx(self, sys.exit(error_message) # Process tags - tags = dict() - for tag_column in tag_columns: - v = 0 - if tag_column in row: - v = row[tag_column] - if limit_string_length_columns and tag_column in limit_string_length_columns: - v = str(v)[:limit_length + 1] - if force_string_columns and tag_column in force_string_columns: - v = str(v) - # If field is empty - if len(str(v)) == 0: - if int_type[tag_column] is True: - v = -999 - elif float_type[tag_column] is True: - v = -999.0 - else: - v = '-' - tags[tag_column] = v + tags = self.__process_tags_fields(columns=tag_columns, + row=row, + limit_length=limit_length, + int_type=int_type, + float_type=float_type, + limit_string_length_columns=limit_string_length_columns, + force_string_columns=force_string_columns, + force_int_columns=force_int_columns, + force_float_columns=force_float_columns) # Process fields - fields = dict() - for field_column in field_columns: - v = 0 - if field_column in row: - v = row[field_column] - if limit_string_length_columns and field_column in limit_string_length_columns: - v = str(v)[:limit_length + 1] - if force_string_columns and field_column in force_string_columns: - v = str(v) - - # If field is empty - if len(str(v)) == 0: - if int_type[field_column] is True: - v = -999 - elif float_type[field_column] is True: - v = -999.9 - else: - v = '-' - fields[field_column] = v + fields = self.__process_tags_fields(columns=field_columns, + row=row, + limit_length=limit_length, + int_type=int_type, + float_type=float_type, + limit_string_length_columns=limit_string_length_columns, + force_string_columns=force_string_columns, + force_int_columns=force_int_columns, + force_float_columns=force_float_columns) point = {'measurement': db_measurement, 'time': timestamp, 'fields': 
fields, 'tags': tags} data_points.append(point) @@ -423,8 +479,11 @@ def export_csv_to_influx(self, except InfluxDBClientError as e: error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \ ' Please double check the csv data. \n' \ - ' If would like to force data to str, use: --force_string_columns \n' \ - ' {0}'.format(e) + ' If would like to force data type to target data type, use: \n' \ + ' --force_string_columns \n' \ + ' --force_int_columns \n' \ + ' --force_float_columns \n' \ + ' Error Details: {0}'.format(e) sys.exit(error_message) if response is False: error_message = 'Info: Problem inserting points, exiting...' @@ -443,8 +502,11 @@ def export_csv_to_influx(self, except InfluxDBClientError as e: error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \ ' Please double check the csv data. \n' \ - ' If would like to force data to str, use: --force_string_columns \n' \ - ' {0}'.format(e) + ' If would like to force data type to target data type, use: \n' \ + ' --force_string_columns \n' \ + ' --force_int_columns \n' \ + ' --force_float_columns \n' \ + ' Error Details: {0}'.format(e) sys.exit(error_message) if response is False: error_message = 'Error: Problem inserting points, exiting...' @@ -535,6 +597,10 @@ def export_csv_to_influx(): help='Force insert data to influx, even csv no update. Default: False') parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None, help='Force columns as string type, separated by comma. Default: None.') + parser.add_argument('-fintc', '--force_int_columns', nargs='?', default=None, const=None, + help='Force columns as int type, separated by comma. Default: None.') + parser.add_argument('-ffc', '--force_float_columns', nargs='?', default=None, const=None, + help='Force columns as float type, separated by comma. 
Default: None.') parser.add_argument('-v', '--version', action="version", version=__version__) args = parser.parse_args() @@ -565,4 +631,6 @@ def export_csv_to_influx(): filter_by_regex=args.filter_by_regex, enable_count_measurement=args.enable_count_measurement, force_insert_even_csv_no_update=args.force_insert_even_csv_no_update, - force_string_columns=args.force_string_columns) + force_string_columns=args.force_string_columns, + force_int_columns=args.force_int_columns, + force_float_columns=args.force_float_columns) diff --git "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" index 4c95a0a..c0f452b 100644 --- "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" +++ "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" @@ -88,6 +88,10 @@ pip install ExportCsvToInflux `-fsc, --force_string_columns`: 强制将列的类型转换成string类型, 使用英文逗号 ',' 作为分隔符. 默认: None +`-fintc, --force_int_columns`: 强制将列的类型转换成int类型, 使用英文逗号 ',' 作为分隔符. 默认: None + +`-ffc, --force_float_columns`: 强制将列的类型转换成float类型, 使用英文逗号 ',' 作为分隔符. 默认: None + > **注意:** > 1. --field_columns 可以使用通配符 `*`,来匹配所有的列: `--field_columns=*`, `--field_columns '*'` > 2. 如果检查到csv文件没有更新, 数据不会重复插入到数据库. 可以使用强制插入: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`