Skip to content

Commit

Permalink
avoid race when entering/leaving idle mode
Browse files Browse the repository at this point in the history
Use an event to wait/signal that the IDLE command has been acknowledged
by receipt of a continuation line from the server, avoiding the
idle_queue.

When starting idle mode (idle_start), it isn't safe to assume that the
results of reading from the idle queue is a continuation line, as the
queue may have other content, such as:

- an unsolicited server message that just happens to arrive while the
  IDLE command is starting
- an EXISTS from a previous IDLE loop (see bamthomas#30)
- a 'stop_wait_server_push' from a previous IDLE loop, if the idle
  timeout coincided with receipt of a message from the server.

Calling code should probably be draining the queue before initiating
a new loop, but that wouldn't prevent an unsolicited server message
from racing.

Fixes bamthomas#30
  • Loading branch information
darkrain42 committed Feb 26, 2020
1 parent 497d6e8 commit acaa68a
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 6 deletions.
27 changes: 22 additions & 5 deletions aioimaplib/aioimaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ def __init__(self, loop, conn_lost_cb=None):
self.pending_async_commands = dict()
self.pending_sync_command = None
self.idle_queue = asyncio.Queue()
self._idle_event = asyncio.Event(loop=loop)
self.imap_version = None
self.literal_data = None
self.incomplete_line = b''
Expand Down Expand Up @@ -426,6 +427,9 @@ def execute(self, command):
else:
self.pending_async_commands.pop(command.untagged_resp_name, None)
raise
finally:
if command.name == 'IDLE':
self._idle_event.clear()

return command.response

Expand Down Expand Up @@ -483,6 +487,7 @@ def close(self):
def idle(self):
if 'IDLE' not in self.capabilities:
raise Abort('server has not IDLE capability')
self._idle_event.clear()
return (yield from self.execute(IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop)))

def has_pending_idle_command(self):
Expand Down Expand Up @@ -605,6 +610,10 @@ def wait(self, state_regexp):
with (yield from self.state_condition):
yield from self.state_condition.wait_for(lambda: state_re.match(self.state))

@asyncio.coroutine
def wait_for_idle_response(self):
yield from self._idle_event.wait()

def _untagged_response(self, line):
line = line.replace('* ', '')
if self.pending_sync_command is not None:
Expand Down Expand Up @@ -651,18 +660,21 @@ def _response_done(self, line):
command.close(response_text, result=response_result)

def _continuation(self, line):
if self.pending_sync_command is not None and self.pending_sync_command.name == 'APPEND':
if self.pending_sync_command is None:
log.info('server says %s (ignored)' % line)
elif self.pending_sync_command.name == 'APPEND':
if self.literal_data is None:
Abort('asked for literal data but have no literal data to send')
self.transport.write(self.literal_data)
self.transport.write(CRLF)
self.literal_data = None
elif self.pending_sync_command is not None:
elif self.pending_sync_command.name == 'IDLE':
log.debug('continuation line -- assuming IDLE is active : %s', line)
self._idle_event.set()
else:
log.debug('continuation line appended to pending sync command %s : %s' % (self.pending_sync_command, line))
self.pending_sync_command.append_to_resp(line)
self.pending_sync_command.flush()
else:
log.info('server says %s (ignored)' % line)

def new_tag(self):
tag = self.tagpre + str(self.tagnum)
Expand Down Expand Up @@ -761,8 +773,13 @@ def idle_start(self, timeout=TWENTY_NINE_MINUTES):
if self._idle_waiter is not None:
self._idle_waiter.cancel()
idle = asyncio.ensure_future(self.idle())
wait_for_ack = asyncio.ensure_future(self.protocol.wait_for_idle_response())
yield from asyncio.wait({idle, wait_for_ack}, return_when=asyncio.FIRST_COMPLETED)
if not self.has_pending_idle():
wait_for_ack.cancel()
raise Abort('server returned error to IDLE command')

self._idle_waiter = self.protocol.loop.call_later(timeout, lambda: asyncio.ensure_future(self.stop_wait_server_push()))
yield from self.wait_server_push(self.timeout) # idling continuation
return idle

def has_pending_idle(self):
Expand Down
81 changes: 80 additions & 1 deletion aioimaplib/tests/test_aioimaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_when_idle_continuation_line_in_same_dataframe_as_status_update(self):
self.imap_protocol.pending_sync_command = cmd
self.imap_protocol.data_received(b'+ idling\r\n* 1 EXISTS\r\n* 1 RECENT\r\n')

