Skip to content

Commit

Permalink
[Fix] Timestamp logic enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
Bugazelle committed Mar 12, 2022
1 parent 6d83e65 commit dcc362f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 34 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ You could use `export_csv_to_influx -h` to see the help guide.
| 28 | `-fibr, --filter_by_regex` | No | None | Filter by regex, separated by comma |
| 29 | `-ecm, --enable_count_measurement` | No | False | Enable count measurement |
| 30 | `-fi, --force_insert_even_csv_no_update` | No | True | Force insert data to influx, even csv no update |
| 31 | `-fsc, --force_string_columns` | No | None | Force columns as string type, seperated as comma |
| 32 | `-fintc, --force_int_columns` | No | None | Force columns as int type, seperated as comma |
| 33 | `-ffc, --force_float_columns` | No | None | Force columns as float type, seperated as comma |
| 31 | `-fsc, --force_string_columns` | No | None | Force columns as string type, separated as comma |
| 32 | `-fintc, --force_int_columns` | No | None | Force columns as int type, separated as comma |
| 33 | `-ffc, --force_float_columns` | No | None | Force columns as float type, separated as comma |
| 34 | `-uniq, --unique` | No | False | Write duplicated points |
| 35 | `--csv_charset, --csv_charset` | No | None | he csv charset. Default: None, which will auto detect |
| 35 | `--csv_charset, --csv_charset` | No | None | The csv charset. Default: None, which will auto detect |

## Programmatically

Expand Down
8 changes: 7 additions & 1 deletion src/ExportCsvToInflux/csv_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def get_csv_lines_count(self, file_name):

return count

def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None):
def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None, ignore_filed=None):
"""Function: convert_csv_data_to_int_float
:param file_name: the file name (default None)
Expand All @@ -252,6 +252,7 @@ def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None):
{'csv_header_1': 'value', 'csv_header_2': 'value', 'csv_header_3': 'value', ...},
...
]
:param ignore_filed: ignore the certain column, case sensitive
"""

# init
Expand Down Expand Up @@ -292,6 +293,11 @@ def convert_csv_data_to_int_float(self, file_name=None, csv_reader=None):
int_type[key].append(False)
float_type[key].append(False)
continue
# Continue if ignore_filed is provided
if ignore_filed is not None and ignore_filed == key:
int_type[key].append(False)
float_type[key].append(False)
continue
# Valid Int Type
try:
if float(value).is_integer():
Expand Down
68 changes: 39 additions & 29 deletions src/ExportCsvToInflux/exporter_object.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from pytz.exceptions import UnknownTimeZoneError
from .config_object import Configuration
from .influx_object import InfluxObject
from decimal import InvalidOperation
from collections import defaultdict
from .csv_object import CSVObject
from decimal import Decimal
from pytz import timezone
import datetime
import uuid
Expand Down Expand Up @@ -178,17 +180,16 @@ def __process_timestamp(self, row, conf):

