diff --git a/src/ExportCsvToInflux/__init__.py b/src/ExportCsvToInflux/__init__.py index a8816b0..29b13bc 100644 --- a/src/ExportCsvToInflux/__init__.py +++ b/src/ExportCsvToInflux/__init__.py @@ -3,15 +3,3 @@ from .exporter_object import ExporterObject from .base_object import BaseObject from .__version__ import __version__ - -_version_ = __version__ - - -class ExportCsvToInflux(InfluxObject, - CSVObject, - ExporterObject, - BaseObject,): - """ExportCsvToInflux is library to export csv data into influx db""" - - def __init__(self): - super(ExportCsvToInflux, self).__init__() diff --git a/src/ExportCsvToInflux/__version__.py b/src/ExportCsvToInflux/__version__.py index b04cffb..60eb1af 100644 --- a/src/ExportCsvToInflux/__version__.py +++ b/src/ExportCsvToInflux/__version__.py @@ -1 +1 @@ -__version__ = '0.1.17' +__version__ = '0.1.18' diff --git a/src/ExportCsvToInflux/csv_object.py b/src/ExportCsvToInflux/csv_object.py index 50bd180..a53dde9 100755 --- a/src/ExportCsvToInflux/csv_object.py +++ b/src/ExportCsvToInflux/csv_object.py @@ -27,10 +27,15 @@ def get_csv_header(self, file_name): with open(file_name) as f: sniffer = csv.Sniffer() - has_header = sniffer.has_header(f.read(40960)) + try: + has_header = sniffer.has_header(f.read(40960)) + except csv.Error: + has_header = False f.seek(0) csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) - headers = csv_reader.fieldnames if has_header else [] + headers = csv_reader.fieldnames + is_header = not any(field.isdigit() for field in headers) + headers = headers if has_header or is_header else [] return headers @@ -124,12 +129,9 @@ def get_csv_lines_count(self, file_name): """ - self.valid_file_exit(file_name) + has_header = self.get_csv_header(file_name) with open(file_name) as f: - sniffer = csv.Sniffer() - has_header = sniffer.has_header(f.read(40960)) - f.seek(0) csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) count = 0 if has_header is True else 1 for row in csv_reader: @@ -143,12 +145,9 @@ def convert_csv_data_to_int_float(self, file_name): :param file_name: the file name """ - self.valid_file_exit(file_name) + has_header = self.get_csv_header(file_name) with open(file_name) as f: - sniffer = csv.Sniffer() - has_header = sniffer.has_header(f.read(40960)) - f.seek(0) csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) int_type = defaultdict(list) float_type = defaultdict(list) @@ -157,12 +156,9 @@ def convert_csv_data_to_int_float(self, file_name): keys = row.keys() for key in keys: value = row[key] - # Continue if field no value - if len(str(value)) == 0: - continue # Valid Int Type try: - if float(row[key]).is_integer(): + if float(value).is_integer(): int_type[key].append(True) else: int_type[key].append(False) @@ -170,7 +166,7 @@ def convert_csv_data_to_int_float(self, file_name): int_type[key].append(False) # Valid Float Type try: - float(row[key]) + float(value) float_type[key].append(True) except ValueError: float_type[key].append(False) @@ -205,9 +201,6 @@ def convert_csv_data_to_int_float(self, file_name): keys = row.keys() for key in keys: value = row[key] - if len(str(value)) == 0: - row[key] = '' - continue int_status = int_type[key] if int_status is True: row[key] = int(float(value)) if int_type[key] is True else value @@ -238,7 +231,7 @@ def add_columns_to_csv(self, ] """ - self.valid_file_exit(file_name) + has_header = self.get_csv_header(file_name) # Process data data_type = type(data) @@ -264,9 +257,6 @@ def add_columns_to_csv(self, # Add columns with open(file_name) as f: - sniffer = csv.Sniffer() - has_header = sniffer.has_header(f.read(40960)) - f.seek(0) source_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator) new_headers = [list(x.keys())[0] for x in data] with open(target, 'w+') as target_file: diff --git a/src/ExportCsvToInflux/exporter_object.py b/src/ExportCsvToInflux/exporter_object.py index 49e1086..3fced52 100755 --- a/src/ExportCsvToInflux/exporter_object.py +++ b/src/ExportCsvToInflux/exporter_object.py @@ -1,3 +1,4 @@ +from pytz.exceptions import UnknownTimeZoneError from .influx_object import InfluxObject from collections import defaultdict from .base_object import BaseObject @@ -100,6 +101,20 @@ def __validate_columns(csv_headers, check_columns): return check_columns + @staticmethod + def __validate_bool_string(target, alias=''): + """Private Function: Validate bool string + + :param target: the target + """ + + target = str(target).lower() + expected = ['true', 'false'] + if target not in expected: + raise Exception('Error: The input {0} should be True or False, current is {1}'.format(alias, target)) + + return True if target == 'true' else False + @staticmethod def __unix_time_millis(dt): """Private Function: unix_time_millis""" @@ -139,20 +154,20 @@ def export_csv_to_influx(self, :param csv_file: the csv file path/folder :param db_server_name: the influx server (default localhost:8086) - :param db_user: the influx db user - :param db_password: the influx db password + :param db_user: the influx db user (default admin) + :param db_password: the influx db password (default admin) :param db_name: the influx db name :param db_measurement: the measurement in a db :param time_column: the time columns (default timestamp) - :param tag_columns: the tag columns () + :param tag_columns: the tag columns :param time_format: the time format (default %Y-%m-%d %H:%M:%S) :param field_columns: the filed columns :param delimiter: the csv delimiter (default comma) :param lineterminator: the csv line terminator (default comma) :param batch_size: how many rows insert every time (default 500) :param time_zone: the data time zone (default UTC) - :param limit_string_length_columns: limit the string length - :param limit_length: default 20 + :param limit_string_length_columns: limit the string length columns (default None) + :param limit_length: limited length (default 20) :param drop_database: drop database (default False) :param drop_measurement: drop measurement (default False) :param match_columns: the columns need to be matched (default None) @@ -161,7 +176,7 @@ def export_csv_to_influx(self, :param filter_columns: the columns need to be filter (default None) :param filter_by_string: filter columns by string (default None) :param filter_by_regex: filter columns by regex (default None) - :param enable_count_measurement: create the measurement with only count info + :param enable_count_measurement: create the measurement with only count info (default False) :param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False) """ @@ -188,6 +203,10 @@ def export_csv_to_influx(self, filter_by_string = base_object.str_to_list(filter_by_string) filter_by_regex = [] if str(filter_by_regex).lower() == 'none' else filter_by_regex filter_by_regex = base_object.str_to_list(filter_by_regex) + drop_database = self.__validate_bool_string(drop_database) + drop_measurement = self.__validate_bool_string(drop_measurement) + enable_count_measurement = self.__validate_bool_string(enable_count_measurement) + force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update) # Init: database behavior drop_database = self.convert_boole(drop_database) @@ -255,7 +274,6 @@ def export_csv_to_influx(self, break # Check the timestamp, and generate the csv with checksum - # csv_base_name = os.path.basename(csv_file_item) new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', '')) new_csv_file_exists = os.path.exists(new_csv_file) no_new_data_status = False @@ -263,7 +281,10 @@ def export_csv_to_influx(self, with open(new_csv_file) as f: csv_reader = csv.DictReader(f, delimiter=delimiter, lineterminator=lineterminator) for row in csv_reader: - new_csv_file_md5 = row['md5'] + try: + new_csv_file_md5 = row['md5'] + except KeyError: + break if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False: print('Info: No new data found, existing...') no_new_data_status = True