Skip to content

Commit

Permalink
Merge pull request #19 from Bugazelle/dev
Browse files Browse the repository at this point in the history
[Fix] Fix issue-17 & fix total count & logic enhancements
  • Loading branch information
Bugazelle authored Oct 15, 2019
2 parents c825085 + 3b895b9 commit b5c800c
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/ExportCsvToInflux/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.18'
__version__ = '0.1.19'
58 changes: 50 additions & 8 deletions src/ExportCsvToInflux/base_object.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,43 @@
import sys

class BaseObject(object):
"""BaseObject"""

def __init__(self):
self.strip_chars = ' \r\n\t/"\',\\'

@staticmethod
def convert_boole(target):
target = str(target).lower()
if target != 'true' and target != 'false':
print('Error: The expected input for {0} should be: True or False'.format(target))
exit(1)
if target == 'true':
target = True
else:
target = False

return target

def validate_str(self, target, ignore_exception=False):
"""Function: validate_string
:param target: the string
:param ignore_exception: the True or False
"""

get_type = type(target)
ignore_exception = self.convert_boole(ignore_exception)
try:
string_type = get_type is str or get_type is unicode
except NameError:
string_type = get_type is str
if not string_type and ignore_exception is False:
print('Error: The {0} is not string type. Please check.'.format(target))
sys.exit(1)

return string_type

def str_to_list(self, string, delimiter=',', lower=False):
"""Function: str_to_list
Expand All @@ -13,12 +47,20 @@ def str_to_list(self, string, delimiter=',', lower=False):
:return
"""

string_type = type(string)
list_tuple_type = string_type is list or string_type is tuple
get_type = type(string)
error_message = 'Error: The string should be list or string, use comma to separate. ' \
'Current is: type-{0}, {1}'.format(get_type, string)

# Process if Value Error
try:
str_unicode_type = string_type is str or string_type is unicode
except NameError:
str_unicode_type = string_type is str
bool(string)
except ValueError:
print(error_message)
sys.exit(1)

# Process the type
list_tuple_type = get_type is list or get_type is tuple
str_unicode_type = self.validate_str(string, True)
if list_tuple_type:
if lower:
li = [str(item).strip(self.strip_chars).lower() for item in string]
Expand All @@ -30,9 +72,9 @@ def str_to_list(self, string, delimiter=',', lower=False):
li = [item.strip(self.strip_chars).lower() for item in li]
else:
li = [item.strip(self.strip_chars) for item in li]
elif bool(string) is False:
elif not string:
li = list()
else:
raise Exception('Error: The string should be list or string, use comma to separate. '
'Current is: type-{0}, {1}'.format(string_type, string))
print(error_message)
sys.exit(1)
return li
18 changes: 11 additions & 7 deletions src/ExportCsvToInflux/csv_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import json
import csv
import sys
import os


Expand Down Expand Up @@ -84,7 +85,8 @@ def valid_file_exit(file_name):

file_exists = os.path.exists(file_name)
if file_exists is False:
raise Exception('Error: The file does not exist: {0}'.format(file_name))
print('Error: The file does not exist: {0}'.format(file_name))
sys.exit(1)

