From fa3a13fee657bbe7b6d566a4f43a184ee9fcbddd Mon Sep 17 00:00:00 2001 From: Bugazelle Date: Wed, 16 Oct 2019 19:50:43 +0800 Subject: [PATCH 1/4] [Fix] Timestamp enhancements --- src/ExportCsvToInflux/base_object.py | 15 +++-- src/ExportCsvToInflux/csv_object.py | 26 ++++----- src/ExportCsvToInflux/exporter_object.py | 70 ++++++++++++++---------- 3 files changed, 59 insertions(+), 52 deletions(-) diff --git a/src/ExportCsvToInflux/base_object.py b/src/ExportCsvToInflux/base_object.py index 6f0006a..fa6dd2d 100644 --- a/src/ExportCsvToInflux/base_object.py +++ b/src/ExportCsvToInflux/base_object.py @@ -1,5 +1,6 @@ import sys + class BaseObject(object): """BaseObject""" @@ -10,8 +11,8 @@ def __init__(self): def convert_boole(target): target = str(target).lower() if target != 'true' and target != 'false': - print('Error: The expected input for {0} should be: True or False'.format(target)) - exit(1) + error_message = 'Error: The expected input for {0} should be: True or False'.format(target) + sys.exit(error_message) if target == 'true': target = True else: @@ -33,8 +34,8 @@ def validate_str(self, target, ignore_exception=False): except NameError: string_type = get_type is str if not string_type and ignore_exception is False: - print('Error: The {0} is not string type. Please check.'.format(target)) - sys.exit(1) + error_message = 'Error: The {0} is not string type. Please check.'.format(target) + sys.exit(error_message) return string_type @@ -55,8 +56,7 @@ def str_to_list(self, string, delimiter=',', lower=False): try: bool(string) except ValueError: - print(error_message) - sys.exit(1) + sys.exit(error_message) # Process the type list_tuple_type = get_type is list or get_type is tuple @@ -75,6 +75,5 @@ def str_to_list(self, string, delimiter=',', lower=False): elif not string: li = list() else: - print(error_message) - sys.exit(1) + sys.exit(error_message) return li diff --git a/src/ExportCsvToInflux/csv_object.py b/src/ExportCsvToInflux/csv_object.py index 878d636..152992b 100755 --- a/src/ExportCsvToInflux/csv_object.py +++ b/src/ExportCsvToInflux/csv_object.py @@ -24,7 +24,7 @@ def get_csv_header(self, file_name): """ - self.valid_file_exit(file_name) + self.valid_file_exist(file_name) with open(file_name) as f: sniffer = csv.Sniffer() @@ -77,16 +77,16 @@ def search_files_in_dir(directory, match_suffix='.csv', filter_pattern='_influx. yield y @staticmethod - def valid_file_exit(file_name): - """Function: valid_file_exit + def valid_file_exist(file_name): + """Function: valid_file_exist :param file_name: the file name """ file_exists = os.path.exists(file_name) if file_exists is False: - print('Error: The file does not exist: {0}'.format(file_name)) - sys.exit(1) + error_message = 'Error: The file does not exist: {0}'.format(file_name) + sys.exit(error_message) def get_file_md5(self, file_name): """Function: get_file_md5 @@ -95,7 +95,7 @@ def get_file_md5(self, file_name): :return return the file md5 """ - self.valid_file_exit(file_name) + self.valid_file_exist(file_name) hash_md5 = hashlib.md5() with open(file_name, "rb") as f: @@ -112,7 +112,7 @@ def get_file_modify_time(self, file_name, enable_ms=False): :return return the human readable time """ - self.valid_file_exit(file_name) + self.valid_file_exist(file_name) modified = os.path.getmtime(file_name) modified_s, modified_ms = divmod(modified * 1000, 1000) @@ -237,16 +237,15 @@ def add_columns_to_csv(self, # Process data data_type = type(data) - message = 'Error: The data should be list type, the item should be dict. 
Or the json type as following' \ - 'for example: [{"new_header_1": ["new_value_1", "new_value_2", "new_value_3"]}, ' \ - '{"new_header_2": ["new_value_1", "new_value_2", "new_value_3"]}]' + error_message = 'Error: The data should be list type, the item should be dict. Or the json type as following ' \ + 'for example: [{"new_header_1": ["new_value_1", "new_value_2", "new_value_3"]}, ' \ + '{"new_header_2": ["new_value_1", "new_value_2", "new_value_3"]}]' try: check_data_type = data_type is not list and data_type is not str and data_type is not unicode except NameError: check_data_type = data_type is not list and data_type is not str if check_data_type: - print(message) - sys.exit(1) + sys.exit(error_message) try: check_data_type = data_type is str or data_type is unicode @@ -256,8 +255,7 @@ def add_columns_to_csv(self, try: data = json.loads(data) except ValueError: - print(message) - sys.exit(1) + sys.exit(error_message) # Add columns with open(file_name) as f: diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index 608cac6..f8d5042 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -100,8 +100,8 @@ def __validate_bool_string(target, alias=''): target = str(target).lower() expected = ['true', 'false'] if target not in expected: - print('Error: The input {0} should be True or False, current is {1}'.format(alias, target)) - sys.exit(1) + error_message = 'Error: The input {0} should be True or False, current is {1}'.format(alias, target) + sys.exit(error_message) return True if target == 'true' else False @@ -226,23 +226,23 @@ def export_csv_to_influx(self, try: batch_size = int(batch_size) except ValueError: - print('Error: The batch_size should be int, current is: {0}'.format(batch_size)) - sys.exit(1) + error_message = 'Error: The batch_size should be int, current is: {0}'.format(batch_size) + sys.exit(error_message) # Init: limit_length try: limit_length = int(limit_length) except ValueError: - print('Error: The limit_length should be int, current is: {0}'.format(limit_length)) - sys.exit(1) + error_message = 'Error: The limit_length should be int, current is: {0}'.format(limit_length) + sys.exit(error_message) # Process csv_file current_dir = os.path.curdir csv_file = os.path.join(current_dir, csv_file) csv_file_exists = os.path.exists(csv_file) if csv_file_exists is False: - print('Error: CSV file not found, exiting...') - sys.exit(1) + error_message = 'Error: CSV file not found, exiting...' 
+ sys.exit(error_message) csv_file_generator = csv_object.search_files_in_dir(csv_file) for csv_file_item in csv_file_generator: csv_file_length = csv_object.get_csv_lines_count(csv_file_item) @@ -294,9 +294,11 @@ def export_csv_to_influx(self, except KeyError: break if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False: - print('Info: No new data found, existing...') + warning_message = 'Warning: No new data found, ' \ + 'exporter stop/jump for {0}...'.format(csv_file_item) + print(warning_message) no_new_data_status = True - # sys.exit(1) + # sys.exit(warning_message) break if no_new_data_status: continue @@ -341,12 +343,20 @@ def export_csv_to_influx(self, continue # Process Time - if re.match('^\\d+$', str(row[time_column])): - timestamp = int(row[time_column]) * 1000000 - else: - datetime_naive = datetime.datetime.strptime(row[time_column], time_format) - datetime_local = timezone(time_zone).localize(datetime_naive) - timestamp = self.__unix_time_millis(datetime_local) * 1000000 + try: + timestamp_float = float(row[time_column]) + timestamp_remove_decimal = int(str(timestamp_float).replace('.', '')) + timestamp_influx = '{:<019d}'.format(timestamp_remove_decimal) + timestamp = int(timestamp_influx) + except ValueError: + try: + datetime_naive = datetime.datetime.strptime(row[time_column], time_format) + datetime_local = timezone(time_zone).localize(datetime_naive) + timestamp = self.__unix_time_millis(datetime_local) * 1000000 + except (TypeError, ValueError): + error_message = 'Error: Unexpected time with format: {0}, {1}'.format(row[time_column], + time_format) + sys.exit(error_message) # Process tags tags = dict() @@ -399,13 +409,13 @@ def export_csv_to_influx(self, try: response = client.write_points(data_points) except InfluxDBClientError as e: - print('Error: System exited. Please double check the csv data. \n' - ' It is not the same type as the current date type in influx. \n' - ' {0}'.format(e)) - sys.exit(1) + error_message = 'Error: System exited. Please double check the csv data. \n' \ + ' It is not the same type as the current date type in influx. \n' \ + ' {0}'.format(e) + sys.exit(error_message) if response is False: - print('Info: Problem inserting points, exiting...') - exit(1) + error_message = 'Info: Problem inserting points, exiting...' + sys.exit(error_message) print('Info: Wrote {0} lines, response: {1}'.format(data_points_len, response)) data_points = list() @@ -418,13 +428,13 @@ def export_csv_to_influx(self, try: response = client.write_points(data_points) except InfluxDBClientError as e: - print('Error: System exited. Please double check the csv data. \n' - ' It is not the same type as the current date type in influx. \n' - ' {0}'.format(e)) - sys.exit(1) + error_message = 'Error: System exited. Please double check the csv data. \n' \ + ' It is not the same type as the current date type in influx. \n' \ + ' {0}'.format(e) + sys.exit(error_message) if response is False: - print('Error: Problem inserting points, exiting...') - exit(1) + error_message = 'Error: Problem inserting points, exiting...' 
+ sys.exit(error_message) print('Info: Wrote {0}, response: {1}'.format(data_points_len, response)) # Write count measurement @@ -440,8 +450,8 @@ def export_csv_to_influx(self, count_point = [{'measurement': count_measurement, 'time': timestamp, 'fields': fields, 'tags': None}] response = client.write_points(count_point) if response is False: - print('Error: Problem inserting points, exiting...') - exit(1) + error_message = 'Error: Problem inserting points, exiting...' + sys.exit(error_message) print('Info: Wrote count measurement {0}, response: {1}'.format(count_point, response)) self.match_count = defaultdict(int) From d1e4dca523b9df21e074ea68c6c854c9f82d4db8 Mon Sep 17 00:00:00 2001 From: Bugazelle Date: Sun, 24 Nov 2019 00:30:59 +0800 Subject: [PATCH 2/4] [Feat] Support to force columns as string type --- README.md | 1 + src/ExportCsvToInflux/__version__.py | 2 +- src/ExportCsvToInflux/exporter_object.py | 26 +++++++++++++------ ...55\346\226\207\350\257\264\346\230\216.md" | 1 + 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index fd39bde..9162e68 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ You could use `export_csv_to_influx -h` to see the help guide. -fibr, --filter_by_regex, Filter by regex, separated by comma. Default: None. -ecm, --enable_count_measurement, Enable count measurement. Default: False. -fi, --force_insert_even_csv_no_update, Force insert data to influx, even csv no update. Default: False. +-fsc, --force_string_columns, Force columns as string type, seperated as comma. Default: None ``` > **Note 1:** You could use the library programmablly. diff --git a/src/ExportCsvToInflux/__version__.py b/src/ExportCsvToInflux/__version__.py index d83c2a9..e160d93 100644 --- a/src/ExportCsvToInflux/__version__.py +++ b/src/ExportCsvToInflux/__version__.py @@ -1 +1 @@ -__version__ = '0.1.19' +__version__ = '0.1.20' diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index f8d5042..cd776f5 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -139,7 +139,8 @@ def export_csv_to_influx(self, filter_by_string=None, filter_by_regex=None, enable_count_measurement=False, - force_insert_even_csv_no_update=False): + force_insert_even_csv_no_update=False, + force_string_columns=None): """Function: export_csv_to_influx :param csv_file: the csv file path/folder @@ -168,6 +169,7 @@ def export_csv_to_influx(self, :param filter_by_regex: filter columns by regex (default None) :param enable_count_measurement: create the measurement with only count info (default False) :param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False) + :param force_string_columns: force the columns as string (default None) """ # Init: object @@ -207,6 +209,8 @@ def export_csv_to_influx(self, drop_measurement = self.__validate_bool_string(drop_measurement) enable_count_measurement = self.__validate_bool_string(enable_count_measurement) force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update) + force_string_columns = [] if str(force_string_columns).lower() == 'none' else force_string_columns + force_string_columns = base_object.str_to_list(force_string_columns) # Init: database behavior drop_database = base_object.convert_boole(drop_database) @@ -364,9 +368,10 @@ def export_csv_to_influx(self, v = 0 if tag_column in row: v = row[tag_column] - if limit_string_length_columns: - if tag_column in 
limit_string_length_columns: - v = str(v)[:limit_length + 1] + if limit_string_length_columns and tag_column in limit_string_length_columns: + v = str(v)[:limit_length + 1] + if force_string_columns and tag_column in force_string_columns: + v = str(v) # If field is empty if len(str(v)) == 0: if int_type[tag_column] is True: @@ -383,9 +388,11 @@ def export_csv_to_influx(self, v = 0 if field_column in row: v = row[field_column] - if limit_string_length_columns: - if field_column in limit_string_length_columns: - v = str(v)[:limit_length + 1] + if limit_string_length_columns and field_column in limit_string_length_columns: + v = str(v)[:limit_length + 1] + if force_string_columns and field_column in force_string_columns: + v = str(v) + # If field is empty if len(str(v)) == 0: if int_type[field_column] is True: @@ -519,6 +526,8 @@ def export_csv_to_influx(): help='Enable count measurement. Default: False') parser.add_argument('-fi', '--force_insert_even_csv_no_update', nargs='?', default=False, const=False, help='Force insert data to influx, even csv no update. Default: False') + parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None, + help='Force columns as string type, separated by comma. Default: None.') parser.add_argument('-v', '--version', action="version", version=__version__) args = parser.parse_args() @@ -548,4 +557,5 @@ def export_csv_to_influx(): filter_by_string=args.filter_by_string, filter_by_regex=args.filter_by_regex, enable_count_measurement=args.enable_count_measurement, - force_insert_even_csv_no_update=args.force_insert_even_csv_no_update) + force_insert_even_csv_no_update=args.force_insert_even_csv_no_update, + force_string_columns=args.force_string_columns) diff --git "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" index ed87385..47400fd 100644 --- "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" +++ "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" @@ -57,6 +57,7 @@ pip install ExportCsvToInflux -fibr, --filter_by_regex, 用于匹配的正则表达式, 使用英文逗号 ',' 作为分隔符. 默认: None -ecm, --enable_count_measurement, 生成统计表. 默认: False -fi, --force_insert_even_csv_no_update, 强制往influx中插入数据, 即使csv不存在更新. 默认: False +-fsc, --force_string_columns, 强制将列的类型转换成string类型, 使用英文逗号 ',' 作为分隔符. 默认: None ``` > **注意点 1:** 可以使用代码调用 From 8f7307003e8f73c24a35cb3d8b4680850c34f32e Mon Sep 17 00:00:00 2001 From: Bugazelle Date: Sun, 24 Nov 2019 00:36:36 +0800 Subject: [PATCH 3/4] [Fix] Code style --- src/ExportCsvToInflux/exporter_object.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index cd776f5..179795a 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -500,7 +500,7 @@ def export_csv_to_influx(): help='List of csv columns to use as tags, separated by comma') parser.add_argument('-b', '--batch_size', nargs='?', default=500, const=500, help='Batch size when inserting data to influx. Default: 500.') - parser.add_argument('-lslc', '--limit_string_length_columns',nargs='?', default=None, const=None, + parser.add_argument('-lslc', '--limit_string_length_columns', nargs='?', default=None, const=None, help='Limit string length columns, separated by comma. Default: None.') parser.add_argument('-ls', '--limit_length', nargs='?', default=20, const=20, help='Limit length. 
Default: 20.') From 894080c110ee232d397f113c6bcaf770967726c8 Mon Sep 17 00:00:00 2001 From: Bugazelle Date: Sun, 24 Nov 2019 00:49:04 +0800 Subject: [PATCH 4/4] [Doc] Update README.md --- README.md | 3 ++- "\344\270\255\346\226\207\350\257\264\346\230\216.md" | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9162e68..696cde9 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,8 @@ You could use `export_csv_to_influx -h` to see the help guide. -p, --password, InfluxDB Password. Default: admin -db, --dbname, InfluxDB Database name. **Mandatory** -m, --measurement, Measurement name. **Mandatory** --t, --time_column, Timestamp column name. Default: timestamp. If no timestamp column, the timestamp is set to the last file modify time for whole csv rows. +-t, --time_column, Timestamp column name. Default column name: timestamp. If no timestamp column, the timestamp is set to the last file modify time for whole csv rows. + > Note: Also support the pure timestamp, like: 1517587275. Auto detected. -tf, --time_format, Timestamp format. Default: '%Y-%m-%d %H:%M:%S' e.g.: 1970-01-01 00:00:00. -tz, --time_zone, Timezone of supplied data. Default: UTC. -fc, --field_columns, List of csv columns to use as fields, separated by comma. **Mandatory** diff --git "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" index 47400fd..e7ebb13 100644 --- "a/\344\270\255\346\226\207\350\257\264\346\230\216.md" +++ "b/\344\270\255\346\226\207\350\257\264\346\230\216.md" @@ -39,7 +39,8 @@ pip install ExportCsvToInflux -p, --password, influx密码, 默认: admin -db, --dbname, influx数据库名 **强制** -m, --measurement, influx数据表名 **强制** --t, --time_column, 时间戳列. 默认: timestamp. 如果不存在时间戳列, 自动使用文件最新的修改日期作为时间戳 +-t, --time_column, 时间戳列. 默认列名: timestamp. 如果不存在时间戳列, 自动使用文件最新的修改日期作为时间戳 + > 注意: 同时也支持纯的timestamp, 比如: 1517587275. 系统自动判断. -tf, --time_format, 时间戳格式. 默认: '%Y-%m-%d %H:%M:%S' 比如: 1970-01-01 00:00:00. -tz, --time_zone, 时区. 默认: UTC -fc, --field_columns, field列, 如果多列, 使用英文逗号 ',' 作为分隔符. **强制**
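
For reference, the timestamp auto-detection introduced in [PATCH 1/4] behaves like the standalone sketch below. This is not part of the patch: `normalize_timestamp` is a hypothetical helper, `pytz` is assumed for the `timezone(...).localize(...)` call, and the epoch arithmetic stands in for the module's private `__unix_time_millis`, which the diff does not show. Numeric values are zero-padded on the right to the 19-digit nanosecond precision InfluxDB expects; anything else falls back to `strptime` parsing with `time_format` and `time_zone`.

```python
import datetime
from pytz import timezone  # assumed dependency, matching timezone(...).localize(...) in the exporter


def normalize_timestamp(value, time_format='%Y-%m-%d %H:%M:%S', time_zone='UTC'):
    """Hypothetical helper mirroring the time handling added in PATCH 1/4."""
    try:
        # Pure timestamp (e.g. 1517587275): drop the decimal point and
        # left-align into a zero-filled 19-digit nanosecond value.
        as_float = float(value)
        digits = int(str(as_float).replace('.', ''))
        return int('{:<019d}'.format(digits))
    except ValueError:
        # Formatted datetime string: parse, localize, convert to nanoseconds.
        # The epoch math below stands in for the exporter's __unix_time_millis helper.
        naive = datetime.datetime.strptime(value, time_format)
        localized = timezone(time_zone).localize(naive)
        epoch = timezone('UTC').localize(datetime.datetime(1970, 1, 1))
        return int((localized - epoch).total_seconds() * 1000) * 1000000


print(normalize_timestamp('1517587275'))           # 1517587275000000000
print(normalize_timestamp('2018-02-02 15:21:15'))  # 1517584875000000000
```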
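
The error-handling change that runs through [PATCH 1/4] (replacing `print(message)` followed by `exit(1)` or `sys.exit(1)` with a single `sys.exit(error_message)`) relies on documented Python behavior: passing a non-integer argument to `sys.exit()` prints it to stderr and terminates with exit status 1. A minimal check, run in a child interpreter so it does not kill the current one:

```python
import subprocess
import sys

# Run sys.exit('...') in a child interpreter and inspect what it produces.
snippet = "import sys; sys.exit('Error: The file does not exist: demo.csv')"
result = subprocess.run([sys.executable, '-c', snippet], capture_output=True, text=True)

print(result.returncode)        # 1
print(result.stderr.strip())    # Error: The file does not exist: demo.csv
```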
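
The `--force_string_columns` option added in [PATCH 2/4] marks columns whose values should always be written to InfluxDB as strings, even when they look numeric, which keeps leading zeros and avoids field-type conflicts with measurements that already hold string data. The sketch below only illustrates that branch and is not the exporter's code: the column names are invented, and the `int`/`float` fallback is a simplified stand-in for the exporter's own type detection.

```python
# Illustration only: what listing a column under force_string_columns changes.
force_string_columns = ['ticket_id']  # hypothetical column name


def to_field_value(column, raw):
    if force_string_columns and column in force_string_columns:
        return str(raw)      # forced to string, as in the patched tag/field loops
    try:                     # simplified stand-in for the exporter's numeric detection
        return int(raw)
    except ValueError:
        try:
            return float(raw)
        except ValueError:
            return raw


print(to_field_value('ticket_id', '00123'))  # '00123' -> leading zeros kept, stored as a string
print(to_field_value('count', '00123'))      # 123     -> coerced to a number
```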