Skip to content

Commit

Permalink
Python API: improve request ID usage in ChunkReader
Browse files Browse the repository at this point in the history
Relates to #1346
  • Loading branch information
fvennetier committed Mar 6, 2018
1 parent ebc33be commit 4e46dc0
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions oio/api/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ def __init__(self, chunk_iter, buf_size, headers,
self.read_timeout = read_timeout or CHUNK_TIMEOUT
self._resp_by_chunk = dict()

@property
def reqid(self):
""":returns: the request ID or None"""
if not self.request_headers:
return None
return self.request_headers.get('X-oio-req-id')

def recover(self, nb_bytes):
"""
Recover the request.
Expand Down Expand Up @@ -254,7 +261,8 @@ def _get_request(self, chunk):
source = conn.getresponse()
source.conn = conn
except (Exception, Timeout) as error:
logger.exception('Connection failed to %s', chunk)
logger.exception('Connection failed to %s (reqid=%s)',
chunk, self.reqid)
self._resp_by_chunk[chunk["url"]] = (0, str(error))
return False

Expand All @@ -264,8 +272,8 @@ def _get_request(self, chunk):
self.sources.append((source, chunk))
return True
else:
logger.warn("Invalid response from %s: %d %s",
chunk, source.status, source.reason)
logger.warn("Invalid response from %s (reqid=%s): %d %s",
chunk, self.reqid, source.status, source.reason)
self._resp_by_chunk[chunk["url"]] = (source.status,
str(source.reason))
return False
Expand Down Expand Up @@ -298,8 +306,11 @@ def get_iter(self):
self._resp_by_chunk)

def stream(self):
# Calling that right now will make `headers` field available
# before the caller starts reading the stream
"""
Get a generator over chunk data.
After calling this method, the `headers` field will be available
(even if no data is read from the generator).
"""
parts_iter = self.get_iter()

def _iter():
Expand Down Expand Up @@ -377,8 +388,9 @@ def iter_from_resp(self, source, parts_iter, part, chunk):
new_source, new_chunk = self._get_source()
if new_source:
logger.warn(
"Failed to read from %s (%s), retrying from %s",
chunk, crto, new_chunk)
"Failed to read from %s (%s), "
"retrying from %s (reqid=%s)",
chunk, crto, new_chunk, self.reqid)
close_source(source[0])
# switch source
source[0] = new_source
Expand All @@ -393,7 +405,8 @@ def iter_from_resp(self, source, parts_iter, part, chunk):
return

else:
logger.warn("Failed to read from %s (%s)", chunk, crto)
logger.warn("Failed to read from %s (%s, reqid=%s)",
chunk, crto, self.reqid)
# no valid source found to recover
raise
else:
Expand Down Expand Up @@ -456,7 +469,8 @@ def _get_iter(self, chunk, source):
body_iter.close()

except green.ChunkReadTimeout:
logger.exception("Failure during chunk read")
logger.exception("Failure during chunk read (reqid=%s)",
self.reqid)
raise
except GeneratorExit:
pass
Expand Down

0 comments on commit 4e46dc0

Please sign in to comment.