Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reports query optimization using index hints #103

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions integration_tests/ci_reports/test_report_worker_CI.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,25 @@ def test_get_matching_documents_different_cases(self):
docs.append(doc_test_machine)
mongodb_h.add_clean_documents(docs)

# Query faulty documents
faulty_doc_set = db_manager.get_faulty_documents(
test_machine[2], test_machine[3], test_machine[1],
test_machine[0], start_timestamp, end_timestamp)
faulty_docs_found = set()
duplicate_docs_found = set()

# member_code, subsystem_code, member_class, x_road_instance, start_time_timestamp, end_time_timestamp
matching_docs = []
for doc in db_manager.get_matching_documents(test_machine[2], test_machine[3], test_machine[1],
test_machine[0], start_timestamp, end_timestamp):

if doc['_id'] in faulty_docs_found:
if doc['_id'] in duplicate_docs_found:
continue
if doc['_id'] in faulty_doc_set:
faulty_docs_found.add(doc['_id'])
doc = ReportManager.reduce_to_plain_json(doc)
# If client and service is the same subsystem,
# then matching_docs contain two identical documents
if (
doc['serviceXRoadInstance'] == doc['clientXRoadInstance']
and doc['serviceMemberClass'] == doc['clientMemberClass']
and doc['serviceMemberCode'] == doc['clientMemberCode']
and doc['serviceSubsystemCode'] == doc['clientSubsystemCode']
):
duplicate_docs_found.add(doc['_id'])
matching_docs.append(doc)

self.assertEqual(len(matching_docs), total_ref_count)
Expand Down
116 changes: 51 additions & 65 deletions reports_module/opmon_reports/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,40 +66,68 @@ def get_matching_documents(self, target, start_time, end_time):
# ------------------------------------
# Query matching documents as producer
query_a = {
"producer.serviceXRoadInstance": target.xroad_instance,
"producer.serviceMemberCode": target.member_code,
"producer.serviceSubsystemCode": target.subsystem_code,
"producer.serviceMemberClass": target.member_class,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time}
"query": {
"producer.serviceXRoadInstance": target.xroad_instance,
"producer.serviceMemberCode": target.member_code,
"producer.serviceSubsystemCode": target.subsystem_code,
"producer.serviceMemberClass": target.member_class,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time}
},
"hint": [
("producer.serviceMemberCode", 1),
("producer.serviceSubsystemCode", 1),
("producer.requestInTs", 1)
]
}
# Query matching documents as producer from client field
query_c = {
"client.serviceXRoadInstance": target.xroad_instance,
"client.serviceMemberCode": target.member_code,
"client.serviceSubsystemCode": target.subsystem_code,
"client.serviceMemberClass": target.member_class,
"producer": None,
"client.requestInTs": {"$gte": start_time, "$lte": end_time}
"query": {
"client.serviceXRoadInstance": target.xroad_instance,
"client.serviceMemberCode": target.member_code,
"client.serviceSubsystemCode": target.subsystem_code,
"client.serviceMemberClass": target.member_class,
"producer": None,
"client.requestInTs": {"$gte": start_time, "$lte": end_time}
},
"hint": [
("client.serviceMemberCode", 1),
("client.serviceSubsystemCode", 1),
("client.requestInTs", 1)
]
}
# ------------------------------------
# Client group
# ------------------------------------
# Query matching documents as client
query_d = {
"client.clientXRoadInstance": target.xroad_instance,
"client.clientMemberCode": target.member_code,
"client.clientSubsystemCode": target.subsystem_code,
"client.clientMemberClass": target.member_class,
"client.requestInTs": {"$gte": start_time, "$lte": end_time}
"query": {
"client.clientXRoadInstance": target.xroad_instance,
"client.clientMemberCode": target.member_code,
"client.clientSubsystemCode": target.subsystem_code,
"client.clientMemberClass": target.member_class,
"client.requestInTs": {"$gte": start_time, "$lte": end_time}
},
"hint": [
("client.clientMemberCode", 1),
("client.clientSubsystemCode", 1),
("client.requestInTs", 1)
]
}
# Query matching documents as client from producer
query_b = {
"producer.clientXRoadInstance": target.xroad_instance,
"producer.clientMemberCode": target.member_code,
"producer.clientSubsystemCode": target.subsystem_code,
"producer.clientMemberClass": target.member_class,
"client": None,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time}
"query": {
"producer.clientXRoadInstance": target.xroad_instance,
"producer.clientMemberCode": target.member_code,
"producer.clientSubsystemCode": target.subsystem_code,
"producer.clientMemberClass": target.member_class,
"client": None,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time}
},
"hint": [
("producer.clientMemberCode", 1),
("producer.clientSubsystemCode", 1),
("producer.requestInTs", 1)
]
}

