This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Slow query parsing and fixes for bp recs (#67)
* fixing sperf default recs
* several queries were getting missed by slowquery
* more tests to verify the behavior of new time logs in the slowquery log

Co-authored-by: Ryan SVIHLA <[email protected]>
Ryan SVIHLA and foundev authored Jun 29, 2021
1 parent fd7e04e commit 4b78e0e
Showing 8 changed files with 246 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pysper/__init__.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """top level module for sperf python port"""
-VERSION = "0.6.11"
+VERSION = "0.6.13"
75 changes: 47 additions & 28 deletions pysper/core/slowquery.py
@@ -28,51 +28,57 @@ class SlowQueryParser:
 
     BEGIN = "begin"
     begin_match = re.compile(
-        r" *(?P<level>[A-Z]*) *\[(?P<thread_name>[^\]]*?)[:_-]?(?P<thread_id>[0-9]*)\] (?P<date>.{10} .{12}) *(?P<source_file>[^:]*):(?P<source_line>[0-9]*) - (?P<numslow>\d+) operations were slow in the last (?P<timeslow>\d+) msecs:"
+        r" *(?P<level>[A-Z]*) *\[(?P<thread_name>[^\]]*?)[:_-]?(?P<thread_id>[0-9]*)\] (?P<date>.{10} .{12}) *(?P<source_file>[^:]*):(?P<source_line>[0-9]*) - (?P<numslow>\d+) operations were slow in the last (?P<threshold>\d+) msecs:"
     )
+    slow_match_multiple = re.compile(
+        r"\<((?P<query>.*))\>, was slow (?P<count>\d+) times: avg\/min\/max (?P<avg>\d+)/(?P<min>\d+)\/(?P<time>\d+) msec - slow timeout (?P<threshold>\d+) msec(?P<cross>\/cross-node)?"
+    )
     slow_match = re.compile(
-        r"(?P<query>.*), was slow (?P<count>\d+) times: avg/min/max (?P<avg>\d+)/(?P<min>\d+)/(?P<max>\d+) msec - slow timeout (?P<threshold>\d+) msec(?P<cross>/cross-node)?"
+        r"\<((?P<query>.*))\>, time (?P<time>\d+) msec - slow timeout (?P<threshold>\d+) msec(?P<cross>\/cross-node)?"
     )
-    slow_match_single = re.compile(
-        r"(?P<query>.*), time (?P<time>\d+) msec - slow timeout (?P<threshold>\d+) msec(?P<cross>/cross-node)?"
+    fail_match_multiple = re.compile(
+        r"\<((?P<query>.*))\>, timed out (?P<numslow>\d+) times: avg\/min\/max (?P<avg>\d+)/(?P<min>\d+)\/(?P<time>\d+) msec -timeout (?P<threshold>\d+) msec(?P<cross>\/cross-node)?"
     )
     fail_match = re.compile(
-        r"(?P<query>.*), timed out (?P<count>\d+) times: avg/min/max (?P<avg>\d+)/(?P<min>\d+)/(?P<max>\d+) msec -timeout (?P<threshold>\d+) msec(?P<cross>/cross-node)?"
-    )
-    fail_match_single = re.compile(
-        r"(?P<query>.*), total time (?P<time>\d+) msec - timeout (?P<threshold>\d+) msec(?P<cross>/cross-node)?"
+        r"\<((?P<query>.*))\>, total time (?P<time>\d+) msec - timeout (?P<threshold>\d+) msec(?P<cross>\/cross-node)?"
    )
 
-    def __init__(self):
-        self.state = None
+    begin_timed_out = re.compile(
+        r" *(?P<level>[A-Z]*) *\[(?P<thread_name>[^\]]*?)[:_-]?(?P<thread_id>[0-9]*)\] (?P<date>.{10} .{12}) *(?P<source_file>[^:]*):(?P<source_line>[0-9]*) - (?P<numslow>\d+) operations timed out in the last (?P<threshold>\d+) msecs:"
+    )
+    timed_out_match = re.compile(
+        r"\<(?P<query>.*)\>, total time (?P<time>\d+) msec, timeout (?P<threshold>\d+) msec"
+    )
 
     def parse(self, logfile):
         """parses a debug log for slow queries"""
         ret = OrderedDict()
         for line in logfile:
-            if self.state is None:
-                m = self.begin_match.match(line)
-                if m:
-                    self.state = self.BEGIN
-                    ret["numslow"] = int(m.group("numslow"))
-                    ret["timeslow"] = int(m.group("timeslow"))
-                    ret["date"] = date()(m.group("date"))
-                    continue
-            if self.state == self.BEGIN:
+            m = self.begin_match.match(line)
+            time_match = self.begin_timed_out.match(line)
+            if m:
+                ret["numslow"] = int(m.group("numslow"))
+                ret["date"] = date()(m.group("date"))
+            elif time_match:
+                ret["numslow"] = int(time_match.group("numslow"))
+                ret["date"] = date()(time_match.group("date"))
+            else:
                 for match in [
                     self.slow_match,
                     self.fail_match,
-                    self.slow_match_single,
-                    self.fail_match_single,
+                    self.slow_match_multiple,
+                    self.fail_match_multiple,
+                    self.timed_out_match,
                 ]:
                     m = match.match(line)
                     if m:
                         ret.update(m.groupdict())
-                        if match in [self.fail_match, self.fail_match_single]:
+                        if match in [self.fail_match, self.fail_match_multiple]:
                             ret["type"] = "fail"
+                        elif match == self.timed_out_match:
+                            ret["type"] = "timed_out"
                         else:
                             ret["type"] = "slow"
-                        self.state = None
                         yield ret
                         break
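Note (illustrative, not part of the diff): the rewritten patterns distinguish the aggregated "was slow N times" / "timed out N times" summary lines from the single-query "time X msec" / "total time X msec" lines, and add a separate header for "operations timed out". The lines below are hand-written to fit the regexes above rather than copied from a real Cassandra/DSE debug.log, and the snippet is only a sketch of how the generator could be exercised:

# Illustrative sample only: lines shaped to satisfy the patterns above.
from pysper.core.slowquery import SlowQueryParser

sample_lines = [
    "DEBUG [ScheduledTasks:1] 2021-06-29 12:00:01,000  MonitoringTask.java:171 - 2 operations were slow in the last 5000 msecs:",
    "<SELECT * FROM ks.tbl WHERE id = 1>, time 1200 msec - slow timeout 500 msec",
    "<SELECT * FROM ks.tbl>, was slow 2 times: avg/min/max 800/600/1000 msec - slow timeout 500 msec/cross-node",
    "DEBUG [ScheduledTasks:1] 2021-06-29 12:00:06,000  MonitoringTask.java:171 - 1 operations timed out in the last 5000 msecs:",
    "<SELECT * FROM ks.big>, total time 12000 msec, timeout 10000 msec",
]

parser = SlowQueryParser()
for entry in parser.parse(sample_lines):
    # each yielded dict carries "type" ("slow", "fail", or "timed_out"),
    # the query text, the timing fields, and the configured threshold
    print(entry["type"], entry["query"])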

@@ -90,6 +96,7 @@ def __init__(self, diag_dir, files=None, start=None, end=None):
         self.start = None
         self.end = None
         self.cross = 0
+        self.timedout = 0
         self.start_time = None
         self.end_time = None
         if start:
@@ -117,12 +124,14 @@ def analyze(self):
                     self.end = query["date"]
                 if query["date"] < self.start:
                     self.start = query["date"]
-                if "numslow" in query:
+                if "avg" in query:
                     for x in range(query["numslow"]):
-                        self.querytimes[query["date"]].append(query["timeslow"])
+                        self.querytimes[query["date"]].append(int(query["time"]))
                 else:
-                    self.querytimes[query["date"]].append(query["timeslow"])
-                self.queries.append((query["query"], int(query["timeslow"])))
+                    self.querytimes[query["date"]].append(int(query["time"]))
+                self.queries.append((query["query"], int(query["time"])))
+                if "type" in query and query["type"] == "timed_out":
+                    self.timedout += 1 * int(query["numslow"])
                 if query["cross"] is not None:
                     self.cross += 1
         self.analyzed = True
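Reading aid (not part of the diff), mirroring the bookkeeping above with a made-up entry: aggregated entries carry avg/min plus the max in "time", so the max is appended once per "numslow"; single entries contribute their one "time"; timed-out summaries also bump the new timedout counter.

# Made-up entry shaped like what SlowQueryParser.parse() yields for an aggregated line.
query = {"date": "2021-06-29 12:00:01,000", "numslow": 3, "avg": "800",
         "min": "600", "time": "1000", "type": "slow", "cross": None}

times, timedout = [], 0
if "avg" in query:                        # aggregated "was slow N times" entry
    for _ in range(query["numslow"]):
        times.append(int(query["time"]))  # the max, repeated numslow times
else:                                     # single-query entry
    times.append(int(query["time"]))
if query.get("type") == "timed_out":
    timedout += int(query["numslow"])
print(times, timedout)                    # [1000, 1000, 1000] 0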
@@ -133,6 +142,11 @@ def print_report(self, command_name, interval=3600, top=3):
         self.analyze()
         print("%s version: %s" % (command_name, VERSION))
         print("")
+        print(
+            "this is not a very accurate report, use it to discover basics, but I suggest analyzing the logs by hand for any outliers"
+        )
+        print("")
+
         if not self.queries:
             if self.files:
                 print("no queries found the files provided")
@@ -149,7 +163,12 @@
                 key=lambda t: t[0],
             )
         )
-        print(len(self.queries), "slow queries, %s cross-node" % self.cross)
+        print("slow query breakdown")
+        print("--------------------")
+        print(
+            len(self.queries),
+            "total, %s cross-node, %s timeouts" % (self.cross, self.timedout),
+        )
         print()
         print("Top %s slow queries:" % top)
         print("-" * 30)
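Judging only from the print calls in this hunk, the summary section of `sperf core slowquery` output would now read roughly like this (numbers invented for illustration):

slow query breakdown
--------------------
45 total, 3 cross-node, 2 timeouts

Top 3 slow queries:
------------------------------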
1 change: 0 additions & 1 deletion pysper/core/statuslogger.py
@@ -255,7 +255,6 @@ def analyze(self):
                 if not node.version:
                     node.version = "6.x"
                 if "delayed" in event and event["delayed"]:
-                    print(event)
                     val = event["delayed"]
                     node.stages["local backpressure"][
                         event["pool_name"]
48 changes: 24 additions & 24 deletions pysper/sperf_default.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 """sperf default command run when you type `sperf`"""
 from collections import OrderedDict
+from dataclasses import dataclass
 
 from pysper import humanize
 from pysper.core.diag import parse_diag
@@ -31,6 +32,13 @@
 )
 
 
+@dataclass
+class StatusLoggerCounter:
+    delayed: int = 0
+    pending: int = 0
+    blocked: int = 0
+
+
 def parse(args):
     """read diag tarball"""
     res = parse_diag(args, lambda n: [calculate(n)])
@@ -75,11 +83,9 @@ def calculate(node_config):
 
 
 def _recs_on_stages(
-    recommendations,
-    gc_over_500,
-    delayed_counter,
-    signficant_pending_counter,
-    signficant_blocked_counter,
+    recommendations: list,
+    gc_over_500: int,
+    counter: StatusLoggerCounter,
 ):
     if gc_over_500 > 0:
         recommendations.append(
@@ -88,27 +94,27 @@
                 "rec": "Run `sperf core gc` for more analysis",
             }
         )
-    if delayed_counter > 0:
+    if counter.delayed > 0:
         recommendations.append(
             {
                 "issue": "There were %i incidents of local backpressure"
-                % delayed_counter,
+                % counter.delayed,
                 "rec": "Run `sperf core statuslogger` for more analysis",
             }
         )
-    if signficant_blocked_counter > 0:
+    if counter.blocked > 0:
         recommendations.append(
             {
                 "issue": "There were %i incidents of signficantly blocked stages"
-                % signficant_blocked_counter,
+                % counter.blocked,
                 "rec": "Run `sperf core statuslogger` for more analysis",
             }
         )
-    if signficant_pending_counter > 0:
+    if counter.pending > 0:
         recommendations.append(
             {
                 "issue": "There were %i incidents of signficantly pending stages"
-                % signficant_pending_counter,
+                % counter.pending,
                 "rec": "Run `sperf core statuslogger` for more analysis",
             }
         )
@@ -177,27 +183,25 @@ def _recs_on_configs(recommendations, configs):
         recommendations.append(rec)
 
 
-def _status_logger_counter(event, delayed, pending, blocked):
+def _status_logger_counter(event, counter):
     if "delayed" in event and event["delayed"]:
         val = event["delayed"]
         if val > 0:
-            delayed += 1
+            counter.delayed += 1
     if "pending" in event and event["pending"]:
         val = event["pending"]
         if val > 100:
-            pending += 1
+            counter.pending += 1
     if "blocked" in event and event["blocked"]:
         val = event["blocked"]
         if val > 10:
-            blocked += 1
+            counter.blocked += 1
 
 
 def generate_recommendations(parsed):
     """generate recommendations off the parsed data"""
     gc_over_500 = 0
-    delayed_counter = 0
-    pending_counter = 0
-    blocked_counter = 0
+    counter = StatusLoggerCounter()
     event_filter = diag.UniqEventPerNodeFilter()
     for rec_log in parsed["rec_logs"]:
         node = util.extract_node_name(rec_log)
@@ -220,13 +224,9 @@
             ):
                 if event.get("duration") > 500:
                     gc_over_500 += 1
-                _status_logger_counter(
-                    event, delayed_counter, pending_counter, blocked_counter
-                )
+                _status_logger_counter(event, counter)
     recommendations = []
-    _recs_on_stages(
-        recommendations, gc_over_500, delayed_counter, pending_counter, blocked_counter
-    )
+    _recs_on_stages(recommendations, gc_over_500, counter)
     _recs_on_configs(recommendations, parsed["configs"])
     return recommendations

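Side note on the StatusLoggerCounter change (the diff does not spell this out, but it follows from Python's argument semantics): the old _status_logger_counter(event, delayed, pending, blocked) only rebound local integers, so the increments never made it back to generate_recommendations, whereas mutating a shared StatusLoggerCounter instance does. A minimal sketch of the difference, with illustrative helper names:

from dataclasses import dataclass

@dataclass
class StatusLoggerCounter:
    delayed: int = 0
    pending: int = 0
    blocked: int = 0

def count_old(event, delayed):
    if event.get("delayed"):
        delayed += 1          # rebinds the local name; the caller's int is unchanged

def count_new(event, counter):
    if event.get("delayed"):
        counter.delayed += 1  # mutates the instance the caller still holds

delayed = 0
counter = StatusLoggerCounter()
count_old({"delayed": 5}, delayed)
count_new({"delayed": 5}, counter)
print(delayed, counter.delayed)  # prints: 0 1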
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@
 
 setup(
     name="sperf",
-    version="0.6.11",
+    version="0.6.13",
     description="Diagnostic utility for DSE and Cassandra",
     url="https://www.github.com/DataStax-Toolkit/sperf",
     scripts=["scripts/sperf"],