From acaa68ad53f4800bb437a044a50d50c7d2a2e597 Mon Sep 17 00:00:00 2001 From: Paul Aurich Date: Tue, 18 Feb 2020 21:06:41 -0800 Subject: [PATCH] avoid race when entering/leaving idle mode Use an event to wait/signal that the IDLE command has been acknowledged by receipt of a continuation line from the server, avoiding the idle_queue. When starting idle mode (idle_start), it isn't safe to assume that the results of reading from the idle queue is a continuation line, as the queue may have other content, such as: - an unsolicited server message that just happens to arrive while the IDLE command is starting - an EXISTS from a previous IDLE loop (see #30) - a 'stop_wait_server_push' from a previous IDLE loop, if the idle timeout coincided with receipt of a message from the server. Calling code should probably be draining the queue before initiating a new loop, but that wouldn't prevent an unsolicited server message from racing. Fixes #30 --- aioimaplib/aioimaplib.py | 27 ++++++++-- aioimaplib/tests/test_aioimaplib.py | 81 ++++++++++++++++++++++++++++- 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/aioimaplib/aioimaplib.py b/aioimaplib/aioimaplib.py index 3263372..bcc738a 100644 --- a/aioimaplib/aioimaplib.py +++ b/aioimaplib/aioimaplib.py @@ -318,6 +318,7 @@ def __init__(self, loop, conn_lost_cb=None): self.pending_async_commands = dict() self.pending_sync_command = None self.idle_queue = asyncio.Queue() + self._idle_event = asyncio.Event(loop=loop) self.imap_version = None self.literal_data = None self.incomplete_line = b'' @@ -426,6 +427,9 @@ def execute(self, command): else: self.pending_async_commands.pop(command.untagged_resp_name, None) raise + finally: + if command.name == 'IDLE': + self._idle_event.clear() return command.response @@ -483,6 +487,7 @@ def close(self): def idle(self): if 'IDLE' not in self.capabilities: raise Abort('server has not IDLE capability') + self._idle_event.clear() return (yield from self.execute(IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop))) def has_pending_idle_command(self): @@ -605,6 +610,10 @@ def wait(self, state_regexp): with (yield from self.state_condition): yield from self.state_condition.wait_for(lambda: state_re.match(self.state)) + @asyncio.coroutine + def wait_for_idle_response(self): + yield from self._idle_event.wait() + def _untagged_response(self, line): line = line.replace('* ', '') if self.pending_sync_command is not None: @@ -651,18 +660,21 @@ def _response_done(self, line): command.close(response_text, result=response_result) def _continuation(self, line): - if self.pending_sync_command is not None and self.pending_sync_command.name == 'APPEND': + if self.pending_sync_command is None: + log.info('server says %s (ignored)' % line) + elif self.pending_sync_command.name == 'APPEND': if self.literal_data is None: Abort('asked for literal data but have no literal data to send') self.transport.write(self.literal_data) self.transport.write(CRLF) self.literal_data = None - elif self.pending_sync_command is not None: + elif self.pending_sync_command.name == 'IDLE': + log.debug('continuation line -- assuming IDLE is active : %s', line) + self._idle_event.set() + else: log.debug('continuation line appended to pending sync command %s : %s' % (self.pending_sync_command, line)) self.pending_sync_command.append_to_resp(line) self.pending_sync_command.flush() - else: - log.info('server says %s (ignored)' % line) def new_tag(self): tag = self.tagpre + str(self.tagnum) @@ -761,8 +773,13 @@ def idle_start(self, timeout=TWENTY_NINE_MINUTES): if self._idle_waiter is not None: self._idle_waiter.cancel() idle = asyncio.ensure_future(self.idle()) + wait_for_ack = asyncio.ensure_future(self.protocol.wait_for_idle_response()) + yield from asyncio.wait({idle, wait_for_ack}, return_when=asyncio.FIRST_COMPLETED) + if not self.has_pending_idle(): + wait_for_ack.cancel() + raise Abort('server returned error to IDLE command') + self._idle_waiter = self.protocol.loop.call_later(timeout, lambda: asyncio.ensure_future(self.stop_wait_server_push())) - yield from self.wait_server_push(self.timeout) # idling continuation return idle def has_pending_idle(self): diff --git a/aioimaplib/tests/test_aioimaplib.py b/aioimaplib/tests/test_aioimaplib.py index 35bf145..ab4cbeb 100644 --- a/aioimaplib/tests/test_aioimaplib.py +++ b/aioimaplib/tests/test_aioimaplib.py @@ -204,7 +204,7 @@ def test_when_idle_continuation_line_in_same_dataframe_as_status_update(self): self.imap_protocol.pending_sync_command = cmd self.imap_protocol.data_received(b'+ idling\r\n* 1 EXISTS\r\n* 1 RECENT\r\n') - self.assertEqual(['+ idling'], queue.get_nowait()) + self.assertTrue(self.imap_protocol._idle_event.is_set()) self.assertEqual(['1 EXISTS', '1 RECENT'], queue.get_nowait()) @@ -547,6 +547,18 @@ def test_idle_stop_does_nothing_if_no_pending_idle(self): self.assertFalse((yield from imap_client.stop_wait_server_push())) + @asyncio.coroutine + def test_idle_error_response(self): + imap_client = yield from self.login_user('user', 'pass', select=True) + + conn = self.imapserver.get_connection('user') + def idle_error(tag, *args): + conn.error(tag, "Error initiating IDLE") + conn.idle = idle_error + + with self.assertRaises(Abort): + yield from imap_client.idle_start() + @asyncio.coroutine def test_store_and_search_by_keyword(self): self.imapserver.receive(Mail.create(['user'])) @@ -790,6 +802,23 @@ def test_rfc2971_id(self): response = yield from imap_client.id() self.assertEqual(('OK', ['ID command completed']), response) + @asyncio.coroutine + def test_race_idle_done_and_server_push(self): + imap_client = yield from self.login_user('user', 'pass', select=True) + + idle = yield from imap_client.idle_start(2) + imap_client.idle_done() + self.imapserver.receive(Mail.create(['user'])) + yield from asyncio.wait_for(idle, 1) + + idle = yield from imap_client.idle_start(2) + imap_client.idle_done() + yield from asyncio.wait_for(idle, 1) + + r = yield from imap_client.wait_server_push() + self.assertEqual(['1 EXISTS', '1 RECENT'], r) + self.assertTrue(imap_client.protocol.idle_queue.empty()) + class TestImapServerCapabilities(AioWithImapServer, asynctest.TestCase): def setUp(self): @@ -892,6 +921,56 @@ def test_idle_start__exits_queueget_without_timeout_error(self): r = yield from asyncio.wait_for(push_task, 0) self.assertEqual(STOP_WAIT_SERVER_PUSH, r) + @asyncio.coroutine + def test_idle_start__exits_queueget_with_keepalive_without_timeout_error(self): + imap_client = yield from self.login_user('user', 'pass', select=True) + + # Idle long enough for the server to issue a keep-alive + server_idle_timeout = imapserver.ImapProtocol.IDLE_STILL_HERE_PERIOD_SECONDS + idle_timeout = server_idle_timeout + 1 + idle = yield from imap_client.idle_start(idle_timeout) + + push_task = asyncio.ensure_future(imap_client.wait_server_push(server_idle_timeout - 1)) + + # Advance time until we've received a keep-alive from server + yield from self.advance(server_idle_timeout) + + # The original push task timed out + with self.assertRaises(asyncio.TimeoutError): + yield from asyncio.wait_for(push_task, 0.1) + + # Read the keepalive from the server + r = yield from imap_client.wait_server_push(0.1) + self.assertEqual(["OK Still here"], r) + + # Advance the clock to the client timeout (idle waiter triggers) + yield from self.advance(1) + imap_client.idle_done() + + r = yield from asyncio.wait_for(idle, 1) + self.assertEqual("OK", r.result) + + self.assertFalse(imap_client.protocol._idle_event.is_set()) + + # Start another idle period + idle = yield from imap_client.idle_start(idle_timeout) + yield from self.advance(1) + + # Read 'stop_wait_server_push' + push_task = asyncio.ensure_future(imap_client.wait_server_push(0.1)) + yield from self.advance(1) + r = yield from asyncio.wait_for(push_task, None) + self.assertEqual(STOP_WAIT_SERVER_PUSH, r) + + # There shouldn't be anything left in the queue (no '+ idling') + with self.assertRaises(asyncio.TimeoutError): + push_task = asyncio.ensure_future(imap_client.wait_server_push(0.1)) + yield from self.advance(1) + yield from asyncio.wait_for(push_task, 0.1) + + imap_client.idle_done() + yield from asyncio.wait_for(idle, 1) + class TestAioimaplibCallback(AioWithImapServer, asynctest.TestCase): def setUp(self):