Skip to content

Commit

Permalink
Update #17 : refactored pipelining. Works.
Browse files Browse the repository at this point in the history
  • Loading branch information
Theo Crevon committed Oct 12, 2012
1 parent 1df19e5 commit 5fc8c5d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pyelevator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import zmq

from .constants import FAILURE_STATUS
from .message import Request, Response
from .error import ELEVATOR_ERROR, TimeoutError
from .utils.snippets import sec_to_ms, ms_to_sec
Expand Down Expand Up @@ -80,7 +81,7 @@ def send(self, db_uid, command, arguments, *args, **kwargs):
try:
response = Response(self.socket.recv_multipart()[0])

if response.error is not None:
if response.status == FAILURE_STATUS:
raise ELEVATOR_ERROR[response.error['code']](response.error['msg'])
except zmq.core.error.ZMQError:
# Restore original timeout and raise
Expand Down
7 changes: 5 additions & 2 deletions pyelevator/message.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import msgpack
import logging

from .constants import FAILURE_STATUS
from .constants import FAILURE_STATUS, WARNING_STATUS


class MessageFormatError(Exception):
Expand Down Expand Up @@ -44,8 +44,11 @@ def datas(self):
return self._datas

def _handle_failures(self):
if self.status == FAILURE_STATUS:
if self.status in (FAILURE_STATUS, WARNING_STATUS):
self.error = {
'code': int(self.datas[0]),
'msg': self.datas[1],
}

if self.status == WARNING_STATUS:
self._datas = self.datas[2:]
36 changes: 26 additions & 10 deletions pyelevator/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

from collections import deque

from .base import Client


Expand All @@ -10,22 +8,40 @@ def __init__(self, *args, **kwargs):
super(Pipeline, self).__init__(*args, **kwargs)
self.queue = []

def _action_request(self, command, arguments):
def _action_request(self, command, *args):
return {
'COMMAND': command,
'ARGS': arguments,
'ARGS': args,
}

def add(self, command, arguments):
self.queue.append(self._action_request(command, arguments))
def Get(self, key, *args, **kwargs):
self.queue.append(self._action_request("GET", key))

def MGet(self, keys, *args, **kwargs):
self.queue.append(self.action_request("MGET", keys))

def Put(self, key, value, *args, **kwargs):
self.queue.append(self._action_request("PUT", key, value))

def Delete(self, key, *args, **kwargs):
self.queue.append(self._action_request("DELETE", key))

def Range(self, start=None, limit=None, *args, **kwargs):
self.queue.append(self._action_request("RANGE", start, limit))

def Slice(self, key_from=None, offset=None, *args, **kwargs):
self.queue.append(self._action_request("SLICE", key_from, offset))

def Batch(self, batch, *args, **kwargs):
self.queue.append(self._action_request("BATCH", batch.container))

def pop(self):
self.queue.pop()
return self.queue.pop()

def clear(self):
self.queue = []

def push(self, *args, **kwargs):
datas = self.send(self.db_uid, 'PIPELINE', list(self.queue), *args, **kwargs)
self.queue = []
def execute(self, *args, **kwargs):
datas = self.send(self.db_uid, 'PIPELINE', [self.queue, ], *args, **kwargs)
self.clear()
return datas
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
package_dir={'': '.'},

install_requires=[
'pyzmq>=2.1.11',
'msgpack-python'
],
)

0 comments on commit 5fc8c5d

Please sign in to comment.