Skip to content

Commit

Permalink
[Feat] _influx.csv file could be ignored
Browse files Browse the repository at this point in the history
  • Loading branch information
Bugazelle committed Feb 20, 2020
1 parent 72d6a46 commit 0d1710c
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 95 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ You could use `export_csv_to_influx -h` to see the help guide.
> **Note:**
> 1. You could pass `*` to --field_columns to match all the fields: `--field_columns=*`, `--field_columns '*'`
> 2. CSV data won't insert into influx again if no update. Use to force insert: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`
> 3. If some csv cells have no value, auto fill the influx db based on column data type: `int: -999`, `float: -999.0`, `string: -`
## Programmatically

Expand Down
2 changes: 1 addition & 1 deletion src/ExportCsvToInflux/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.22'
__version__ = '0.1.23'
236 changes: 149 additions & 87 deletions src/ExportCsvToInflux/csv_object.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from collections import defaultdict
from .base_object import BaseObject
from itertools import tee
from glob import glob
import hashlib
import types
import time
import json
import csv
Expand Down Expand Up @@ -141,88 +143,135 @@ def get_csv_lines_count(self, file_name):

return count

def convert_csv_data_to_int_float(self, file_name):
def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None):
"""Function: convert_csv_data_to_int_float
:param file_name: the file name
:param file_name: the file name (default None)
:param csv_reader: the csv dict reader (default None)
The csv_reader could come from 2 ways:
1. use csv.DictReader to get the csv_reader object
2. use dict to make up the csv_reader, the dict format is as following
[
{'csv_header_1': 'value', 'csv_header_2': 'value', 'csv_header_3': 'value', ...},
{'csv_header_1': 'value', 'csv_header_2': 'value', 'csv_header_3': 'value', ...},
{'csv_header_1': 'value', 'csv_header_2': 'value', 'csv_header_3': 'value', ...},
...
]
"""

has_header = self.get_csv_header(file_name)

with open(file_name) as f:
# init
int_type = defaultdict(list)
float_type = defaultdict(list)
keys = list()
csv_reader = list() if csv_reader is None else csv_reader
csv_reader_bk = csv_reader
has_header = True

# Verify the csv_reader
csv_reader_type = type(csv_reader)
is_generator_type = isinstance(csv_reader, types.GeneratorType)
if csv_reader_type != list and csv_reader_type != csv.DictReader and not is_generator_type:
error_message = 'Error: The csv_reader type is not expected: {0}, ' \
'should list type or csv.DictReader'.format(csv_reader_type)
sys.exit(error_message)
if is_generator_type:
csv_reader, csv_reader_bk = tee(csv_reader)

# Get csv_reader from csv file
f = None
if file_name:
has_header = self.get_csv_header(file_name)
f = open(file_name)
csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
int_type = defaultdict(list)
float_type = defaultdict(list)
keys = list()
for row in csv_reader:
keys = row.keys()
for key in keys:
value = row[key]
# Valid Int Type
try:
if float(value).is_integer():
int_type[key].append(True)
else:
int_type[key].append(False)
except ValueError:
csv_reader, csv_reader_bk = tee(csv_reader)

# Process
for row in csv_reader:
keys = row.keys()
for key in keys:
value = row[key]
len_value = len(value)
# Continue If Value Empty
if len_value == 0:
continue
# Valid Int Type
try:
if float(value).is_integer():
int_type[key].append(True)
else:
int_type[key].append(False)
# Valid Float Type
try:
float(value)
float_type[key].append(True)
except ValueError:
float_type[key].append(False)

# Valid the key if no header
if keys and not has_header:
for key in keys:
# Valid Int Type
try:
if float(key).is_integer():
int_type[key].append(True)
else:
int_type[key].append(False)
except ValueError:
except ValueError:
int_type[key].append(False)
# Valid Float Type
try:
float(value)
float_type[key].append(True)
except ValueError:
float_type[key].append(False)

# Valid the key if no header
if keys and not has_header:
for key in keys:
len_key = len(key)
# Continue If Key Empty
if len_key == 0:
continue
# Valid Int Type
try:
if float(key).is_integer():
int_type[key].append(True)
else:
int_type[key].append(False)
# Valid Float Type
try:
float(key)
float_type[key].append(True)
except ValueError:
float_type[key].append(False)

# Finalize Type
int_type = {k: all(int_type[k]) for k in int_type}
float_type = {k: all(float_type[k]) for k in float_type}

# Yield Data
f.seek(0)
csv_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
i = 1
for row in csv_reader:
keys = row.keys()
except ValueError:
int_type[key].append(False)
# Valid Float Type
try:
float(key)
float_type[key].append(True)
except ValueError:
float_type[key].append(False)

# Finalize Type
int_type = {k: all(int_type[k]) for k in int_type}
float_type = {k: all(float_type[k]) for k in float_type}

# Yield Data
i = 1
for row in csv_reader_bk:
keys = row.keys()
for key in keys:
value = row[key]
int_status = int_type[key]
len_value = len(value)
if len_value == 0:
continue
if int_status is True:
row[key] = int(float(value)) if int_type[key] is True else value
else:
row[key] = float(value) if float_type[key] is True else value
yield row, int_type, float_type
if not has_header and i == 1:
for key in keys:
value = row[key]
int_status = int_type[key]
len_key = len(key)
if len_key == 0:
continue
if int_status is True:
row[key] = int(float(value)) if int_type[key] is True else value
row[key] = int(float(key)) if int_type[key] is True else key
else:
row[key] = float(value) if float_type[key] is True else value
row[key] = float(key) if float_type[key] is True else key
yield row, int_type, float_type
if not has_header and i == 1:
for key in keys:
int_status = int_type[key]
if int_status is True:
row[key] = int(float(key)) if int_type[key] is True else key
else:
row[key] = float(key) if float_type[key] is True else key
yield row, int_type, float_type
i += 1
i += 1

