Skip to content

Commit

Permalink
Merge pull request #20 from Bugazelle/dev
Browse files Browse the repository at this point in the history
[Feat] Support to force columns as string type
  • Loading branch information
Bugazelle authored Nov 23, 2019
2 parents b5c800c + 894080c commit 3ad55b7
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 64 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ You could use `export_csv_to_influx -h` to see the help guide.
-p, --password, InfluxDB Password. Default: admin
-db, --dbname, InfluxDB Database name. **Mandatory**
-m, --measurement, Measurement name. **Mandatory**
-t, --time_column, Timestamp column name. Default: timestamp. If no timestamp column, the timestamp is set to the last file modify time for whole csv rows.
-t, --time_column, Timestamp column name. Default column name: timestamp. If no timestamp column, the timestamp is set to the last file modify time for whole csv rows.
> Note: A pure timestamp, like 1517587275, is also supported and auto-detected.
-tf, --time_format, Timestamp format. Default: '%Y-%m-%d %H:%M:%S' e.g.: 1970-01-01 00:00:00.
-tz, --time_zone, Timezone of supplied data. Default: UTC.
-fc, --field_columns, List of csv columns to use as fields, separated by comma. **Mandatory**
Expand All @@ -56,6 +57,7 @@ You could use `export_csv_to_influx -h` to see the help guide.
-fibr, --filter_by_regex, Filter by regex, separated by comma. Default: None.
-ecm, --enable_count_measurement, Enable count measurement. Default: False.
-fi, --force_insert_even_csv_no_update, Force insert data to influx, even csv no update. Default: False.
-fsc, --force_string_columns, Force columns as string type, separated by comma. Default: None
```

> **Note 1:** You could use the library programmatically.
Expand Down
2 changes: 1 addition & 1 deletion src/ExportCsvToInflux/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.19'
__version__ = '0.1.20'
15 changes: 7 additions & 8 deletions src/ExportCsvToInflux/base_object.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys


class BaseObject(object):
"""BaseObject"""

Expand All @@ -10,8 +11,8 @@ def __init__(self):
def convert_boole(target):
target = str(target).lower()
if target != 'true' and target != 'false':
print('Error: The expected input for {0} should be: True or False'.format(target))
exit(1)
error_message = 'Error: The expected input for {0} should be: True or False'.format(target)
sys.exit(error_message)
if target == 'true':
target = True
else:
Expand All @@ -33,8 +34,8 @@ def validate_str(self, target, ignore_exception=False):
except NameError:
string_type = get_type is str
if not string_type and ignore_exception is False:
print('Error: The {0} is not string type. Please check.'.format(target))
sys.exit(1)
error_message = 'Error: The {0} is not string type. Please check.'.format(target)
sys.exit(error_message)

return string_type

Expand All @@ -55,8 +56,7 @@ def str_to_list(self, string, delimiter=',', lower=False):
try:
bool(string)
except ValueError:
print(error_message)
sys.exit(1)
sys.exit(error_message)

# Process the type
list_tuple_type = get_type is list or get_type is tuple
Expand All @@ -75,6 +75,5 @@ def str_to_list(self, string, delimiter=',', lower=False):
elif not string:
li = list()
else:
print(error_message)
sys.exit(1)
sys.exit(error_message)
return li
26 changes: 12 additions & 14 deletions src/ExportCsvToInflux/csv_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_csv_header(self, file_name):
"""

self.valid_file_exit(file_name)
self.valid_file_exist(file_name)

with open(file_name) as f:
sniffer = csv.Sniffer()
Expand Down Expand Up @@ -77,16 +77,16 @@ def search_files_in_dir(directory, match_suffix='.csv', filter_pattern='_influx.
yield y

@staticmethod
def valid_file_exit(file_name):
"""Function: valid_file_exit
def valid_file_exist(file_name):
"""Function: valid_file_exist
:param file_name: the file name
"""

file_exists = os.path.exists(file_name)
if file_exists is False:
print('Error: The file does not exist: {0}'.format(file_name))
sys.exit(1)
error_message = 'Error: The file does not exist: {0}'.format(file_name)
sys.exit(error_message)

def get_file_md5(self, file_name):
"""Function: get_file_md5
Expand All @@ -95,7 +95,7 @@ def get_file_md5(self, file_name):
:return return the file md5
"""

self.valid_file_exit(file_name)
self.valid_file_exist(file_name)

hash_md5 = hashlib.md5()
with open(file_name, "rb") as f:
Expand All @@ -112,7 +112,7 @@ def get_file_modify_time(self, file_name, enable_ms=False):
:return return the human readable time
"""

self.valid_file_exit(file_name)
self.valid_file_exist(file_name)

modified = os.path.getmtime(file_name)
modified_s, modified_ms = divmod(modified * 1000, 1000)
Expand Down Expand Up @@ -237,16 +237,15 @@ def add_columns_to_csv(self,

# Process data
data_type = type(data)
message = 'Error: The data should be list type, the item should be dict. Or the json type as following' \
'for example: [{"new_header_1": ["new_value_1", "new_value_2", "new_value_3"]}, ' \
'{"new_header_2": ["new_value_1", "new_value_2", "new_value_3"]}]'
error_message = 'Error: The data should be list type, the item should be dict. Or the json type as following ' \
'for example: [{"new_header_1": ["new_value_1", "new_value_2", "new_value_3"]}, ' \
'{"new_header_2": ["new_value_1", "new_value_2", "new_value_3"]}]'
try:
check_data_type = data_type is not list and data_type is not str and data_type is not unicode
except NameError:
check_data_type = data_type is not list and data_type is not str
if check_data_type:
print(message)
sys.exit(1)
sys.exit(error_message)

try:
check_data_type = data_type is str or data_type is unicode
Expand All @@ -256,8 +255,7 @@ def add_columns_to_csv(self,
try:
data = json.loads(data)
except ValueError:
print(message)
sys.exit(1)
sys.exit(error_message)

# Add columns
with open(file_name) as f:
Expand Down
98 changes: 59 additions & 39 deletions src/ExportCsvToInflux/exporter_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def __validate_bool_string(target, alias=''):
target = str(target).lower()
expected = ['true', 'false']
if target not in expected:
print('Error: The input {0} should be True or False, current is {1}'.format(alias, target))
sys.exit(1)
error_message = 'Error: The input {0} should be True or False, current is {1}'.format(alias, target)
sys.exit(error_message)

return True if target == 'true' else False

Expand Down Expand Up @@ -139,7 +139,8 @@ def export_csv_to_influx(self,
filter_by_string=None,
filter_by_regex=None,
enable_count_measurement=False,
force_insert_even_csv_no_update=False):
force_insert_even_csv_no_update=False,
force_string_columns=None):
"""Function: export_csv_to_influx
:param csv_file: the csv file path/folder
Expand Down Expand Up @@ -168,6 +169,7 @@ def export_csv_to_influx(self,
:param filter_by_regex: filter columns by regex (default None)
:param enable_count_measurement: create the measurement with only count info (default False)
:param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False)
:param force_string_columns: force the columns as string (default None)
"""

# Init: object
Expand Down Expand Up @@ -207,6 +209,8 @@ def export_csv_to_influx(self,
drop_measurement = self.__validate_bool_string(drop_measurement)
enable_count_measurement = self.__validate_bool_string(enable_count_measurement)
force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update)
force_string_columns = [] if str(force_string_columns).lower() == 'none' else force_string_columns
force_string_columns = base_object.str_to_list(force_string_columns)

# Init: database behavior
drop_database = base_object.convert_boole(drop_database)
Expand All @@ -226,23 +230,23 @@ def export_csv_to_influx(self,
try:
batch_size = int(batch_size)
except ValueError:
print('Error: The batch_size should be int, current is: {0}'.format(batch_size))
sys.exit(1)
error_message = 'Error: The batch_size should be int, current is: {0}'.format(batch_size)
sys.exit(error_message)

# Init: limit_length
try:
limit_length = int(limit_length)
except ValueError:
print('Error: The limit_length should be int, current is: {0}'.format(limit_length))
sys.exit(1)
error_message = 'Error: The limit_length should be int, current is: {0}'.format(limit_length)
sys.exit(error_message)

# Process csv_file
current_dir = os.path.curdir
csv_file = os.path.join(current_dir, csv_file)
csv_file_exists = os.path.exists(csv_file)
if csv_file_exists is False:
print('Error: CSV file not found, exiting...')
sys.exit(1)
error_message = 'Error: CSV file not found, exiting...'
sys.exit(error_message)
csv_file_generator = csv_object.search_files_in_dir(csv_file)
for csv_file_item in csv_file_generator:
csv_file_length = csv_object.get_csv_lines_count(csv_file_item)
Expand Down Expand Up @@ -294,9 +298,11 @@ def export_csv_to_influx(self,
except KeyError:
break
if new_csv_file_md5 == csv_file_md5 and force_insert_even_csv_no_update is False:
print('Info: No new data found, existing...')
warning_message = 'Warning: No new data found, ' \
'exporter stop/jump for {0}...'.format(csv_file_item)
print(warning_message)
no_new_data_status = True
# sys.exit(1)
# sys.exit(warning_message)
break
if no_new_data_status:
continue
Expand Down Expand Up @@ -341,22 +347,31 @@ def export_csv_to_influx(self,
continue

# Process Time
if re.match('^\\d+$', str(row[time_column])):
timestamp = int(row[time_column]) * 1000000
else:
datetime_naive = datetime.datetime.strptime(row[time_column], time_format)
datetime_local = timezone(time_zone).localize(datetime_naive)
timestamp = self.__unix_time_millis(datetime_local) * 1000000
try:
timestamp_float = float(row[time_column])
timestamp_remove_decimal = int(str(timestamp_float).replace('.', ''))
timestamp_influx = '{:<019d}'.format(timestamp_remove_decimal)
timestamp = int(timestamp_influx)
except ValueError:
try:
datetime_naive = datetime.datetime.strptime(row[time_column], time_format)
datetime_local = timezone(time_zone).localize(datetime_naive)
timestamp = self.__unix_time_millis(datetime_local) * 1000000
except (TypeError, ValueError):
error_message = 'Error: Unexpected time with format: {0}, {1}'.format(row[time_column],
time_format)
sys.exit(error_message)

# Process tags
tags = dict()
for tag_column in tag_columns:
v = 0
if tag_column in row:
v = row[tag_column]
if limit_string_length_columns:
if tag_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if limit_string_length_columns and tag_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if force_string_columns and tag_column in force_string_columns:
v = str(v)
# If field is empty
if len(str(v)) == 0:
if int_type[tag_column] is True:
Expand All @@ -373,9 +388,11 @@ def export_csv_to_influx(self,
v = 0
if field_column in row:
v = row[field_column]
if limit_string_length_columns:
if field_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if limit_string_length_columns and field_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if force_string_columns and field_column in force_string_columns:
v = str(v)

# If field is empty
if len(str(v)) == 0:
if int_type[field_column] is True:
Expand All @@ -399,13 +416,13 @@ def export_csv_to_influx(self,
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
print('Error: System exited. Please double check the csv data. \n'
' It is not the same type as the current date type in influx. \n'
' {0}'.format(e))
sys.exit(1)
error_message = 'Error: System exited. Please double check the csv data. \n' \
' It is not the same type as the current date type in influx. \n' \
' {0}'.format(e)
sys.exit(error_message)
if response is False:
print('Info: Problem inserting points, exiting...')
exit(1)
error_message = 'Info: Problem inserting points, exiting...'
sys.exit(error_message)
print('Info: Wrote {0} lines, response: {1}'.format(data_points_len, response))

data_points = list()
Expand All @@ -418,13 +435,13 @@ def export_csv_to_influx(self,
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
print('Error: System exited. Please double check the csv data. \n'
' It is not the same type as the current date type in influx. \n'
' {0}'.format(e))
sys.exit(1)
error_message = 'Error: System exited. Please double check the csv data. \n' \
' It is not the same type as the current date type in influx. \n' \
' {0}'.format(e)
sys.exit(error_message)
if response is False:
print('Error: Problem inserting points, exiting...')
exit(1)
error_message = 'Error: Problem inserting points, exiting...'
sys.exit(error_message)
print('Info: Wrote {0}, response: {1}'.format(data_points_len, response))

# Write count measurement
Expand All @@ -440,8 +457,8 @@ def export_csv_to_influx(self,
count_point = [{'measurement': count_measurement, 'time': timestamp, 'fields': fields, 'tags': None}]
response = client.write_points(count_point)
if response is False:
print('Error: Problem inserting points, exiting...')
exit(1)
error_message = 'Error: Problem inserting points, exiting...'
sys.exit(error_message)
print('Info: Wrote count measurement {0}, response: {1}'.format(count_point, response))

self.match_count = defaultdict(int)
Expand Down Expand Up @@ -483,7 +500,7 @@ def export_csv_to_influx():
help='List of csv columns to use as tags, separated by comma')
parser.add_argument('-b', '--batch_size', nargs='?', default=500, const=500,
help='Batch size when inserting data to influx. Default: 500.')
parser.add_argument('-lslc', '--limit_string_length_columns',nargs='?', default=None, const=None,
parser.add_argument('-lslc', '--limit_string_length_columns', nargs='?', default=None, const=None,
help='Limit string length columns, separated by comma. Default: None.')
parser.add_argument('-ls', '--limit_length', nargs='?', default=20, const=20,
help='Limit length. Default: 20.')
Expand All @@ -509,6 +526,8 @@ def export_csv_to_influx():
help='Enable count measurement. Default: False')
parser.add_argument('-fi', '--force_insert_even_csv_no_update', nargs='?', default=False, const=False,
help='Force insert data to influx, even csv no update. Default: False')
parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None,
help='Force columns as string type, separated by comma. Default: None.')
parser.add_argument('-v', '--version', action="version", version=__version__)

args = parser.parse_args()
Expand Down Expand Up @@ -538,4 +557,5 @@ def export_csv_to_influx():
filter_by_string=args.filter_by_string,
filter_by_regex=args.filter_by_regex,
enable_count_measurement=args.enable_count_measurement,
force_insert_even_csv_no_update=args.force_insert_even_csv_no_update)
force_insert_even_csv_no_update=args.force_insert_even_csv_no_update,
force_string_columns=args.force_string_columns)
4 changes: 3 additions & 1 deletion 中文说明.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pip install ExportCsvToInflux
-p, --password, influx密码, 默认: admin
-db, --dbname, influx数据库名 **强制**
-m, --measurement, influx数据表名 **强制**
-t, --time_column, 时间戳列. 默认: timestamp. 如果不存在时间戳列, 自动使用文件最新的修改日期作为时间戳
-t, --time_column, 时间戳列. 默认列名: timestamp. 如果不存在时间戳列, 自动使用文件最新的修改日期作为时间戳
> 注意: 同时也支持纯的timestamp, 比如: 1517587275. 系统自动判断.
-tf, --time_format, 时间戳格式. 默认: '%Y-%m-%d %H:%M:%S' 比如: 1970-01-01 00:00:00.
-tz, --time_zone, 时区. 默认: UTC
-fc, --field_columns, field列, 如果多列, 使用英文逗号 ',' 作为分隔符. **强制**
Expand All @@ -57,6 +58,7 @@ pip install ExportCsvToInflux
-fibr, --filter_by_regex, 用于匹配的正则表达式, 使用英文逗号 ',' 作为分隔符. 默认: None
-ecm, --enable_count_measurement, 生成统计表. 默认: False
-fi, --force_insert_even_csv_no_update, 强制往influx中插入数据, 即使csv不存在更新. 默认: False
-fsc, --force_string_columns, 强制将列的类型转换成string类型, 使用英文逗号 ',' 作为分隔符. 默认: None
```

> **注意点 1:** 可以使用代码调用
Expand Down

0 comments on commit 3ad55b7

Please sign in to comment.