Merge pull request #30 from Bugazelle/dev
[Feat] Support --force_int_columns and --force_float_columns
Bugazelle authored Mar 18, 2020
2 parents 36b404b + c1bb353 commit eb15d1c
Showing 5 changed files with 120 additions and 54 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -87,6 +87,10 @@ If no timestamp column, the timestamp is set to the last file modify time for wh

`-fsc, --force_string_columns`: Force columns as string type, separated by comma. Default: None

`-fintc, --force_int_columns`: Force columns as int type, separated by comma. Default: None

`-ffc, --force_float_columns`: Force columns as float type, separated by comma. Default: None


> **Note:**
> 1. You could pass `*` to --field_columns to match all the fields: `--field_columns=*`, `--field_columns '*'`
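
A minimal Python sketch of how the new options might be passed through the exporter API; the import path, class name, connection settings, csv path, and column names below are assumptions for illustration and are not taken from this diff:

```python
from ExportCsvToInflux import ExporterObject  # assumed public import path

exporter = ExporterObject()
exporter.export_csv_to_influx(
    csv_file='demo.csv',                        # illustrative csv path
    db_server_name='127.0.0.1:8086',            # assumed connection/layout parameters,
    db_name='demo',                             # not shown in this diff
    db_measurement='demo',
    tag_columns='region,host',
    field_columns='http_code,duration,build',
    force_int_columns='http_code',              # cast to int; empty cells become -999
    force_float_columns='duration',             # cast to float; empty cells become -999.0
    force_string_columns='build')               # kept as string; empty cells become '-'
```

A column may appear in at most one of the three force_* lists; the exporter exits with an error if the same column is listed twice (see the duplicate check added in exporter_object.py below).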
10 changes: 0 additions & 10 deletions setup.py
@@ -13,16 +13,6 @@
def readme():
with open('README.md') as f:
long_description = f.read()
index = long_description.find('```\n\n> **Note:**')
long_description = long_description[:index]
long_description = long_description.replace('## Install', '**Install**')
long_description = long_description.replace('## Features', '**Features**')
long_description = long_description.replace('## Command Arguments', '**Command Arguments**')
long_description = long_description.replace('```bash', '')
long_description = long_description.replace('\n-', '\n\n-')
long_description = long_description.replace('\n-c', '-c')
long_description += '\n\nFor more info, please refer to the {0}'.format(url)

return long_description


2 changes: 1 addition & 1 deletion src/ExportCsvToInflux/__version__.py
@@ -1 +1 @@
__version__ = '0.1.23'
__version__ = '0.1.24'
154 changes: 111 additions & 43 deletions src/ExportCsvToInflux/exporter_object.py
@@ -6,6 +6,7 @@
from .__version__ import __version__
from .csv_object import CSVObject
from pytz import timezone
import collections
import argparse
import datetime
import csv
@@ -21,6 +22,59 @@ def __init__(self):
self.match_count = defaultdict(int)
self.filter_count = defaultdict(int)

@staticmethod
def __process_tags_fields(columns,
row,
limit_length,
int_type,
float_type,
limit_string_length_columns,
force_string_columns,
force_int_columns,
force_float_columns):
"""Private function: __process_tags_fields"""

results = dict()
for column in columns:
v = 0
if column in row:
v = row[column]
if limit_string_length_columns and column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if force_string_columns and column in force_string_columns:
v = str(v)
if force_int_columns and column in force_int_columns:
try:
v = int(v)
except ValueError:
print('Warning: Failed to force "{0}" to int, skip...'.format(v))
if force_float_columns and column in force_float_columns:
try:
v = float(v)
except ValueError:
print('Warning: Failed to force "{0}" to float, skip...'.format(v))

# If field is empty
if len(str(v)) == 0:
# Process the empty
if int_type[column] is True:
v = -999
elif float_type[column] is True:
v = -999.0
else:
v = '-'

# Process the force
if force_string_columns and column in force_string_columns:
v = '-'
if force_int_columns and column in force_int_columns:
v = -999
if force_float_columns and column in force_float_columns:
v = -999.0

results[column] = v
return results
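
For reference, a standalone sketch (illustrative values only) of the precedence these rules imply: the explicit force_* casts are applied first, and empty cells then fall back to a type-specific placeholder. The inferred int_type/float_type fallback of the real method is omitted here:

```python
def force_value(value, force_int=False, force_float=False, force_string=False):
    """Mirror the force/empty handling of __process_tags_fields on a single value."""
    if force_string:
        value = str(value)
    if force_int:
        try:
            value = int(value)
        except ValueError:
            pass  # unparsable values are kept as-is (the real code prints a warning)
    if force_float:
        try:
            value = float(value)
        except ValueError:
            pass
    if len(str(value)) == 0:  # empty cells get a placeholder matching the forced type
        if force_int:
            value = -999
        elif force_float:
            value = -999.0
        elif force_string:
            value = '-'
    return value

print(force_value('200', force_int=True))    # 200
print(force_value('', force_float=True))     # -999.0
print(force_value(42, force_string=True))    # 42 (stored as the string '42')
```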

def __check_match_and_filter(self,
row,
check_columns,
@@ -142,7 +196,9 @@ def export_csv_to_influx(self,
filter_by_regex=None,
enable_count_measurement=False,
force_insert_even_csv_no_update=False,
force_string_columns=None):
force_string_columns=None,
force_int_columns=None,
force_float_columns=None):
"""Function: export_csv_to_influx
:param csv_file: the csv file path/folder
@@ -172,6 +228,8 @@
:param enable_count_measurement: create the measurement with only count info (default False)
:param force_insert_even_csv_no_update: force insert data to influx even csv data no update (default False)
:param force_string_columns: force the columns as string (default None)
:param force_int_columns: force the columns as int (default None)
:param force_float_columns: force the columns as float (default None)
"""

# Init: object
@@ -213,6 +271,23 @@
force_insert_even_csv_no_update = self.__validate_bool_string(force_insert_even_csv_no_update)
force_string_columns = [] if str(force_string_columns).lower() == 'none' else force_string_columns
force_string_columns = base_object.str_to_list(force_string_columns)
force_int_columns = [] if str(force_int_columns).lower() == 'none' else force_int_columns
force_int_columns = base_object.str_to_list(force_int_columns)
force_float_columns = [] if str(force_float_columns).lower() == 'none' else force_float_columns
force_float_columns = base_object.str_to_list(force_float_columns)

# Columns must not be duplicated across force_string_columns, force_int_columns, force_float_columns
all_force_columns = force_string_columns + force_int_columns + force_float_columns
duplicates = [item for item, count in collections.Counter(all_force_columns).items() if count > 1]
if duplicates:
error_message = 'Error: Found duplicate items: {0} in: \n' \
' force_string_columns: {1} \n' \
' force_int_columns: {2} \n' \
' force_float_columns: {3}'.format(duplicates,
force_string_columns,
force_int_columns,
force_float_columns)
sys.exit(error_message)

# Init: database behavior
drop_database = base_object.convert_boole(drop_database)
@@ -368,45 +443,26 @@
sys.exit(error_message)

# Process tags
tags = dict()
for tag_column in tag_columns:
v = 0
if tag_column in row:
v = row[tag_column]
if limit_string_length_columns and tag_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if force_string_columns and tag_column in force_string_columns:
v = str(v)
# If field is empty
if len(str(v)) == 0:
if int_type[tag_column] is True:
v = -999
elif float_type[tag_column] is True:
v = -999.0
else:
v = '-'
tags[tag_column] = v
tags = self.__process_tags_fields(columns=tag_columns,
row=row,
limit_length=limit_length,
int_type=int_type,
float_type=float_type,
limit_string_length_columns=limit_string_length_columns,
force_string_columns=force_string_columns,
force_int_columns=force_int_columns,
force_float_columns=force_float_columns)

# Process fields
fields = dict()
for field_column in field_columns:
v = 0
if field_column in row:
v = row[field_column]
if limit_string_length_columns and field_column in limit_string_length_columns:
v = str(v)[:limit_length + 1]
if force_string_columns and field_column in force_string_columns:
v = str(v)

# If field is empty
if len(str(v)) == 0:
if int_type[field_column] is True:
v = -999
elif float_type[field_column] is True:
v = -999.9
else:
v = '-'
fields[field_column] = v
fields = self.__process_tags_fields(columns=field_columns,
row=row,
limit_length=limit_length,
int_type=int_type,
float_type=float_type,
limit_string_length_columns=limit_string_length_columns,
force_string_columns=force_string_columns,
force_int_columns=force_int_columns,
force_float_columns=force_float_columns)

point = {'measurement': db_measurement, 'time': timestamp, 'fields': fields, 'tags': tags}
data_points.append(point)
@@ -423,8 +479,11 @@ except InfluxDBClientError as e:
except InfluxDBClientError as e:
error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
' Please double check the csv data. \n' \
' If would like to force data to str, use: --force_string_columns \n' \
' {0}'.format(e)
' If you would like to force the data to a target type, use: \n' \
' --force_string_columns \n' \
' --force_int_columns \n' \
' --force_float_columns \n' \
' Error Details: {0}'.format(e)
sys.exit(error_message)
if response is False:
error_message = 'Info: Problem inserting points, exiting...'
@@ -443,8 +502,11 @@ except InfluxDBClientError as e:
except InfluxDBClientError as e:
error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
' Please double check the csv data. \n' \
' If would like to force data to str, use: --force_string_columns \n' \
' {0}'.format(e)
' If you would like to force the data to a target type, use: \n' \
' --force_string_columns \n' \
' --force_int_columns \n' \
' --force_float_columns \n' \
' Error Details: {0}'.format(e)
sys.exit(error_message)
if response is False:
error_message = 'Error: Problem inserting points, exiting...'
@@ -535,6 +597,10 @@ def export_csv_to_influx():
help='Force insert data to influx, even csv no update. Default: False')
parser.add_argument('-fsc', '--force_string_columns', nargs='?', default=None, const=None,
help='Force columns as string type, separated by comma. Default: None.')
parser.add_argument('-fintc', '--force_int_columns', nargs='?', default=None, const=None,
help='Force columns as int type, separated by comma. Default: None.')
parser.add_argument('-ffc', '--force_float_columns', nargs='?', default=None, const=None,
help='Force columns as float type, separated by comma. Default: None.')
parser.add_argument('-v', '--version', action="version", version=__version__)

args = parser.parse_args()
@@ -565,4 +631,6 @@ def export_csv_to_influx():
filter_by_regex=args.filter_by_regex,
enable_count_measurement=args.enable_count_measurement,
force_insert_even_csv_no_update=args.force_insert_even_csv_no_update,
force_string_columns=args.force_string_columns)
force_string_columns=args.force_string_columns,
force_int_columns=args.force_int_columns,
force_float_columns=args.force_float_columns)
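
The new command-line options are declared with `nargs='?'` and `const=None`, so passing the bare flag without a value behaves the same as omitting it. A small standalone sketch of that parsing behaviour (the `add_argument` calls are copied from the diff; the argv values are illustrative):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-fintc', '--force_int_columns', nargs='?', default=None, const=None,
                    help='Force columns as int type, separated by comma. Default: None.')
parser.add_argument('-ffc', '--force_float_columns', nargs='?', default=None, const=None,
                    help='Force columns as float type, separated by comma. Default: None.')

args = parser.parse_args(['-fintc', 'http_code', '-ffc', 'duration,cpu'])
print(args.force_int_columns)    # http_code
print(args.force_float_columns)  # duration,cpu

args = parser.parse_args(['-fintc'])
print(args.force_int_columns)    # None -- the bare flag falls back to const=None
```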
4 changes: 4 additions & 0 deletions 中文说明.md
@@ -88,6 +88,10 @@ pip install ExportCsvToInflux

`-fsc, --force_string_columns`: Force columns as string type, separated by comma ','. Default: None

`-fintc, --force_int_columns`: Force columns as int type, separated by comma ','. Default: None

`-ffc, --force_float_columns`: Force columns as float type, separated by comma ','. Default: None

> **Note:**
> 1. You can pass the wildcard `*` to --field_columns to match all columns: `--field_columns=*`, `--field_columns '*'`
> 2. If the csv file has not been updated, the data will not be inserted into the database again. Force the insert with: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`