diff --git a/events/oio_events_queue.c b/events/oio_events_queue.c index 231210ad78..d5a3b66d6b 100644 --- a/events/oio_events_queue.c +++ b/events/oio_events_queue.c @@ -204,10 +204,21 @@ oio_event__init (GString *gs, const char *type, struct oio_url_s *url) } GString* -oio_event__create (const char *type, struct oio_url_s *url) +oio_event__create(const char *type, struct oio_url_s *url) +{ + return oio_event__create_with_id(type, url, NULL); +} + +GString* +oio_event__create_with_id(const char *type, struct oio_url_s *url, + const char *request_id) { GString *gs = g_string_sized_new(512); g_string_append_c (gs, '{'); oio_event__init (gs, type, url); + if (request_id && *request_id) { + g_string_append_c(gs, ','); + oio_str_gstring_append_json_pair(gs, "request_id", request_id); + } return gs; } diff --git a/events/oio_events_queue.h b/events/oio_events_queue.h index 4c8163f7ba..bab52d6cdc 100644 --- a/events/oio_events_queue.h +++ b/events/oio_events_queue.h @@ -52,6 +52,10 @@ void oio_event__init (GString *out, const char *type, struct oio_url_s *url); GString* oio_event__create (const char *type, struct oio_url_s *url); +/* Create the base of a JSON formatted event, with the specified request ID. */ +GString* oio_event__create_with_id(const char *type, struct oio_url_s *url, + const char *request_id); + /* -------------------------------------------------------------------------- */ /* find the appropriate implementation of event queue for the configuration diff --git a/oio/blob/indexer.py b/oio/blob/indexer.py index 356a661bb5..3e5588eb07 100644 --- a/oio/blob/indexer.py +++ b/oio/blob/indexer.py @@ -23,7 +23,7 @@ from oio.rdir.client import RdirClient from oio.common.daemon import Daemon from oio.common import exceptions as exc -from oio.common.utils import get_logger, int_value, paths_gen +from oio.common.utils import get_logger, int_value, paths_gen, request_id from oio.common.green import ratelimit from oio.common.exceptions import OioNetworkException from oio.common.constants import STRLEN_CHUNKID @@ -123,10 +123,12 @@ def update_index(self, path): raise exc.FaultyChunk( 'Missing extended attribute %s' % e) data = {'mtime': int(time.time())} + headers = {'X-oio-req-id': 'blob-indexer-' + request_id()[:-13]} self.index_client.chunk_push(self.volume_id, meta['container_id'], meta['content_id'], meta['chunk_id'], + headers=headers, **data) def run(self, *args, **kwargs): diff --git a/oio/event/evob.py b/oio/event/evob.py index d9eb135052..cd087bac54 100644 --- a/oio/event/evob.py +++ b/oio/event/evob.py @@ -39,13 +39,14 @@ class Event(object): job_id = _event_env_property('job_id') event_type = _event_env_property('event') data = _event_env_property('data') + reqid = _event_env_property('request_id') when = _event_env_property('when') def __init__(self, env): self.env = env def __repr__(self): - return "Event [%s](%s)" % (self.job_id, self.event_type) + return "Event [%s,%s](%s)" % (self.job_id, self.reqid, self.event_type) class Response(object): diff --git a/oio/event/filters/volume_index.py b/oio/event/filters/volume_index.py index 0a864ee13e..02d5dd2c29 100644 --- a/oio/event/filters/volume_index.py +++ b/oio/event/filters/volume_index.py @@ -28,20 +28,24 @@ class VolumeIndexFilter(Filter): _attempts_push = 3 _attempts_delete = 3 - def _chunk_delete(self, + def _chunk_delete(self, reqid, volume_id, container_id, content_id, chunk_id): + headers = {'X-oio-req-id': reqid} try: return self.app.rdir.chunk_delete( - volume_id, container_id, content_id, chunk_id) + volume_id, container_id, content_id, chunk_id, + headers=headers) except Exception as ex: self.logger.warn("chunk delete failed: %s", ex) - def _chunk_push(self, + def _chunk_push(self, reqid, volume_id, container_id, content_id, chunk_id, args): + headers = {'X-oio-req-id': reqid} try: return self.app.rdir.chunk_push( - volume_id, container_id, content_id, chunk_id, **args) + volume_id, container_id, content_id, chunk_id, + headers=headers, **args) except Exception as ex: self.logger.warn("chunk push failed: %s", ex) @@ -56,12 +60,14 @@ def process(self, env, cb): try: if event.event_type == EventTypes.CHUNK_DELETED: self._chunk_delete( + event.reqid, volume_id, container_id, content_id, chunk_id) else: args = { 'mtime': event.when / 1000000, # seconds } self._chunk_push( + event.reqid, volume_id, container_id, content_id, chunk_id, args) except OioException as exc: resp = EventError(event=event, diff --git a/oio/rdir/client.py b/oio/rdir/client.py index 7bbfaf813f..048dcd21c1 100644 --- a/oio/rdir/client.py +++ b/oio/rdir/client.py @@ -17,7 +17,8 @@ from oio.common.exceptions import ClientException, NotFound, VolumeException from oio.common.exceptions import ServiceUnavailable, ServerException from oio.common.exceptions import OioNetworkException, OioException -from oio.common.utils import get_logger, group_chunk_errors, oio_reraise +from oio.common.utils import get_logger, group_chunk_errors, oio_reraise, \ + ensure_headers, ensure_request_id from oio.conscience.client import ConscienceClient from oio.directory.client import DirectoryClient from time import sleep @@ -263,6 +264,8 @@ def _make_uri(self, action, volume_id): rdir_host = self._get_rdir_addr(volume_id) return 'http://%s/v1/rdir/%s' % (rdir_host, action) + @ensure_headers + @ensure_request_id def _rdir_request(self, volume, method, action, create=False, **kwargs): params = {'vol': volume} if create: @@ -282,7 +285,7 @@ def create(self, volume_id, **kwargs): self._rdir_request(volume_id, 'POST', 'create', **kwargs) def chunk_push(self, volume_id, container_id, content_id, chunk_id, - **data): + headers=None, **data): """Reference a chunk in the reverse directory""" body = {'container_id': container_id, 'content_id': content_id, @@ -292,18 +295,20 @@ def chunk_push(self, volume_id, container_id, content_id, chunk_id, body[key] = value self._rdir_request(volume_id, 'POST', 'push', create=True, - json=body) + json=body, headers=headers) - def chunk_delete(self, volume_id, container_id, content_id, chunk_id): + def chunk_delete(self, volume_id, container_id, content_id, chunk_id, + **kwargs): """Unreference a chunk from the reverse directory""" body = {'container_id': container_id, 'content_id': content_id, 'chunk_id': chunk_id} - self._rdir_request(volume_id, 'DELETE', 'delete', json=body) + self._rdir_request(volume_id, 'DELETE', 'delete', + json=body, **kwargs) def chunk_fetch(self, volume, limit=100, rebuild=False, - container_id=None, max_attempts=3): + container_id=None, max_attempts=3, **kwargs): """ Fetch the list of chunks belonging to the specified volume. @@ -326,8 +331,8 @@ def chunk_fetch(self, volume, limit=100, rebuild=False, while True: for i in range(max_attempts): try: - resp, resp_body = self._rdir_request( - volume, 'POST', 'fetch', json=req_body) + _resp, resp_body = self._rdir_request( + volume, 'POST', 'fetch', json=req_body, **kwargs) break except OioNetworkException: # Monotonic backoff @@ -338,38 +343,42 @@ def chunk_fetch(self, volume, limit=100, rebuild=False, raise if len(resp_body) == 0: break + key = None for (key, value) in resp_body: container, content, chunk = key.split('|') yield container, content, chunk, value - req_body['start_after'] = key + if key is not None: + req_body['start_after'] = key - def admin_incident_set(self, volume, date): + def admin_incident_set(self, volume, date, **kwargs): body = {'date': int(float(date))} - self._rdir_request(volume, 'POST', 'admin/incident', json=body) + self._rdir_request(volume, 'POST', 'admin/incident', + json=body, **kwargs) - def admin_incident_get(self, volume): - resp, resp_body = self._rdir_request(volume, 'GET', - 'admin/incident') - return resp_body.get('date') + def admin_incident_get(self, volume, **kwargs): + _resp, body = self._rdir_request(volume, 'GET', + 'admin/incident', **kwargs) + return body.get('date') - def admin_lock(self, volume, who): + def admin_lock(self, volume, who, **kwargs): body = {'who': who} - self._rdir_request(volume, 'POST', 'admin/lock', json=body) + self._rdir_request(volume, 'POST', 'admin/lock', json=body, **kwargs) - def admin_unlock(self, volume): - self._rdir_request(volume, 'POST', 'admin/unlock') + def admin_unlock(self, volume, **kwargs): + self._rdir_request(volume, 'POST', 'admin/unlock', **kwargs) - def admin_show(self, volume): - resp, resp_body = self._rdir_request(volume, 'GET', 'admin/show') - return resp_body + def admin_show(self, volume, **kwargs): + _resp, body = self._rdir_request(volume, 'GET', 'admin/show', + **kwargs) + return body - def admin_clear(self, volume, clear_all=False): + def admin_clear(self, volume, clear_all=False, **kwargs): body = {'all': clear_all} - resp, resp_body = self._rdir_request( - volume, 'POST', 'admin/clear', json=body) + _resp, resp_body = self._rdir_request( + volume, 'POST', 'admin/clear', json=body, **kwargs) return resp_body - def status(self, volume): - resp, resp_body = self._rdir_request(volume, 'GET', 'status') - return resp_body + def status(self, volume, **kwargs): + _resp, body = self._rdir_request(volume, 'GET', 'status', **kwargs) + return body diff --git a/proxy/transport_http.c b/proxy/transport_http.c index 3f634d90a3..9861ce45fd 100644 --- a/proxy/transport_http.c +++ b/proxy/transport_http.c @@ -344,7 +344,7 @@ sender(gpointer k, gpointer v, gpointer u) static void _access_log(struct req_ctx_s *r, gint status, gsize out_len, const gchar *tail) { - if (r->access_disabled && 2 == (status / 100)) + if (r->access_disabled && 2 == (status / 100) && !GRID_DEBUG_ENABLED()) return; const char *reqid = g_tree_lookup(r->request->tree_headers, PROXYD_HEADER_REQID); diff --git a/rawx-apache2/src/rawx_event.c b/rawx-apache2/src/rawx_event.c index fd185ea013..1ed786a2ba 100644 --- a/rawx-apache2/src/rawx_event.c +++ b/rawx-apache2/src/rawx_event.c @@ -92,10 +92,12 @@ rawx_event_destroy (void) } GError * -rawx_event_send (const char *event_type, GString *data_json) +rawx_event_send(const char *event_type, const char *request_id, + GString *data_json) { if (q != NULL && th_queue != NULL) { - GString *json = oio_event__create (event_type, NULL); + GString *json = oio_event__create_with_id( + event_type, NULL, request_id); g_string_append_printf(json, ",\"data\":%.*s}", (int) data_json->len, data_json->str); oio_events_queue__send (q, g_string_free (json, FALSE)); diff --git a/rawx-apache2/src/rawx_event.h b/rawx-apache2/src/rawx_event.h index d513e6ca3d..34a11306aa 100644 --- a/rawx-apache2/src/rawx_event.h +++ b/rawx-apache2/src/rawx_event.h @@ -45,6 +45,7 @@ void rawx_event_destroy(void); * * @return NULL if OK, or a GError describing the problem */ -GError* rawx_event_send(const char *event_type, GString *data_json); +GError* rawx_event_send(const char *event_type, const char *request_id, + GString *data_json); #endif /*OIO_SDS__rawx_apache2__src__rawx_event_h*/ diff --git a/rawx-apache2/src/rawx_internals.c b/rawx-apache2/src/rawx_internals.c index de46edd753..63c1c36f38 100644 --- a/rawx-apache2/src/rawx_internals.c +++ b/rawx-apache2/src/rawx_internals.c @@ -153,8 +153,10 @@ send_chunk_event(const char *type, const dav_resource *resource) g_string_append_c(json, '}'); + const char *reqid = apr_table_get( + resource->info->request->headers_in, PROXYD_HEADER_REQID); const gint64 pre = oio_ext_monotonic_time (); - GError *err = rawx_event_send(type, json); + GError *err = rawx_event_send(type, reqid, json); const gint64 post = oio_ext_monotonic_time (); if (!err) { gint64 limit = 5 * G_TIME_SPAN_SECOND;