# Define projection
Expand Down Expand Up @@ -142,54 +170,12 @@ def get_matching_documents(self, target, start_time, end_time):

queries = [query_a, query_b, query_c, query_d]
for q in queries:
for doc in collection.find(q, projection):
for doc in collection.find(q["query"], projection).hint(q["hint"]):
yield doc
except Exception as e:
self.logger_m.log_error('DatabaseManager.get_matching_documents', '{0}'.format(repr(e)))
raise e

def get_faulty_documents(self, target, start_time, end_time):
try:
db = self.mongodb_handler.get_query_db()
collection = db[CLEAN_DATA_COLLECTION]

query_a = {
"producer.serviceXRoadInstance": target.xroad_instance,
"producer.serviceMemberCode": target.member_code,
"producer.serviceSubsystemCode": target.subsystem_code,
"producer.serviceMemberClass": target.member_class,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time},
"client.clientXRoadInstance": target.xroad_instance,
"client.clientMemberCode": target.member_code,
"client.clientSubsystemCode": target.subsystem_code,
"client.clientMemberClass": target.member_class,
"client.requestInTs": {"$gte": start_time, "$lte": end_time}
}

query_b = {
"client.serviceXRoadInstance": target.xroad_instance,
"client.serviceMemberCode": target.member_code,
"client.serviceSubsystemCode": target.subsystem_code,
"client.serviceMemberClass": target.member_class,
"client.requestInTs": {"$gte": start_time, "$lte": end_time},
"producer.clientXRoadInstance": target.xroad_instance,
"producer.clientMemberCode": target.member_code,
"producer.clientSubsystemCode": target.subsystem_code,
"producer.clientMemberClass": target.member_class,
"producer.requestInTs": {"$gte": start_time, "$lte": end_time}
}

faulty_set = set()

for q in [query_a, query_b]:
for doc in collection.find(q, {"_id": 1}):
faulty_set.add(doc['_id'])

except Exception as e:
self.logger_m.log_error('DatabaseManager.get_matching_documents', '{0}'.format(repr(e)))
raise e
return faulty_set

def get_documents_within_time_frame(self, start_time, end_time):
"""
Get all the documents for specified time period.
Expand Down
20 changes: 11 additions & 9 deletions reports_module/opmon_reports/report_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,7 @@ def merge_document_fields(document, merged_fields, new_field_name, separator):
def get_documents(self):
report_map = dict()

faulty_doc_set = self.database_manager.get_faulty_documents(
self.target,
self.reports_arguments.start_time_milliseconds,
self.reports_arguments.end_time_milliseconds
)
faulty_docs_found = set()
duplicate_docs_found = set()

matching_docs = self.database_manager.get_matching_documents(
self.target,
Expand All @@ -154,12 +149,19 @@ def get_documents(self):

# Iterate over all the docs and append to report map
for doc in matching_docs:
if doc['_id'] in faulty_docs_found:
if doc['_id'] in duplicate_docs_found:
continue
if doc['_id'] in faulty_doc_set:
faulty_docs_found.add(doc['_id'])

doc = self.reduce_to_plain_json(doc)
# If client and service is the same subsystem,
# then matching_docs contain two identical documents
if (
doc['serviceXRoadInstance'] == doc['clientXRoadInstance']
and doc['serviceMemberClass'] == doc['clientMemberClass']
and doc['serviceMemberCode'] == doc['clientMemberCode']
and doc['serviceSubsystemCode'] == doc['clientSubsystemCode']
):
duplicate_docs_found.add(doc['_id'])

# "ps" / "pms" / "cs" / "cms"
sorted_service_type = self.get_service_type(doc)
Expand Down
Loading