ref(transactions): clean up post_process transactions areas #81091

Closed
wants to merge 4 commits
34 changes: 34 additions & 0 deletions src/sentry/event_manager.py
@@ -70,6 +70,9 @@
)
from sentry.grouping.variants import BaseVariant
from sentry.ingest.inbound_filters import FilterStatKeys
from sentry.ingest.transaction_clusterer.datasource.redis import (
record_transaction_name as record_transaction_name_for_clustering,
)
from sentry.integrations.tasks.kick_off_status_syncs import kick_off_status_syncs
from sentry.issues.grouptype import ErrorGroupType
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -107,6 +110,7 @@
first_insight_span_received,
first_transaction_received,
issue_unresolved,
transaction_processed,
)
from sentry.tasks.process_buffer import buffer_incr
from sentry.tasks.relay import schedule_invalidate_project_config
@@ -2512,6 +2516,25 @@ def _detect_performance_problems(
)


@sentry_sdk.tracing.trace
def _sample_transactions_in_save(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
for job in jobs:
project = job["event"].project
record_transaction_name_for_clustering(project, job["event"].data)


@sentry_sdk.tracing.trace
def _send_transaction_processed_signals(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
for job in jobs:
project = job["event"].project
with sentry_sdk.start_span(op="tasks.post_process_group.transaction_processed_signal"):
transaction_processed.send_robust(
sender=None,
project=project,
event=job["event"],
)


class PerformanceJob(TypedDict, total=False):
performance_problems: Sequence[PerformanceProblem]
event: Event
@@ -2626,6 +2649,11 @@ def save_transaction_events(jobs: Sequence[Job], projects: ProjectsMapping) -> S
_nodestore_save_many(jobs=jobs, app_feature="transactions")

with metrics.timer("save_transaction_events.eventstream_insert_many"):
for job in jobs:
# transactions don't go through post processing, so mark the job
# as raw; the eventstream will then skip the post_process consumer
job["raw"] = True

_eventstream_insert_many(jobs)

with metrics.timer("save_transaction_events.track_outcome_accepted_many"):
@@ -2637,6 +2665,12 @@
with metrics.timer("save_transaction_events.send_occurrence_to_platform"):
_send_occurrence_to_platform(jobs, projects)

with metrics.timer("save_transaction_events.sample_transactions"):
_sample_transactions_in_save(jobs, projects)

with metrics.timer("save_transaction_events.send_transaction_processed_signals"):
_send_transaction_processed_signals(jobs, projects)

return jobs


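Note on the new raw flag: the test added at the bottom of this PR asserts that the eventstream insert for these jobs receives skip_consume=True, so _eventstream_insert_many is evidently expected to forward job["raw"] as skip_consume. A minimal sketch of that mapping, assuming a forwarding helper of roughly this shape (the helper's body is outside this diff, and other insert kwargs are omitted):

    # Sketch only: assumes _eventstream_insert_many forwards the job's
    # "raw" flag as skip_consume.
    def _eventstream_insert_many(jobs: Sequence[Job]) -> None:
        for job in jobs:
            eventstream.backend.insert(
                event=job["event"],
                # raw events are written to the stream, but the
                # post_process consumer never picks them up
                skip_consume=job.get("raw", False),
            )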
12 changes: 12 additions & 0 deletions src/sentry/options/defaults.py
@@ -2915,3 +2915,15 @@
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"save_event_transactions.sample_transactions_in_save",
type=Bool,
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
register(
"save_event_transactions.post_process_cleanup",
type=Bool,
default=False,
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
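Both options are registered here but not read anywhere in this diff, so they presumably gate the rollout of the new save-path behavior in follow-up changes. A hypothetical sketch of such a gate, using the standard sentry.options accessor (the call sites below are illustrative, not part of this PR):

    # Hypothetical rollout gate; these option keys are registered above
    # but never consumed in this diff.
    from sentry import options

    if options.get("save_event_transactions.sample_transactions_in_save"):
        _sample_transactions_in_save(jobs, projects)
    if options.get("save_event_transactions.post_process_cleanup"):
        pass  # e.g. skip the transaction-specific work in post_process_group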
36 changes: 6 additions & 30 deletions src/sentry/tasks/post_process.py
@@ -14,7 +14,6 @@
from google.api_core.exceptions import ServiceUnavailable

from sentry import features, projectoptions
from sentry.eventstream.types import EventStreamEventType
from sentry.exceptions import PluginError
from sentry.issues.grouptype import GroupCategory
from sentry.issues.issue_occurrence import IssueOccurrence
@@ -23,7 +22,7 @@
from sentry.replays.lib.kafka import initialize_replays_publisher
from sentry.sentry_metrics.client import generic_metrics_backend
from sentry.sentry_metrics.use_case_id_registry import UseCaseID
from sentry.signals import event_processed, issue_unignored, transaction_processed
from sentry.signals import event_processed, issue_unignored
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.types.group import GroupSubStatus
@@ -471,10 +470,9 @@ def should_retry_fetch(attempt: int, e: Exception) -> bool:
fetch_retry_policy = ConditionalRetryPolicy(should_retry_fetch, exponential_delay(1.00))


def should_update_escalating_metrics(event: Event, is_transaction_event: bool) -> bool:
def should_update_escalating_metrics(event: Event) -> bool:
return (
features.has("organizations:escalating-metrics-backend", event.project.organization)
and not is_transaction_event
and event.group is not None
and event.group.issue_type.should_detect_escalation()
)
@@ -505,22 +503,14 @@ def post_process_group(

with snuba.options_override({"consistent": True}):
from sentry import eventstore
from sentry.eventstore.processing import (
event_processing_store,
transaction_processing_store,
)
from sentry.ingest.transaction_clusterer.datasource.redis import (
record_transaction_name as record_transaction_name_for_clustering,
)
from sentry.eventstore.processing import event_processing_store
from sentry.issues.occurrence_consumer import EventLookupError
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.reprocessing2 import is_reprocessed_event

if eventstream_type == EventStreamEventType.Transaction.value:
processing_store = transaction_processing_store
else:
processing_store = event_processing_store
processing_store = event_processing_store

if occurrence_id is None:
# We use the data being present/missing in the processing store
# to ensure that we don't duplicate work should the forwarding consumers
@@ -606,20 +596,6 @@ def get_event_raise_exception() -> Event:
is_reprocessed = is_reprocessed_event(event.data)
sentry_sdk.set_tag("is_reprocessed", is_reprocessed)

is_transaction_event = event.get_event_type() == "transaction"

# Simplified post processing for transaction events.
# This should eventually be completely removed and transactions
# will not go through any post processing.
if is_transaction_event:
record_transaction_name_for_clustering(event.project, event.data)
with sentry_sdk.start_span(op="tasks.post_process_group.transaction_processed_signal"):
transaction_processed.send_robust(
sender=post_process_group,
project=event.project,
event=event,
)

metric_tags = {}
if group_id:
group_state: GroupState = {
@@ -632,7 +608,7 @@ def get_event_raise_exception() -> Event:
group_event = update_event_group(event, group_state)
bind_organization_context(event.project.organization)
_capture_event_stats(event)
if should_update_escalating_metrics(event, is_transaction_event):
if should_update_escalating_metrics(event):
_update_escalating_metrics(event)

group_event.occurrence = occurrence
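Moving the signal send from post_process_group into save_transaction_events changes nothing for receivers: send_robust delivers to whatever is connected, regardless of where it fires. One caveat is that the sender is now None rather than post_process_group, so receivers must not filter on the sender. A minimal receiver sketch, assuming the decorator form Sentry's signals support elsewhere (e.g. @event_processed.connect(weak=False)):

    from sentry.signals import transaction_processed

    @transaction_processed.connect(weak=False)
    def on_transaction_processed(project, event, **kwargs):
        # sender is None after this PR, so key on project/event only
        ...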
6 changes: 6 additions & 0 deletions src/sentry/tasks/store.py
@@ -582,6 +582,12 @@ def _do_save_event(
raise

finally:
if consumer_type == ConsumerType.Transactions:
# we won't use the transaction data in post_process
# so we can delete it from the cache now.
if cache_key:
processing_store.delete_by_key(cache_key)

reprocessing2.mark_event_reprocessed(data)
if cache_key and has_attachments:
attachment_cache.delete(cache_key)
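For context, the lifecycle of a transaction's processing-store entry after this change, using the store's existing store()/delete_by_key() calls (the ordering is the point; the surrounding code is elided):

    # ingest consumer: stage the event payload and get a cache key
    cache_key = processing_store.store(data)
    # _do_save_event: write to nodestore and the eventstream
    # finally: the payload can be dropped immediately, because
    # post_process_group no longer reads transactions back from the store
    processing_store.delete_by_key(cache_key)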
189 changes: 189 additions & 0 deletions tests/sentry/event_manager/test_event_manager.py
@@ -49,6 +49,7 @@
from sentry.grouping.api import load_grouping_config
from sentry.grouping.utils import hash_from_values
from sentry.ingest.inbound_filters import FilterStatKeys
from sentry.ingest.transaction_clusterer import ClustererNamespace
from sentry.integrations.models.external_issue import ExternalIssue
from sentry.integrations.models.integration import Integration
from sentry.issues.grouptype import (
@@ -1540,6 +1541,194 @@ def test_transaction_event_span_grouping(self) -> None:
# the basic strategy is to simply use the description
assert spans == [{"hash": hash_values([span["description"]])} for span in data["spans"]]

def test_transaction_sampler_and_receive(self) -> None:
# make sure the new save-path sampling and signal sending raise no errors
manager = EventManager(
make_event(
**{
"transaction": "wait",
"contexts": {
"trace": {
"parent_span_id": "bce14471e0e9654d",
"op": "foobar",
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"span_id": "bf5be759039ede9a",
}
},
"spans": [
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "a" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "b" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "c" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span b",
},
],
"timestamp": "2019-06-14T14:01:40Z",
"start_timestamp": "2019-06-14T14:01:40Z",
"type": "transaction",
"transaction_info": {
"source": "url",
},
}
)
)
manager.normalize()
manager.save(self.project.id)

@patch("sentry.signals.transaction_processed.send_robust")
@patch("sentry.ingest.transaction_clusterer.datasource.redis._record_sample")
def test_transaction_sampler_and_recieve_mock_called(
self,
transaction_processed_signal_mock: mock.MagicMock,
mock_record_sample: mock.MagicMock,
) -> None:
manager = EventManager(
make_event(
**{
"transaction": "wait",
"contexts": {
"trace": {
"parent_span_id": "bce14471e0e9654d",
"op": "foobar",
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"span_id": "bf5be759039ede9a",
}
},
"spans": [
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "a" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "b" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "c" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span b",
},
],
"timestamp": "2019-06-14T14:01:40Z",
"start_timestamp": "2019-06-14T14:01:40Z",
"type": "transaction",
"transaction_info": {
"source": "url",
},
}
)
)
manager.normalize()
manager.save(self.project.id)
assert transaction_processed_signal_mock.call_count == 1
assert mock_record_sample.mock_calls == [
mock.call(ClustererNamespace.TRANSACTIONS, self.project, "wait")
]

@mock.patch("sentry.event_manager.eventstream.backend.insert")
def test_transaction_event_stream_insert_with_raw(
self, eventstream_insert: mock.MagicMock
) -> None:
# raw transaction jobs should be inserted with skip_consume=True
manager = EventManager(
make_event(
**{
"transaction": "wait",
"contexts": {
"trace": {
"parent_span_id": "bce14471e0e9654d",
"op": "foobar",
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"span_id": "bf5be759039ede9a",
}
},
"spans": [
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "a" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "b" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span a",
},
{
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"parent_span_id": "bf5be759039ede9a",
"span_id": "c" * 16,
"start_timestamp": 0,
"timestamp": 1,
"same_process_as_parent": True,
"op": "default",
"description": "span b",
},
],
"timestamp": "2019-06-14T14:01:40Z",
"start_timestamp": "2019-06-14T14:01:40Z",
"type": "transaction",
"transaction_info": {
"source": "url",
},
}
)
)
manager.normalize()
manager.save(self.project.id)
eventstream_insert.assert_called_once()
assert eventstream_insert.call_args.kwargs["skip_consume"] is True

def test_sdk(self) -> None:
manager = EventManager(make_event(**{"sdk": {"name": "sentry-unity", "version": "1.0"}}))
manager.normalize()