Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer request ID in chunk events #1488

Merged
merged 1 commit into from
Jun 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion events/oio_events_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,21 @@ oio_event__init (GString *gs, const char *type, struct oio_url_s *url)
}

GString*
oio_event__create (const char *type, struct oio_url_s *url)
oio_event__create(const char *type, struct oio_url_s *url)
{
return oio_event__create_with_id(type, url, NULL);
}

GString*
oio_event__create_with_id(const char *type, struct oio_url_s *url,
const char *request_id)
{
GString *gs = g_string_sized_new(512);
g_string_append_c (gs, '{');
oio_event__init (gs, type, url);
if (request_id && *request_id) {
g_string_append_c(gs, ',');
oio_str_gstring_append_json_pair(gs, "request_id", request_id);
}
return gs;
}
4 changes: 4 additions & 0 deletions events/oio_events_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ void oio_event__init (GString *out, const char *type, struct oio_url_s *url);

GString* oio_event__create (const char *type, struct oio_url_s *url);

/* Create the base of a JSON formatted event, with the specified request ID. */
GString* oio_event__create_with_id(const char *type, struct oio_url_s *url,
const char *request_id);

/* -------------------------------------------------------------------------- */

/* find the appropriate implementation of event queue for the configuration
Expand Down
4 changes: 3 additions & 1 deletion oio/blob/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from oio.rdir.client import RdirClient
from oio.common.daemon import Daemon
from oio.common import exceptions as exc
from oio.common.utils import get_logger, int_value, paths_gen
from oio.common.utils import get_logger, int_value, paths_gen, request_id
from oio.common.green import ratelimit
from oio.common.exceptions import OioNetworkException
from oio.common.constants import STRLEN_CHUNKID
Expand Down Expand Up @@ -123,10 +123,12 @@ def update_index(self, path):
raise exc.FaultyChunk(
'Missing extended attribute %s' % e)
data = {'mtime': int(time.time())}
headers = {'X-oio-req-id': 'blob-indexer-' + request_id()[:-13]}
self.index_client.chunk_push(self.volume_id,
meta['container_id'],
meta['content_id'],
meta['chunk_id'],
headers=headers,
**data)

def run(self, *args, **kwargs):
Expand Down
3 changes: 2 additions & 1 deletion oio/event/evob.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ class Event(object):
job_id = _event_env_property('job_id')
event_type = _event_env_property('event')
data = _event_env_property('data')
reqid = _event_env_property('request_id')
when = _event_env_property('when')

def __init__(self, env):
self.env = env

def __repr__(self):
return "Event [%s](%s)" % (self.job_id, self.event_type)
return "Event [%s,%s](%s)" % (self.job_id, self.reqid, self.event_type)


class Response(object):
Expand Down
14 changes: 10 additions & 4 deletions oio/event/filters/volume_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,24 @@ class VolumeIndexFilter(Filter):
_attempts_push = 3
_attempts_delete = 3

def _chunk_delete(self,
def _chunk_delete(self, reqid,
volume_id, container_id, content_id, chunk_id):
headers = {'X-oio-req-id': reqid}
try:
return self.app.rdir.chunk_delete(
volume_id, container_id, content_id, chunk_id)
volume_id, container_id, content_id, chunk_id,
headers=headers)
except Exception as ex:
self.logger.warn("chunk delete failed: %s", ex)

def _chunk_push(self,
def _chunk_push(self, reqid,
volume_id, container_id, content_id, chunk_id,
args):
headers = {'X-oio-req-id': reqid}
try:
return self.app.rdir.chunk_push(
volume_id, container_id, content_id, chunk_id, **args)
volume_id, container_id, content_id, chunk_id,
headers=headers, **args)
except Exception as ex:
self.logger.warn("chunk push failed: %s", ex)

Expand All @@ -56,12 +60,14 @@ def process(self, env, cb):
try:
if event.event_type == EventTypes.CHUNK_DELETED:
self._chunk_delete(
event.reqid,
volume_id, container_id, content_id, chunk_id)
else:
args = {
'mtime': event.when / 1000000, # seconds
}
self._chunk_push(
event.reqid,
volume_id, container_id, content_id, chunk_id, args)
except OioException as exc:
resp = EventError(event=event,
Expand Down
65 changes: 37 additions & 28 deletions oio/rdir/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from oio.common.exceptions import ClientException, NotFound, VolumeException
from oio.common.exceptions import ServiceUnavailable, ServerException
from oio.common.exceptions import OioNetworkException, OioException
from oio.common.utils import get_logger, group_chunk_errors, oio_reraise
from oio.common.utils import get_logger, group_chunk_errors, oio_reraise, \
ensure_headers, ensure_request_id
from oio.conscience.client import ConscienceClient
from oio.directory.client import DirectoryClient
from time import sleep
Expand Down Expand Up @@ -263,6 +264,8 @@ def _make_uri(self, action, volume_id):
rdir_host = self._get_rdir_addr(volume_id)
return 'http://%s/v1/rdir/%s' % (rdir_host, action)

@ensure_headers
@ensure_request_id
def _rdir_request(self, volume, method, action, create=False, **kwargs):
params = {'vol': volume}
if create:
Expand All @@ -282,7 +285,7 @@ def create(self, volume_id, **kwargs):
self._rdir_request(volume_id, 'POST', 'create', **kwargs)

def chunk_push(self, volume_id, container_id, content_id, chunk_id,
**data):
headers=None, **data):
"""Reference a chunk in the reverse directory"""
body = {'container_id': container_id,
'content_id': content_id,
Expand All @@ -292,18 +295,20 @@ def chunk_push(self, volume_id, container_id, content_id, chunk_id,
body[key] = value

self._rdir_request(volume_id, 'POST', 'push', create=True,
json=body)
json=body, headers=headers)

def chunk_delete(self, volume_id, container_id, content_id, chunk_id):
def chunk_delete(self, volume_id, container_id, content_id, chunk_id,
**kwargs):
"""Unreference a chunk from the reverse directory"""
body = {'container_id': container_id,
'content_id': content_id,
'chunk_id': chunk_id}

self._rdir_request(volume_id, 'DELETE', 'delete', json=body)
self._rdir_request(volume_id, 'DELETE', 'delete',
json=body, **kwargs)

def chunk_fetch(self, volume, limit=100, rebuild=False,
container_id=None, max_attempts=3):
container_id=None, max_attempts=3, **kwargs):
"""
Fetch the list of chunks belonging to the specified volume.

Expand All @@ -326,8 +331,8 @@ def chunk_fetch(self, volume, limit=100, rebuild=False,
while True:
for i in range(max_attempts):
try:
resp, resp_body = self._rdir_request(
volume, 'POST', 'fetch', json=req_body)
_resp, resp_body = self._rdir_request(
volume, 'POST', 'fetch', json=req_body, **kwargs)
break
except OioNetworkException:
# Monotonic backoff
Expand All @@ -338,38 +343,42 @@ def chunk_fetch(self, volume, limit=100, rebuild=False,
raise
if len(resp_body) == 0:
break
key = None
for (key, value) in resp_body:
container, content, chunk = key.split('|')
yield container, content, chunk, value
req_body['start_after'] = key
if key is not None:
req_body['start_after'] = key

def admin_incident_set(self, volume, date):
def admin_incident_set(self, volume, date, **kwargs):
body = {'date': int(float(date))}
self._rdir_request(volume, 'POST', 'admin/incident', json=body)
self._rdir_request(volume, 'POST', 'admin/incident',
json=body, **kwargs)

def admin_incident_get(self, volume):
resp, resp_body = self._rdir_request(volume, 'GET',
'admin/incident')
return resp_body.get('date')
def admin_incident_get(self, volume, **kwargs):
_resp, body = self._rdir_request(volume, 'GET',
'admin/incident', **kwargs)
return body.get('date')

def admin_lock(self, volume, who):
def admin_lock(self, volume, who, **kwargs):
body = {'who': who}

self._rdir_request(volume, 'POST', 'admin/lock', json=body)
self._rdir_request(volume, 'POST', 'admin/lock', json=body, **kwargs)

def admin_unlock(self, volume):
self._rdir_request(volume, 'POST', 'admin/unlock')
def admin_unlock(self, volume, **kwargs):
self._rdir_request(volume, 'POST', 'admin/unlock', **kwargs)

def admin_show(self, volume):
resp, resp_body = self._rdir_request(volume, 'GET', 'admin/show')
return resp_body
def admin_show(self, volume, **kwargs):
_resp, body = self._rdir_request(volume, 'GET', 'admin/show',
**kwargs)
return body

def admin_clear(self, volume, clear_all=False):
def admin_clear(self, volume, clear_all=False, **kwargs):
body = {'all': clear_all}
resp, resp_body = self._rdir_request(
volume, 'POST', 'admin/clear', json=body)
_resp, resp_body = self._rdir_request(
volume, 'POST', 'admin/clear', json=body, **kwargs)
return resp_body

def status(self, volume):
resp, resp_body = self._rdir_request(volume, 'GET', 'status')
return resp_body
def status(self, volume, **kwargs):
_resp, body = self._rdir_request(volume, 'GET', 'status', **kwargs)
return body
2 changes: 1 addition & 1 deletion proxy/transport_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ sender(gpointer k, gpointer v, gpointer u)
static void
_access_log(struct req_ctx_s *r, gint status, gsize out_len, const gchar *tail)
{
if (r->access_disabled && 2 == (status / 100))
if (r->access_disabled && 2 == (status / 100) && !GRID_DEBUG_ENABLED())
return;

const char *reqid = g_tree_lookup(r->request->tree_headers, PROXYD_HEADER_REQID);
Expand Down
6 changes: 4 additions & 2 deletions rawx-apache2/src/rawx_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ rawx_event_destroy (void)
}

GError *
rawx_event_send (const char *event_type, GString *data_json)
rawx_event_send(const char *event_type, const char *request_id,
GString *data_json)
{
if (q != NULL && th_queue != NULL) {
GString *json = oio_event__create (event_type, NULL);
GString *json = oio_event__create_with_id(
event_type, NULL, request_id);
g_string_append_printf(json, ",\"data\":%.*s}",
(int) data_json->len, data_json->str);
oio_events_queue__send (q, g_string_free (json, FALSE));
Expand Down
3 changes: 2 additions & 1 deletion rawx-apache2/src/rawx_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void rawx_event_destroy(void);
*
* @return NULL if OK, or a GError describing the problem
*/
GError* rawx_event_send(const char *event_type, GString *data_json);
GError* rawx_event_send(const char *event_type, const char *request_id,
GString *data_json);

#endif /*OIO_SDS__rawx_apache2__src__rawx_event_h*/
4 changes: 3 additions & 1 deletion rawx-apache2/src/rawx_internals.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ send_chunk_event(const char *type, const dav_resource *resource)

g_string_append_c(json, '}');

const char *reqid = apr_table_get(
resource->info->request->headers_in, PROXYD_HEADER_REQID);
const gint64 pre = oio_ext_monotonic_time ();
GError *err = rawx_event_send(type, json);
GError *err = rawx_event_send(type, reqid, json);
const gint64 post = oio_ext_monotonic_time ();
if (!err) {
gint64 limit = 5 * G_TIME_SPAN_SECOND;
Expand Down