
Commit

Merge pull request #15 from Bugazelle/dev
[Fix] Logic enhancements & Bug fix
Bugazelle authored Aug 29, 2019
2 parents 7fb26a7 + 37c6a50 commit c825085
Showing 4 changed files with 42 additions and 43 deletions.
12 changes: 0 additions & 12 deletions src/ExportCsvToInflux/__init__.py
@@ -3,15 +3,3 @@
 from .exporter_object import ExporterObject
 from .base_object import BaseObject
 from .__version__ import __version__
-
-_version_ = __version__
-
-
-class ExportCsvToInflux(InfluxObject,
-                        CSVObject,
-                        ExporterObject,
-                        BaseObject,):
-    """ExportCsvToInflux is library to export csv data into influx db"""
-
-    def __init__(self):
-        super(ExportCsvToInflux, self).__init__()
2 changes: 1 addition & 1 deletion src/ExportCsvToInflux/__version__.py
@@ -1 +1 @@
-__version__ = '0.1.17'
+__version__ = '0.1.18'
34 changes: 12 additions & 22 deletions src/ExportCsvToInflux/csv_object.py
@@ -27,10 +27,15 @@ def get_csv_header(self, file_name):
 
         with open(file_name) as f:
             sniffer = csv.Sniffer()
-            has_header = sniffer.has_header(f.read(40960))
+            try:
+                has_header = sniffer.has_header(f.read(40960))
+            except csv.Error:
+                has_header = False
             f.seek(0)
             csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
-            headers = csv_reader.fieldnames if has_header else []
+            headers = csv_reader.fieldnames
+            is_header = not any(field.isdigit() for field in headers)
+            headers = headers if has_header or is_header else []
 
         return headers

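In effect, header detection now survives files the sniffer chokes on (csv.Sniffer.has_header raises csv.Error on some inputs, e.g. single-column files) and additionally rejects a first row made of bare digits. A minimal standalone sketch of the same pattern, outside the library's CSVObject (detect_headers is an illustrative name, not the library's):

    import csv

    def detect_headers(path, delimiter=','):
        with open(path) as f:
            sniffer = csv.Sniffer()
            try:
                has_header = sniffer.has_header(f.read(40960))
            except csv.Error:
                # Sniffing fails on some inputs; assume no header.
                has_header = False
            f.seek(0)
            reader = csv.DictReader(f, delimiter=delimiter)
            headers = reader.fieldnames
            # A first row of pure digits is data, not a header.
            is_header = not any(field.isdigit() for field in headers)
            return headers if has_header or is_header else []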
@@ -124,12 +129,9 @@ def get_csv_lines_count(self, file_name):
         """
 
-        self.valid_file_exit(file_name)
+        has_header = self.get_csv_header(file_name)
 
         with open(file_name) as f:
-            sniffer = csv.Sniffer()
-            has_header = sniffer.has_header(f.read(40960))
-            f.seek(0)
             csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
             count = 0 if has_header is True else 1
             for row in csv_reader:
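The callers now reuse one detection path (get_csv_header returns the header list, empty when none is found) instead of re-sniffing the file in each method. Because DictReader always consumes the first physical line as fieldnames, the starting count compensates when that line was really data. A standalone sketch of the counting pattern, taking the detected header list as input (names are illustrative):

    import csv

    def count_csv_lines(path, headers, delimiter=','):
        # DictReader consumes the first physical line as fieldnames, so
        # when the file has no real header, count that line back in.
        with open(path) as f:
            reader = csv.DictReader(f, delimiter=delimiter)
            count = 0 if headers else 1
            for _ in reader:
                count += 1
        return count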
@@ -143,12 +145,9 @@ def convert_csv_data_to_int_float(self, file_name):
         :param file_name: the file name
         """
 
-        self.valid_file_exit(file_name)
+        has_header = self.get_csv_header(file_name)
 
         with open(file_name) as f:
-            sniffer = csv.Sniffer()
-            has_header = sniffer.has_header(f.read(40960))
-            f.seek(0)
             csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
             int_type = defaultdict(list)
             float_type = defaultdict(list)
@@ -157,20 +156,17 @@
             keys = row.keys()
             for key in keys:
                 value = row[key]
-                # Continue if field no value
-                if len(str(value)) == 0:
-                    continue
                 # Valid Int Type
                 try:
-                    if float(row[key]).is_integer():
+                    if float(value).is_integer():
                         int_type[key].append(True)
                     else:
                         int_type[key].append(False)
                 except ValueError:
                     int_type[key].append(False)
                 # Valid Float Type
                 try:
-                    float(row[key])
+                    float(value)
                     float_type[key].append(True)
                 except ValueError:
                     float_type[key].append(False)
@@ -205,9 +201,6 @@ def convert_csv_data_to_int_float(self, file_name):
             keys = row.keys()
             for key in keys:
                 value = row[key]
-                if len(str(value)) == 0:
-                    row[key] = ''
-                    continue
                 int_status = int_type[key]
                 if int_status is True:
                     row[key] = int(float(value)) if int_type[key] is True else value
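With the empty-value short-circuits gone, an empty cell now fails float('') and votes against its column, so columns with missing values stay strings rather than being partially converted. The two passes amount to a per-column vote, roughly as in this standalone sketch (infer_numeric_columns and the sample rows are illustrative, not the library's API):

    from collections import defaultdict

    def infer_numeric_columns(rows):
        # First pass: every value in a column must parse for the column
        # to qualify; a single failure (including '') disqualifies it.
        int_votes, float_votes = defaultdict(list), defaultdict(list)
        for row in rows:
            for key, value in row.items():
                try:
                    int_votes[key].append(float(value).is_integer())
                    float_votes[key].append(True)
                except ValueError:
                    int_votes[key].append(False)
                    float_votes[key].append(False)
        int_cols = {k for k, v in int_votes.items() if all(v)}
        float_cols = {k for k, v in float_votes.items() if all(v)}
        return int_cols, float_cols - int_cols

    rows = [{'a': '1', 'b': '1.5', 'c': 'x'}, {'a': '2', 'b': '2', 'c': '3'}]
    print(infer_numeric_columns(rows))  # ({'a'}, {'b'})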
@@ -238,7 +231,7 @@ def add_columns_to_csv(self,
         ]
         """
 
-        self.valid_file_exit(file_name)
+        has_header = self.get_csv_header(file_name)
 
         # Process data
         data_type = type(data)
@@ -264,9 +257,6 @@
 
         # Add columns
         with open(file_name) as f:
-            sniffer = csv.Sniffer()
-            has_header = sniffer.has_header(f.read(40960))
-            f.seek(0)
             source_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
             new_headers = [list(x.keys())[0] for x in data]
             with open(target, 'w+') as target_file:
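Only fragments of add_columns_to_csv are visible in this view, but the visible pieces (a data parameter that is a list of single-key dicts whose first keys become new headers, plus file_name and target) suggest usage roughly like the following; treat the exact shape of data as an assumption, since the docstring example is collapsed here:

    from ExportCsvToInflux import CSVObject

    csv_obj = CSVObject()
    # Hypothetical call: appends 'md5' and 'note' columns to demo.csv,
    # writing the result to demo_influx.csv (per-row values assumed).
    csv_obj.add_columns_to_csv(file_name='demo.csv',
                               target='demo_influx.csv',
                               data=[{'md5': ['abc123', 'abc123']},
                                     {'note': ['first', 'second']}])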
37 changes: 29 additions & 8 deletions src/ExportCsvToInflux/exporter_object.py
@@ -1,3 +1,4 @@
+from pytz.exceptions import UnknownTimeZoneError
 from .influx_object import InfluxObject
 from collections import defaultdict
 from .base_object import BaseObject
@@ -100,6 +101,20 @@ def __validate_columns(csv_headers, check_columns):
 
         return check_columns
 
+    @staticmethod
+    def __validate_bool_string(target, alias=''):
+        """Private Function: Validate bool string
+        :param target: the target
+        """
+
+        target = str(target).lower()
+        expected = ['true', 'false']
+        if target not in expected:
+            raise Exception('Error: The input {0} should be True or False, current is {1}'.format(alias, target))
+
+        return True if target == 'true' else False
+
     @staticmethod
     def __unix_time_millis(dt):
         """Private Function: unix_time_millis"""
@@ -139,20 +154,20 @@ def export_csv_to_influx(self,
         :param csv_file: the csv file path/folder
         :param db_server_name: the influx server (default localhost:8086)
-        :param db_user: the influx db user
-        :param db_password: the influx db password
+        :param db_user: the influx db user (default admin)
+        :param db_password: the influx db password (default admin)
         :param db_name: the influx db name
         :param db_measurement: the measurement in a db
         :param time_column: the time columns (default timestamp)
-        :param tag_columns: the tag columns ()
+        :param tag_columns: the tag columns
         :param time_format: the time format (default %Y-%m-%d %H:%M:%S)
         :param field_columns: the filed columns
         :param delimiter: the csv delimiter (default comma)
         :param lineterminator: the csv line terminator (default comma)
         :param batch_size: how many rows insert every time (default 500)
         :param time_zone: the data time zone (default UTC)
-        :param limit_string_length_columns: limit the string length
-        :param limit_length: default 20
+        :param limit_string_length_columns: limit the string length columns (default None)
+        :param limit_length: limited length (default 20)
         :param drop_database: drop database (default False)
         :param drop_measurement: drop measurement (default False)
         :param match_columns: the columns need to be matched (default None)
@@ -161,7 +176,7 @@
         :param filter_columns: the columns need to be filter (default None)
         :param filter_by_string: filter columns by string (default None)
         :param filter_by_regex: filter columns by regex (default None)
-        :param enable_count_measurement: create the measurement with only count info
+        :param enable_count_measurement: create the measurement with only count info (default False)
         :param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False)
         """

@@ -188,6 +203,10 @@
         filter_by_string = base_object.str_to_list(filter_by_string)
         filter_by_regex = [] if str(filter_by_regex).lower() == 'none' else filter_by_regex
         filter_by_regex = base_object.str_to_list(filter_by_regex)
+        drop_database = self.__validate_bool_string(drop_database)
+        drop_measurement = self.__validate_bool_string(drop_measurement)
+        enable_count_measurement = self.__validate_bool_string(enable_count_measurement)
+        force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update)
 
         # Init: database behavior
         drop_database = self.convert_boole(drop_database)
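With the four flags funneled through __validate_bool_string, callers can pass either real booleans or their string forms and get an explicit error for anything else. A usage sketch under the documented parameters (paths, credentials, and column names are placeholders):

    from ExportCsvToInflux import ExporterObject

    exporter = ExporterObject()
    exporter.export_csv_to_influx(csv_file='demo.csv',
                                  db_server_name='localhost:8086',
                                  db_name='demo_db',
                                  db_measurement='demo_measurement',
                                  time_column='timestamp',
                                  tag_columns='host,region',
                                  field_columns='value',
                                  drop_database='True',  # string form, now validated
                                  force_insert_even_csv_no_update=False)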
@@ -255,15 +274,17 @@
                     break
 
                 # Check the timestamp, and generate the csv with checksum
-                # csv_base_name = os.path.basename(csv_file_item)
                 new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', ''))
                 new_csv_file_exists = os.path.exists(new_csv_file)
                 no_new_data_status = False
                 if new_csv_file_exists:
                     with open(new_csv_file) as f:
                         csv_reader = csv.DictReader(f, delimiter=delimiter, lineterminator=lineterminator)
                         for row in csv_reader:
-                            new_csv_file_md5 = row['md5']
+                            try:
+                                new_csv_file_md5 = row['md5']
+                            except KeyError:
+                                break
                             if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False:
                                 print('Info: No new data found, existing...')
                                 no_new_data_status = True
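The try/except makes change detection tolerant of `_influx.csv` files generated before the md5 column existed: a missing key now means "treat as changed" instead of a KeyError crash. The idea in a self-contained sketch (this version hashes the raw source bytes; the library records its own checksum, so the helper below is illustrative only):

    import csv
    import hashlib
    import os

    def should_skip(source_csv, generated_csv, force_insert=False):
        # Skip the export when the generated file already records the
        # same checksum as the current source, unless forced.
        with open(source_csv, 'rb') as f:
            source_md5 = hashlib.md5(f.read()).hexdigest()
        if not os.path.exists(generated_csv):
            return False
        with open(generated_csv) as f:
            for row in csv.DictReader(f):
                try:
                    recorded_md5 = row['md5']
                except KeyError:  # older file without the md5 column
                    return False
                if recorded_md5 == source_md5 and not force_insert:
                    return True
        return False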
