-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a Fair lock recipe? #115
base: master
Are you sure you want to change the base?
Conversation
The tricky part is what to do when one of the cache key expires but not both of them. Maybe there needs to be a single cache key to track tickets and serving. It still seems the cache key could expire while someone is holding the lock. I'm not sure how they later discover that the lock was lost. I'm sure there's research on this problem. Need to look there rather than re-implementing. |
Brainstorming, came up with: class FairRLock:
"""Recipe for cross-process and cross-thread re-entrant lock.
Assumes the key will not be evicted. Set the eviction policy to 'none' on
the cache to guarantee the key is not evicted.
>>> import diskcache
>>> cache = diskcache.Cache()
>>> rlock = FairRLock(cache, 'user-123')
>>> rlock.acquire()
>>> rlock.acquire()
>>> rlock.release()
>>> with rlock:
... pass
>>> rlock.release()
"""
def __init__(self, cache, key, expire=None, tag=None):
self._cache = cache
self._key = key
self._expire = expire
self._tag = tag
value = {
'tickets': 0,
'current': 1,
'pid_tid': '',
'time': 0,
'count': 0,
}
self._cache.add(self._key, value, tag=self._tag)
def _acquire_ticket(self):
with self._cache.transact():
state = self._cache.get(self._key)
state['tickets'] += 1
self._cache.set(self._key, state, tag=self._tag)
return state['tickets']
def _release_ticket(self, state):
state['current'] += 1
state['pid_tid'] = ''
state['time'] = 0
state['count'] = 0
self._cache.set(self._key, state, tag=self._tag)
def _get_pid_tid(self):
pid_tid = f'{os.getpid()}-{threading.get_ident()}'
return pid_tid
def acquire(self):
pid_tid = self._get_pid_tid()
ticket = self._acquire_ticket()
while True:
with self._cache.transact():
state = self._cache[self._key]
if state['current'] == ticket:
state['pid_tid'] = pid_tid
state['time'] = time.monotonic()
state['count'] += 1
self._cache.set(self._key, state, tag=self._tag)
break
expired = (
self._expire is not None
and time.monotonic() - state['time'] > self._expire
)
if expired:
self._release_ticket(state)
time.sleep(0.01)
return ticket
def release(self, ticket):
pid_tid = self._get_pid_tid()
with self._cache.transact():
state = self._cache.get(self._key)
if pid_tid != state['pid_tid']:
return
if state['count'] > 1:
state['time'] = time.monotonic()
state['count'] -= 1
self._cache.set(self._key, state, tag=self._tag)
assert state['count'] == 1
self._release_ticket(state)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release() But I think this'll break because the current ticket number is only incremented by release(). If one of the release() calls doesn't happen, then everything will be stuck. |
Change the code such that if the lock expires then it is immediately reacquired by another thread or process in the acquire() method rather than by first releasing it. Update the release ticket code to set time correctly. This will allow the acquire method to skip tickets if they are not acquired after a long time. |
When first acquiring a ticket, if the pid_tid already holds the lock, then don't increment the tickets count. |
Some prototyping for a fair lock recipe.