forked from softwaredoug/subredis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subredis.py
304 lines (261 loc) · 10.3 KB
/
subredis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
class NotSupportedError(NotImplementedError):
""" We intentionally do not support this feature
we may support it in the future, but it requires
pretty big thinking """
def __init__(self, operation, message):
super(NotSupportedError, self).__init__()
self.operation = operation
self.message = message
def __str__(self):
return "Unsupported Redis Operation: " + self.operation + \
"performed (" + self.message + ")"
def keyProxy(cls, fn_name):
'''Create a method called fn_name in the SubRedis instance.
This method appends a prefix to the key passed in,
and then calls down into the inner redis object'''
def fn_proxied(self, key, *args, **kwds):
fn = getattr(self.redis, fn_name)
prefixedKey = self.appendKeys(key)
return fn(prefixedKey, *args, **kwds)
setattr(cls, fn_name, fn_proxied)
return cls
def unsupportedOperation(cls, fn_name, message=''):
'''Create a method called fn_name in the SubRedis instance.
This method throws a NotSupportedError when called'''
def fn_unsupported(*args, **kwds):
raise NotSupportedError(fn_name, message)
setattr(cls, fn_name, fn_unsupported)
return cls
def directProxy(cls, fn_name):
'''Create a method called fn_name in the SubRedis instance.
This method is just a redirect into the inner redis object'''
def fn_proxied(self, key, *args, **kwds):
fn = getattr(self.redis, fn_name)
return fn(*args, **kwds)
setattr(cls, fn_name, fn_proxied)
return cls
def subredis_wrapper():
def decorator(cls):
mapping = [
('hget', keyProxy),
('append', keyProxy),
# bgrewriteaof not supported for subredis
# bgsave not supported for subredis
('bitcount', keyProxy),
# bitopt custom impl below
('blpop', keyProxy),
('brpop', keyProxy),
# brpoplpush custom impl below
('client_kill', unsupportedOperation),
('client_list', unsupportedOperation),
('client_get', unsupportedOperation),
# dbsize TODO
('debug_object', keyProxy),
('decr', keyProxy),
('delete', keyProxy), # TODO only supports deleting single key
('echo', unsupportedOperation),
('eval', unsupportedOperation),
('evalsha', unsupportedOperation),
('exists', keyProxy),
('expire', keyProxy),
('expireat', keyProxy),
# flushall TODO is possible?
# flushdb implemented below
# from_url not supported for subredi
('from_url', unsupportedOperation),
('get', keyProxy),
('getbit', keyProxy),
('getrange', keyProxy),
('getset', keyProxy),
('hdel', keyProxy),
('hexists', keyProxy),
('hget', keyProxy),
('hgetall', keyProxy),
('hincrby', keyProxy),
('hincrbyfloat', keyProxy),
('hkeys', keyProxy),
('hlen', keyProxy),
('hmget', keyProxy),
('hmset', keyProxy),
('hset', keyProxy),
('hsetnx', keyProxy),
('hvals', keyProxy),
('incr', keyProxy),
('incrbyfloat', keyProxy),
# info TODO -- maybe info that this is a subredis?
# keys -- custom implementation below
('lastsave', unsupportedOperation),
('lindex', keyProxy),
('linsert', keyProxy),
('llen', keyProxy),
('lock', keyProxy),
('lpop', keyProxy),
('lpush', keyProxy),
('lpushx', keyProxy),
('lrange', keyProxy),
('lrem', keyProxy),
('lset', keyProxy),
('ltrim', keyProxy),
('mget', keyProxy),
('move', unsupportedOperation),
# mset -- custom impl below
# msetnx -- custom impl below
# object -- custom impl below
('parse_response', unsupportedOperation),
('persist', keyProxy),
('pexpire', keyProxy),
('pexpireat', keyProxy),
('ping', directProxy),
('pttl', keyProxy),
# publish TODO pubsub possible here?
# pubsub TODO pubsub possible here?
('randomkey', unsupportedOperation),
# register_script UNSUPPORTED
# rename custom impl below
# renamenx custom impl below
('rpop', keyProxy),
# rpoplpush custom impl below
('rpush', keyProxy),
('rpushx', keyProxy),
('sadd', keyProxy),
('scard', keyProxy),
# scripting not supported
# sdiff -- custom impl below
# sdiffstore -- custom impl below
('set', keyProxy),
# set_response_callback not supported
('setbit', keyProxy),
('setex', keyProxy),
('setnx', keyProxy),
('setrange', keyProxy),
# shutdown not supported
('sinter', keyProxy),
# sinterstore custom impl below
('sismember', keyProxy),
# slaveof not supported
('smembers', keyProxy),
# smove custom impl below
# TODO sort -- last key needs to be implemented
('spop', keyProxy),
('srandmember', keyProxy),
('srem', keyProxy),
('substr', keyProxy),
('sunion', keyProxy),
# sunionstore -- custom impl below
('time', directProxy),
# TODO transaction
('ttl', keyProxy),
('type', keyProxy),
# TODO unwatch
# TODO watch
('zadd', keyProxy),
('zcard', keyProxy),
('zincrby', keyProxy),
('zinterstore', keyProxy),
('zrange', keyProxy),
('zrangebyscore', keyProxy),
('zrank', keyProxy),
('zrem', keyProxy),
('zremrangebyrank', keyProxy),
('zremrangebyscore', keyProxy),
('zrevrange', keyProxy),
('zrevrangebyscore', keyProxy),
('zrevrank', keyProxy),
('zscore', keyProxy),
]
for fn_name, wrapper in mapping:
wrapper(cls, fn_name)
return cls
return decorator
@subredis_wrapper()
class SubRedis(object):
def __init__(self, prefix, redis):
self.redis = redis
self.prefix = prefix
def appendKeys(self, key):
prefixedKey = key
if self.prefix:
prefixedKey = self.prefix + "_" + key
return prefixedKey
def flushdb(self):
""" Should only flush stuff beginning with prefix?"""
allKeys = self.redis.keys(self.appendKeys("*"))
# for some reason deleteing with a list of keys isn't working
p = self.redis.pipeline()
for key in allKeys:
p.delete(key)
p.execute()
def keys(self, pattern="*"):
""" Only run pattern matching on my values """
lenOfPrefix = len(self.appendKeys(""))
return [key[lenOfPrefix:] for key in
self.redis.keys(self.appendKeys(pattern))]
def bitop(self, operation, dest, keys):
keys = [self.appendKeys(key) for key in keys]
return self.redis.bitop(operation, dest, keys)
def brpoplpush(self, src, dest, timeout=0):
src = self.appendKeys(src)
dest = self.appendKeys(dest)
return self.redis.brpoplpush(src, dest, timeout)
def dbsize(self):
raise NotImplementedError()
def mset(self, mapping):
mapping = {self.appendKeys(key): value for key, value in mapping}
return self.redis.mset(mapping)
def msetnx(self, mapping):
mapping = {self.appendKeys(key): value for key, value in mapping}
return self.redis.msetnx(mapping)
def object(self, infotype, key):
return self.redis.object(infotype, self.appendKeys(key))
def pipeline(self, transaction=True):
return SubPipeline(self.prefix, self.redis.pipeline())
def rename(self, srcKey, destKey):
srcKey = self.appendKeys(srcKey)
destKey = self.appendKeys(destKey)
return self.redis.rename(srcKey, destKey)
def renamenx(self, srcKey, destKey):
srcKey = self.appendKeys(srcKey)
destKey = self.appendKeys(destKey)
return self.redis.renamenx(srcKey, destKey)
def rpoplpush(self, srcKey, destKey):
srcKey = self.appendKeys(srcKey)
destKey = self.appendKeys(destKey)
return self.redis.rpoplpush(srcKey, destKey)
def sdiff(self, keys, *args):
keys = [self.appendKeys(key) for key in keys]
return self.redis.sdiff(keys, *args)
def sdiffstore(self, dest, keys, *args):
dest = self.appendKeys(dest)
keys = [self.appendKeys(key) for key in keys]
return self.redis.sdiffstore(dest, keys, *args)
def sinterstore(self, dest, keys, *args):
dest = self.appendKeys(dest)
keys = [self.appendKeys(key) for key in keys]
return self.redis.sinterstore(dest, keys, *args)
def smove(self, srcKey, destKey, value):
srcKey = self.appendKeys(srcKey)
destKey = self.appendKeys(destKey)
return self.redis.smove(srcKey, destKey, value)
def sort(name, start=None, num=None, by=None, get=None,
desc=False, alpha=False, store=None):
raise NotImplementedError()
def sunionstore(self, dest, keys, *args):
dest = self.appendKeys(dest)
keys = [self.appendKeys(key) for key in keys]
return self.redis.sunionstore(dest, keys, *args)
def transaction(func, watches, **kwargs):
raise NotImplementedError()
def watch(self, names):
raise NotImplementedError()
def unwatch(self):
raise NotImplementedError()
def zunionstore(self, dest, keys, aggregate=None):
dest = self.appendKeys(dest)
keys = [self.appendKeys(key) for key in keys]
return self.redis.zunionstore(dest, keys, aggregate)
class SubPipeline(SubRedis):
def __init__(self, prefix, pipeline):
super(SubPipeline, self).__init__(prefix, pipeline)
self.pipeline = pipeline
def execute(self):
return self.pipeline.execute()