# Close file
if file_name:
f.close()

def add_columns_to_csv(self,
file_name,
target,
data):
data,
save_csv_file=True):
"""Function: add_columns_to_csv
:param file_name: the file name
Expand All @@ -231,6 +280,8 @@ def add_columns_to_csv(self,
for example: [{"new_header_1": ["new_value_1", "new_value_2", "new_value_3"]},
{"new_header_2": ["new_value_1", "new_value_2", "new_value_3"]}
]
:param save_csv_file: save csv file to local (default True)
:return return the new csv data by dict
"""

has_header = self.get_csv_header(file_name)
Expand Down Expand Up @@ -258,29 +309,40 @@ def add_columns_to_csv(self,
sys.exit(error_message)

# Add columns
target_writer = None
target_file = None
if save_csv_file:
target_file = open(target, 'w+')
target_writer = csv.writer(target_file, delimiter=self.delimiter, lineterminator=self.lineterminator)

with open(file_name) as f:
source_reader = csv.DictReader(f, delimiter=self.delimiter, lineterminator=self.lineterminator)
new_headers = [list(x.keys())[0] for x in data]
with open(target, 'w+') as target_file:
target_writer = csv.writer(target_file, delimiter=self.delimiter, lineterminator=self.lineterminator)
row_id = 0
for row in source_reader:
values = list(row.values())
if row_id == 0:
headers = list(row.keys())
if not has_header:
continue
headers += new_headers
row_id = 0
for row in source_reader:
values = list(row.values())
if row_id == 0:
headers = list(row.keys())
if not has_header:
continue
headers += new_headers
if save_csv_file:
target_writer.writerow(headers)
new_values = list()
for x in data:
try:
value = list(x.values())[0][row_id]
except IndexError:
print('Warning: The provided column length is less than with the source csv length. '
'Use "null" to fill the empty data')
value = 'null'
new_values.append(value)
values += new_values
row_id += 1
new_values = list()
for x in data:
try:
value = list(x.values())[0][row_id]
except IndexError:
print('Warning: The provided column length is less than with the source csv length. '
'Use "null" to fill the empty data')
value = 'null'
new_values.append(value)
values += new_values
row_id += 1
if save_csv_file:
target_writer.writerow(values)

yield dict(zip(headers, values))

if save_csv_file:
target_file.close()
19 changes: 12 additions & 7 deletions src/ExportCsvToInflux/exporter_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,16 @@ def export_csv_to_influx(self,
field_columns.append('timestamp')
tag_columns.append('timestamp')
data.append({time_column: [modified_time] * csv_file_length})
csv_object.add_columns_to_csv(file_name=csv_file_item, target=new_csv_file, data=data)
csv_reader_data = csv_object.add_columns_to_csv(file_name=csv_file_item,
target=new_csv_file,
data=data,
save_csv_file=not force_insert_even_csv_no_update)

# Open influx csv
# Process influx csv
data_points = list()
count = 0
timestamp = 0
convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(file_name=new_csv_file)
convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(csv_reader=csv_reader_data)
for row, int_type, float_type in convert_csv_data_to_int_float:
# Process Match & Filter: If match_columns exists and filter_columns not exists
match_status = self.__check_match_and_filter(row,
Expand Down Expand Up @@ -418,8 +421,9 @@ def export_csv_to_influx(self,
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
error_message = 'Error: System exited. Please double check the csv data. \n' \
' It is not the same type as the current date type in influx. \n' \
error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
' Please double check the csv data. \n' \
' If would like to force data to str, use: --force_string_columns \n' \
' {0}'.format(e)
sys.exit(error_message)
if response is False:
Expand All @@ -437,8 +441,9 @@ def export_csv_to_influx(self,
try:
response = client.write_points(data_points)
except InfluxDBClientError as e:
error_message = 'Error: System exited. Please double check the csv data. \n' \
' It is not the same type as the current date type in influx. \n' \
error_message = 'Error: System exited. Encounter data type conflict issue in influx. \n' \
' Please double check the csv data. \n' \
' If would like to force data to str, use: --force_string_columns \n' \
' {0}'.format(e)
sys.exit(error_message)
if response is False:
Expand Down
1 change: 1 addition & 0 deletions 中文说明.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pip install ExportCsvToInflux
> **注意:**
> 1. --field_columns 可以使用通配符 `*`,来匹配所有的列: `--field_columns=*`, `--field_columns '*'`
> 2. 如果检查到csv文件没有更新, 数据不会重复插入到数据库. 可以使用强制插入: `--force_insert_even_csv_no_update=True`, `--force_insert_even_csv_no_update True`
> 3. 如果csv中某些单元格的数据为空, 则自动根据列的数据类型,添加下面数据到Influx: `int: -999`, `float: -999.0`, `string: -`
## 使用代码调用

Expand Down

0 comments on commit 0d1710c

Please sign in to comment.