try:
# raise if not posix-time-like
timestamp_str = str(float(row[conf.time_column]))
timestamp_magnitude = len(timestamp_str.split('.')[0])
timestamp_decimal = Decimal(row[conf.time_column])
timestamp_str = str(timestamp_decimal)
timestamp_remove_decimal = int(
str(timestamp_str).replace('.', '')
)
# add zeros to convert to nanoseconds
timestamp_influx = (
'{:<0' + str(9 + timestamp_magnitude) + 'd}'
).format(timestamp_remove_decimal)
# add zeros to convert to nanoseconds: influxdb time is 19 digital length
timestamp_influx = '{:<019d}'.format(timestamp_remove_decimal)
timestamp_influx = timestamp_influx[:19] # deal with length > 19 timestamp
timestamp = int(timestamp_influx)
except ValueError:
except (ValueError, InvalidOperation):
try:
datetime_naive = datetime.datetime.strptime(row[conf.time_column], conf.time_format)
datetime_local = timezone(conf.time_zone).localize(datetime_naive)
Expand Down Expand Up @@ -253,6 +254,34 @@ def __write_count_measurement(self, conf, csv_file_length, influx_version, clien
self.filter_count = defaultdict(int)
print('Info: Wrote count measurement {0} points'.format(count_point))

@staticmethod
def __no_new_data_check(csv_file_item, csv_object, conf, csv_file_md5):
    """Private function: __no_new_data_check

    Check whether the source csv has already been exported unchanged.

    A previous export run writes a companion file "<name>_influx.csv"
    containing an 'md5' column with the checksum of the source csv it was
    generated from. If that companion file exists, its stored md5 matches
    the current checksum, and force_insert_even_csv_no_update is False,
    the caller can skip writing this csv.

    :param csv_file_item: path of the source csv file being processed
    :param csv_object: csv helper providing compatible_open /
        compatible_dict_reader and a csv_charset attribute
    :param conf: configuration object; this function reads delimiter,
        lineterminator and force_insert_even_csv_no_update from it
    :param csv_file_md5: md5 checksum of the current source csv content
    :return: tuple (no_new_data_status, new_csv_file) —
        no_new_data_status is True when the write should be skipped;
        new_csv_file is the companion file path (returned either way)
    """

    # Companion file produced by an earlier run for this source csv
    new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', ''))
    new_csv_file_exists = os.path.exists(new_csv_file)
    no_new_data_status = False
    if new_csv_file_exists:
        with csv_object.compatible_open(new_csv_file, encoding=csv_object.csv_charset) as f:
            csv_reader = csv_object.compatible_dict_reader(f,
                                                           encoding=csv_object.csv_charset,
                                                           delimiter=conf.delimiter,
                                                           lineterminator=conf.lineterminator)
            for row in csv_reader:
                try:
                    new_csv_file_md5 = row['md5']
                except KeyError:
                    # Companion file has no 'md5' column (old/foreign file):
                    # cannot compare checksums, so treat as new data.
                    break
                # Matching checksum means the source csv is unchanged; only
                # skip when the force-insert option is explicitly disabled.
                if new_csv_file_md5 == csv_file_md5 and conf.force_insert_even_csv_no_update is False:
                    warning_message = 'Warning: No new data found, ' \
                                      'writer stop/jump for {0}...'.format(csv_file_item)
                    print(warning_message)
                    no_new_data_status = True
                    # sys.exit(warning_message)
                    break

    return no_new_data_status, new_csv_file

def export_csv_to_influx(self, **kwargs):
"""Function: export_csv_to_influx
Expand Down Expand Up @@ -371,27 +400,7 @@ def export_csv_to_influx(self, **kwargs):
break

# Check the timestamp, and generate the csv with checksum
new_csv_file = '{0}_influx.csv'.format(csv_file_item.replace('.csv', ''))
new_csv_file_exists = os.path.exists(new_csv_file)
no_new_data_status = False
if new_csv_file_exists:
with csv_object.compatible_open(new_csv_file, encoding=csv_object.csv_charset) as f:
csv_reader = csv_object.compatible_dict_reader(f,
encoding=csv_object.csv_charset,
delimiter=conf.delimiter,
lineterminator=conf.lineterminator)
for row in csv_reader:
try:
new_csv_file_md5 = row['md5']
except KeyError:
break
if new_csv_file_md5 == csv_file_md5 and conf.force_insert_even_csv_no_update is False:
warning_message = 'Warning: No new data found, ' \
'writer stop/jump for {0}...'.format(csv_file_item)
print(warning_message)
no_new_data_status = True
# sys.exit(warning_message)
break
no_new_data_status, new_csv_file = self.__no_new_data_check(csv_file_item, csv_object, conf, csv_file_md5)
if no_new_data_status:
continue
data = [{'md5': [csv_file_md5] * csv_file_length}]
Expand All @@ -409,7 +418,8 @@ def export_csv_to_influx(self, **kwargs):
data_points = list()
count = 0
timestamp = 0
convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(csv_reader=csv_reader_data)
convert_csv_data_to_int_float = csv_object.convert_csv_data_to_int_float(csv_reader=csv_reader_data,
ignore_filed=conf.time_column)
for row, int_type, float_type in convert_csv_data_to_int_float:
# Process Match & Filter: If match_columns exists and filter_columns not exists
match_status = self.__check_match_and_filter(row,
Expand Down

0 comments on commit dcc362f

Please sign in to comment.