def get_file_md5(self, file_name):
"""Function: get_file_md5
Expand Down Expand Up @@ -133,7 +135,7 @@ def get_csv_lines_count(self, file_name):

with open(file_name) as f:
csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
count = 0 if has_header is True else 1
count = 0 if has_header else 1
for row in csv_reader:
count += 1

Expand Down Expand Up @@ -172,7 +174,7 @@ def convert_csv_data_to_int_float(self, file_name):
float_type[key].append(False)

# Valid the key if no header
if keys and has_header is False:
if keys and not has_header:
for key in keys:
# Valid Int Type
try:
Expand Down Expand Up @@ -207,7 +209,7 @@ def convert_csv_data_to_int_float(self, file_name):
else:
row[key] = float(value) if float_type[key] is True else value
yield row, int_type, float_type
if has_header is False and i == 1:
if not has_header and i == 1:
for key in keys:
int_status = int_type[key]
if int_status is True:
Expand Down Expand Up @@ -243,7 +245,8 @@ def add_columns_to_csv(self,
except NameError:
check_data_type = data_type is not list and data_type is not str
if check_data_type:
raise Exception(message)
print(message)
sys.exit(1)

try:
check_data_type = data_type is str or data_type is unicode
Expand All @@ -253,7 +256,8 @@ def add_columns_to_csv(self,
try:
data = json.loads(data)
except ValueError:
raise Exception(message)
print(message)
sys.exit(1)

# Add columns
with open(file_name) as f:
Expand All @@ -266,7 +270,7 @@ def add_columns_to_csv(self,
values = list(row.values())
if row_id == 0:
headers = list(row.keys())
if has_header is False:
if not has_header:
continue
headers += new_headers
target_writer.writerow(headers)
Expand Down
75 changes: 49 additions & 26 deletions src/ExportCsvToInflux/exporter_object.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pytz.exceptions import UnknownTimeZoneError
from influxdb.exceptions import InfluxDBClientError
from .influx_object import InfluxObject
from collections import defaultdict
from .base_object import BaseObject
Expand All @@ -20,18 +21,6 @@ def __init__(self):
self.match_count = defaultdict(int)
self.filter_count = defaultdict(int)

@staticmethod
def convert_boole(target):
target = str(target).lower()
if target != 'true' and target != 'false':
raise Exception('Error: The expected input for {0} should be: True or False'.format(target))
if target == 'true':
target = True
else:
target = False

return target

def __check_match_and_filter(self,
row,
check_columns,
Expand Down Expand Up @@ -111,7 +100,8 @@ def __validate_bool_string(target, alias=''):
target = str(target).lower()
expected = ['true', 'false']
if target not in expected:
raise Exception('Error: The input {0} should be True or False, current is {1}'.format(alias, target))
print('Error: The input {0} should be True or False, current is {1}'.format(alias, target))
sys.exit(1)

return True if target == 'true' else False

Expand Down Expand Up @@ -186,6 +176,16 @@ def export_csv_to_influx(self,
base_object = BaseObject()

# Init: Arguments
base_object.validate_str(csv_file)
base_object.validate_str(db_name)
base_object.validate_str(db_measurement)
base_object.validate_str(db_server_name)
base_object.validate_str(db_user)
base_object.validate_str(db_password)
base_object.validate_str(time_format)
base_object.validate_str(delimiter)
base_object.validate_str(lineterminator)
base_object.validate_str(time_zone)
tag_columns = base_object.str_to_list(tag_columns)
field_columns = base_object.str_to_list(field_columns)
limit_string_length_columns = [] if str(limit_string_length_columns).lower() == 'none' \
Expand All @@ -209,10 +209,10 @@ def export_csv_to_influx(self,
force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update)

# Init: database behavior
drop_database = self.convert_boole(drop_database)
drop_measurement = self.convert_boole(drop_measurement)
enable_count_measurement = self.convert_boole(enable_count_measurement)
force_insert_even_csv_no_update = self.convert_boole(force_insert_even_csv_no_update)
drop_database = base_object.convert_boole(drop_database)
drop_measurement = base_object.convert_boole(drop_measurement)
enable_count_measurement = base_object.convert_boole(enable_count_measurement)
force_insert_even_csv_no_update = base_object.convert_boole(force_insert_even_csv_no_update)
count_measurement = '{0}.count'.format(db_measurement)
if drop_measurement:
influx_object.drop_measurement(db_name, db_measurement)
Expand All @@ -226,15 +226,23 @@ def export_csv_to_influx(self,
try:
batch_size = int(batch_size)
except ValueError:
raise Exception('Error: The batch_size should be int, current is: {0}'.format(batch_size))
print('Error: The batch_size should be int, current is: {0}'.format(batch_size))
sys.exit(1)

# Init: limit_length
try:
limit_length = int(limit_length)
except ValueError:
print('Error: The limit_length should be int, current is: {0}'.format(limit_length))
sys.exit(1)

# Process csv_file
current_dir = os.path.curdir
csv_file = os.path.join(current_dir, csv_file)
csv_file_exists = os.path.exists(csv_file)
if csv_file_exists is False:
print('Info: CSV file not found, exiting...')
sys.exit(0)
print('Error: CSV file not found, exiting...')
sys.exit(1)
csv_file_generator = csv_object.search_files_in_dir(csv_file)
for csv_file_item in csv_file_generator:
csv_file_length = csv_object.get_csv_lines_count(csv_file_item)
Expand Down Expand Up @@ -288,7 +296,7 @@ def export_csv_to_influx(self,
if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False:
print('Info: No new data found, existing...')
no_new_data_status = True
# sys.exit(0)
# sys.exit(1)
break
if no_new_data_status:
continue
Expand Down Expand Up @@ -333,9 +341,12 @@ def export_csv_to_influx(self,
continue

# Process Time
datetime_naive = datetime.datetime.strptime(row[time_column], time_format)
datetime_local = timezone(time_zone).localize(datetime_naive)
timestamp = self.__unix_time_millis(datetime_local) * 1000000
if re.match('^\\d+$', str(row[time_column])):
timestamp = int(row[time_column]) * 1000000
else:
datetime_naive = datetime.datetime.strptime(row[time_column], time_format)
datetime_local = timezone(time_zone).localize(datetime_naive)
timestamp = self.__unix_time_millis(datetime_local) * 1000000

# Process tags
tags = dict()
Expand Down Expand Up @@ -385,7 +396,13 @@ def export_csv_to_influx(self,
if data_points_len % batch_size == 0:
print('Info: Read {0} lines from {1}'.format(count, csv_file_item))
print('Info: Inserting {0} data_points...'.format(data_points_len))
response = client.write_points(data_points)
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
print('Error: System exited. Please double check the csv data. \n'
' It is not the same type as the current date type in influx. \n'
' {0}'.format(e))
sys.exit(1)
if response is False:
print('Info: Problem inserting points, exiting...')
exit(1)
Expand All @@ -398,7 +415,13 @@ def export_csv_to_influx(self,
if data_points_len > 0:
print('Info: Read {0} lines from {1}'.format(count, csv_file_item))
print('Info: Inserting {0} data_points...'.format(data_points_len))
response = client.write_points(data_points)
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
print('Error: System exited. Please double check the csv data. \n'
' It is not the same type as the current date type in influx. \n'
' {0}'.format(e))
sys.exit(1)
if response is False:
print('Error: Problem inserting points, exiting...')
exit(1)
Expand Down

0 comments on commit b5c800c

Please sign in to comment.