self.assertEqual(['+ idling'], queue.get_nowait())
self.assertTrue(self.imap_protocol._idle_event.is_set())
self.assertEqual(['1 EXISTS', '1 RECENT'], queue.get_nowait())


Expand Down Expand Up @@ -547,6 +547,18 @@ def test_idle_stop_does_nothing_if_no_pending_idle(self):

self.assertFalse((yield from imap_client.stop_wait_server_push()))

@asyncio.coroutine
def test_idle_error_response(self):
imap_client = yield from self.login_user('user', 'pass', select=True)

conn = self.imapserver.get_connection('user')
def idle_error(tag, *args):
conn.error(tag, "Error initiating IDLE")
conn.idle = idle_error

with self.assertRaises(Abort):
yield from imap_client.idle_start()

@asyncio.coroutine
def test_store_and_search_by_keyword(self):
self.imapserver.receive(Mail.create(['user']))
Expand Down Expand Up @@ -790,6 +802,23 @@ def test_rfc2971_id(self):
response = yield from imap_client.id()
self.assertEqual(('OK', ['ID command completed']), response)

@asyncio.coroutine
def test_race_idle_done_and_server_push(self):
imap_client = yield from self.login_user('user', 'pass', select=True)

idle = yield from imap_client.idle_start(2)
imap_client.idle_done()
self.imapserver.receive(Mail.create(['user']))
yield from asyncio.wait_for(idle, 1)

idle = yield from imap_client.idle_start(2)
imap_client.idle_done()
yield from asyncio.wait_for(idle, 1)

r = yield from imap_client.wait_server_push()
self.assertEqual(['1 EXISTS', '1 RECENT'], r)
self.assertTrue(imap_client.protocol.idle_queue.empty())


class TestImapServerCapabilities(AioWithImapServer, asynctest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -892,6 +921,56 @@ def test_idle_start__exits_queueget_without_timeout_error(self):
r = yield from asyncio.wait_for(push_task, 0)
self.assertEqual(STOP_WAIT_SERVER_PUSH, r)

@asyncio.coroutine
def test_idle_start__exits_queueget_with_keepalive_without_timeout_error(self):
imap_client = yield from self.login_user('user', 'pass', select=True)

# Idle long enough for the server to issue a keep-alive
server_idle_timeout = imapserver.ImapProtocol.IDLE_STILL_HERE_PERIOD_SECONDS
idle_timeout = server_idle_timeout + 1
idle = yield from imap_client.idle_start(idle_timeout)

push_task = asyncio.ensure_future(imap_client.wait_server_push(server_idle_timeout - 1))

# Advance time until we've received a keep-alive from server
yield from self.advance(server_idle_timeout)

# The original push task timed out
with self.assertRaises(asyncio.TimeoutError):
yield from asyncio.wait_for(push_task, 0.1)

# Read the keepalive from the server
r = yield from imap_client.wait_server_push(0.1)
self.assertEqual(["OK Still here"], r)

# Advance the clock to the client timeout (idle waiter triggers)
yield from self.advance(1)
imap_client.idle_done()

r = yield from asyncio.wait_for(idle, 1)
self.assertEqual("OK", r.result)

self.assertFalse(imap_client.protocol._idle_event.is_set())

# Start another idle period
idle = yield from imap_client.idle_start(idle_timeout)
yield from self.advance(1)

# Read 'stop_wait_server_push'
push_task = asyncio.ensure_future(imap_client.wait_server_push(0.1))
yield from self.advance(1)
r = yield from asyncio.wait_for(push_task, None)
self.assertEqual(STOP_WAIT_SERVER_PUSH, r)

# There shouldn't be anything left in the queue (no '+ idling')
with self.assertRaises(asyncio.TimeoutError):
push_task = asyncio.ensure_future(imap_client.wait_server_push(0.1))
yield from self.advance(1)
yield from asyncio.wait_for(push_task, 0.1)

imap_client.idle_done()
yield from asyncio.wait_for(idle, 1)


class TestAioimaplibCallback(AioWithImapServer, asynctest.TestCase):
def setUp(self):
Expand Down

0 comments on commit acaa68a

Please sign in to comment.