Skip to content

Commit

Permalink
tsdbwriter should work with full v3io paths (#181)
Browse files Browse the repository at this point in the history
* tsdbwriter should work with full v3io paths

* support /container/path

* align unit tests

* change import datetime

* more import fix

* move tests back to flow_integ

Co-authored-by: Dina Nimrodi <[email protected]>
  • Loading branch information
dinal and Dina Nimrodi authored Mar 11, 2021
1 parent fb0ea00 commit 58c24c8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 20 deletions.
14 changes: 8 additions & 6 deletions integration/test_flow_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import base64
import json
import time
from _datetime import datetime, timedelta
from datetime import datetime, timedelta

import aiohttp
import pandas as pd
Expand Down Expand Up @@ -180,7 +180,8 @@ def test_write_to_v3io_stream_unbalanced(setup_stream_teardown_test):


def test_write_to_tsdb():
tsdb_path = f'tsdb_path-{int(time.time_ns() / 1000)}'
table_name = f'tsdb_path-{int(time.time_ns() / 1000)}'
tsdb_path = f'v3io://bigdata/{table_name}'
controller = build_flow([
Source(),
WriteToTSDB(path=tsdb_path, time_col='time', index_cols='node', columns=['cpu', 'disk'], rate='1/h', max_events=2)
Expand All @@ -197,15 +198,16 @@ def test_write_to_tsdb():
controller.await_termination()

client = frames.Client()
res = client.read('tsdb', tsdb_path, start='0', end='now', multi_index=True)
res = client.read('tsdb', table_name, start='0', end='now', multi_index=True)
res = res.sort_values(['time'])
df = pd.DataFrame(expected, columns=['time', 'node', 'cpu', 'disk'])
df.set_index(keys=['time', 'node'], inplace=True)
assert res.equals(df), f"result{res}\n!=\nexpected{df}"


def test_write_to_tsdb_with_metadata_label():
tsdb_path = f'tsdb_path-{int(time.time_ns() / 1000)}'
table_name = f'tsdb_path-{int(time.time_ns() / 1000)}'
tsdb_path = f'projects/{table_name}'
controller = build_flow([
Source(),
WriteToTSDB(path=tsdb_path, index_cols='node', columns=['cpu', 'disk'], rate='1/h',
Expand All @@ -222,8 +224,8 @@ def test_write_to_tsdb_with_metadata_label():
controller.terminate()
controller.await_termination()

client = frames.Client()
res = client.read('tsdb', tsdb_path, start='0', end='now', multi_index=True)
client = frames.Client(container='projects')
res = client.read('tsdb', table_name, start='0', end='now', multi_index=True)
res = res.sort_values(['time'])
df = pd.DataFrame(expected, columns=['time', 'node', 'cpu', 'disk'])
df.set_index(keys=['time', 'node'], inplace=True)
Expand Down
12 changes: 7 additions & 5 deletions storey/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import random
import uuid
from typing import Optional, Union, List, Callable
from urllib.parse import urlparse

import pandas as pd
import v3io_frames as frames
Expand Down Expand Up @@ -408,7 +409,7 @@ class WriteToTSDB(_Batching, _Writer):

def __init__(self, path: str, time_col: str = '$time', columns: Union[str, List[str], None] = None,
infer_columns_from_data: Optional[bool] = None, index_cols: Union[str, List[str], None] = None,
v3io_frames: Optional[str] = None, access_key: Optional[str] = None, container: str = "", rate: str = "", aggr: str = "",
v3io_frames: Optional[str] = None, access_key: Optional[str] = None, rate: str = "", aggr: str = "",
aggr_granularity: str = "", frames_client=None, **kwargs):
kwargs['path'] = path
kwargs['time_col'] = time_col
Expand All @@ -420,8 +421,6 @@ def __init__(self, path: str, time_col: str = '$time', columns: Union[str, List[
kwargs['index_cols'] = index_cols
if v3io_frames is not None:
kwargs['v3io_frames'] = v3io_frames
if container:
kwargs['container'] = container
if rate:
kwargs['rate'] = rate
if aggr:
Expand All @@ -435,8 +434,11 @@ def __init__(self, path: str, time_col: str = '$time', columns: Union[str, List[
index_cols = [index_cols]
new_index_cols.extend(index_cols)
_Writer.__init__(self, columns, infer_columns_from_data, index_cols=new_index_cols)

self._path = path
parts = urlparse(path)
self._path = parts.path
container = parts.netloc
if not parts.scheme:
container, self._path = _split_path(self._path)
self._rate = rate
self._aggr = aggr
self.aggr_granularity = aggr_granularity
Expand Down
18 changes: 9 additions & 9 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ def test_write_to_tsdb():

controller = build_flow([
Source(),
WriteToTSDB(path='some/path', time_col='time', index_cols='node', columns=['cpu', 'disk'], rate='1/h',
WriteToTSDB(path='container/some/path', time_col='time', index_cols='node', columns=['cpu', 'disk'], rate='1/h',
max_events=1,
frames_client=mock_frames_client)
]).run()
Expand All @@ -1763,7 +1763,7 @@ def test_write_to_tsdb():

expected_create = (
'create', {'if_exists': 1, 'rate': '1/h', 'aggregates': '', 'aggregation_granularity': '', 'backend': 'tsdb',
'table': 'some/path'})
'table': '/some/path'})
assert mock_frames_client.call_log[0] == expected_create
i = 0
for write_call in mock_frames_client.call_log[1:]:
Expand All @@ -1773,7 +1773,7 @@ def test_write_to_tsdb():
res = write_call[1]['dfs']
assert expected.equals(res), f"result{res}\n!=\nexpected{expected}"
del write_call[1]['dfs']
assert write_call[1] == {'backend': 'tsdb', 'table': 'some/path'}
assert write_call[1] == {'backend': 'tsdb', 'table': '/some/path'}
i += 1


Expand All @@ -1782,7 +1782,7 @@ def test_write_to_tsdb_with_key_index():

controller = build_flow([
Source(),
WriteToTSDB(path='some/path', time_col='time', index_cols='node=$key', columns=['cpu', 'disk'], rate='1/h',
WriteToTSDB(path='container/some/path', time_col='time', index_cols='node=$key', columns=['cpu', 'disk'], rate='1/h',
max_events=1, frames_client=mock_frames_client)
]).run()

Expand All @@ -1798,7 +1798,7 @@ def test_write_to_tsdb_with_key_index():

expected_create = (
'create', {'if_exists': 1, 'rate': '1/h', 'aggregates': '', 'aggregation_granularity': '', 'backend': 'tsdb',
'table': 'some/path'})
'table': '/some/path'})
assert mock_frames_client.call_log[0] == expected_create
i = 0
for write_call in mock_frames_client.call_log[1:]:
Expand All @@ -1808,7 +1808,7 @@ def test_write_to_tsdb_with_key_index():
res = write_call[1]['dfs']
assert expected.equals(res), f"result{res}\n!=\nexpected{expected}"
del write_call[1]['dfs']
assert write_call[1] == {'backend': 'tsdb', 'table': 'some/path'}
assert write_call[1] == {'backend': 'tsdb', 'table': '/some/path'}
i += 1


Expand All @@ -1817,7 +1817,7 @@ def test_write_to_tsdb_with_key_index_and_default_time():

controller = build_flow([
Source(),
WriteToTSDB(path='some/path', index_cols='node=$key', columns=['cpu', 'disk'], rate='1/h',
WriteToTSDB(path='container/some/path', index_cols='node=$key', columns=['cpu', 'disk'], rate='1/h',
max_events=1, frames_client=mock_frames_client)
]).run()

Expand All @@ -1833,7 +1833,7 @@ def test_write_to_tsdb_with_key_index_and_default_time():

expected_create = (
'create', {'if_exists': 1, 'rate': '1/h', 'aggregates': '', 'aggregation_granularity': '', 'backend': 'tsdb',
'table': 'some/path'})
'table': '/some/path'})
assert mock_frames_client.call_log[0] == expected_create
i = 0
for write_call in mock_frames_client.call_log[1:]:
Expand All @@ -1843,7 +1843,7 @@ def test_write_to_tsdb_with_key_index_and_default_time():
res = write_call[1]['dfs']
assert expected.equals(res), f"result{res}\n!=\nexpected{expected}"
del write_call[1]['dfs']
assert write_call[1] == {'backend': 'tsdb', 'table': 'some/path'}
assert write_call[1] == {'backend': 'tsdb', 'table': '/some/path'}
i += 1


Expand Down

0 comments on commit 58c24c8

Please sign in to comment.