Skip to content

Commit

Permalink
iobeam: fix tracking of batches so more than one send() call works
Browse files Browse the repository at this point in the history
  • Loading branch information
RobAtticus committed Dec 18, 2015
1 parent 1802b87 commit d5cb181
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
16 changes: 12 additions & 4 deletions iobeam/iobeam.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def addDataStore(self, store):

def _convertDataSetToBatches(self):
dataset = self._dataset
batches = []
for name in dataset:
batch = data.DataStore([name])
for point in dataset[name]:
Expand All @@ -330,8 +331,9 @@ def _convertDataSetToBatches(self):
row = {}
row[name] = asDict["value"]
batch.add(ts, row)
self._batches.append(batch)
batches.append(batch)
self._dataset = {}
return batches

def send(self):
"""Sends stored data to the iobeam backend.
Expand All @@ -342,15 +344,21 @@ def send(self):
self._checkToken()
pid = self.projectId
did = self._activeDevice.deviceId
self._convertDataSetToBatches()
tempBatches = self._convertDataSetToBatches()

i = 0
for b in list(self._batches):
success, extra = self._importService.importBatch(pid, did, b)
if not success:
raise Exception("send failed. server sent: {}".format(extra))
else:
self._batches.remove(b)
b.clear()

# temp batches are not saved between calls; re-made each time
for b in tempBatches:
success, extra = self._importService.importBatch(pid, did, b)
if not success:
raise Exception("send failed. server sent: {}".format(extra))


@staticmethod
def query(token, qry, backend=None):
Expand Down
4 changes: 4 additions & 0 deletions iobeam/resources/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ def __init__(self, columns):
self._columns = list(columns) # defensive copy
self._rows = []

def clear(self):
"""Remove all data rows."""
del self._rows[:]

def add(self, timestamp, dataDict):
"""Add row of data at a given timestamp.
Expand Down
30 changes: 27 additions & 3 deletions tests/test_iobeam.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,10 @@ def test_convertDataSetToBatches(self):
self.assertEqual(1, len(client._dataset))
self.assertEqual(0, len(client._batches))

client._convertDataSetToBatches()
batches = client._convertDataSetToBatches()
self.assertEqual(0, len(client._dataset))
self.assertEqual(1, len(client._batches))
for b in client._batches:
self.assertEqual(1, len(batches))
for b in batches:
self.assertEqual(1, len(b.columns()))
self.assertEqual(series, b.columns()[0])
self.assertEqual(2, len(b))
Expand All @@ -351,6 +351,30 @@ def test_send(self):
client.addDataPoint(series, dp)

client.send()
self.assertEqual(0, len(client._batches))
self.assertEqual(0, len(client._dataset))
self.assertEqual(1, dummy.calls)
self.assertTrue(dummy.lastUrl.endswith("/imports"))
self.assertEqual(1, dummy.lastJson["project_id"])
self.assertEqual("fake", dummy.lastJson["device_id"])
# 2: 'fields' and 'data', batch format
self.assertEqual(2, len(dummy.lastJson["sources"]))

def test_sendWithBatch(self):
dummy = DummyBackend()
backend = request.DummyRequester(dummy)
client = self._makeTempClient(backend=backend, deviceId="fake")
client._checkToken = checkTokenNone

temp = client.createDataStore(["test"])
temp.add(iobeam.Timestamp(0), {"test": 0})
temp.add(iobeam.Timestamp(1), {"test": 11})
temp.add(iobeam.Timestamp(2), {"test": 28})
self.assertEqual(3, len(temp))

client.send()
self.assertEqual(0, len(temp))
self.assertEqual(1, len(client._batches))
self.assertEqual(0, len(client._dataset))
self.assertEqual(1, dummy.calls)
self.assertTrue(dummy.lastUrl.endswith("/imports"))
Expand Down

0 comments on commit d5cb181

Please sign in to comment.