Skip to content

Commit

Permalink
ft: adding dynamodb instrumentation and project refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Cagri Yonca <[email protected]>
  • Loading branch information
CagriYonca committed Feb 26, 2025
1 parent 6e678a6 commit 7e7b53c
Show file tree
Hide file tree
Showing 18 changed files with 780 additions and 563 deletions.
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ services:
ports:
- 9042:9042


couchbase:
image: public.ecr.aws/docker/library/couchbase:community
ports:
Expand Down
23 changes: 14 additions & 9 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ def boot_agent() -> None:
from instana.instrumentation import (
aioamqp, # noqa: F401
asyncio, # noqa: F401
boto3_inst, # noqa: F401
cassandra_inst, # noqa: F401
cassandra, # noqa: F401
celery, # noqa: F401
couchbase_inst, # noqa: F401
fastapi_inst, # noqa: F401
couchbase, # noqa: F401
fastapi, # noqa: F401
flask, # noqa: F401
# gevent_inst, # noqa: F401
grpcio, # noqa: F401
Expand All @@ -185,9 +184,9 @@ def boot_agent() -> None:
pymysql, # noqa: F401
pyramid, # noqa: F401
redis, # noqa: F401
sanic_inst, # noqa: F401
sanic, # noqa: F401
sqlalchemy, # noqa: F401
starlette_inst, # noqa: F401
starlette, # noqa: F401
urllib3, # noqa: F401
)
from instana.instrumentation.aiohttp import (
Expand All @@ -196,7 +195,11 @@ def boot_agent() -> None:
from instana.instrumentation.aiohttp import (
server as aiohttp_server, # noqa: F401
)
from instana.instrumentation.aws import lambda_inst # noqa: F401
from instana.instrumentation.aws import (
boto3, # noqa: F401
lambda_inst, # noqa: F401
s3, # noqa: F401
)
from instana.instrumentation.django import middleware # noqa: F401
from instana.instrumentation.google.cloud import (
pubsub, # noqa: F401
Expand All @@ -209,12 +212,14 @@ def boot_agent() -> None:
client as tornado_client, # noqa: F401
)
from instana.instrumentation.tornado import (
client as tornado_client, # noqa: F401
server as tornado_server, # noqa: F401
)

# Hooks
from instana.hooks import hook_gunicorn, hook_uwsgi # noqa: F401
from instana.hooks import (
hook_gunicorn, # noqa: F401
hook_uwsgi, # noqa: F401
)


if "INSTANA_DISABLE" not in os.environ:
Expand Down
89 changes: 89 additions & 0 deletions src/instana/instrumentation/aws/boto3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# (c) Copyright IBM Corp. 2025

from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence, Tuple, Type

from instana.instrumentation.aws.dynamodb import collect_dynamodb_attributes
from instana.instrumentation.aws.s3 import collect_s3_attributes

if TYPE_CHECKING:
from botocore.auth import SigV4Auth
from botocore.client import BaseClient

from instana.span.span import InstanaSpan

try:
import json

import wrapt

from instana.log import logger
from instana.propagators.format import Format
from instana.singletons import tracer
from instana.span.span import get_current_span
from instana.util.traceutils import (
extract_custom_headers,
get_tracer_tuple,
tracing_is_off,
)

def lambda_inject_context(payload: Dict[str, Any], span: "InstanaSpan") -> None:
"""
When boto3 lambda client 'Invoke' is called, we want to inject the tracing context.
boto3/botocore has specific requirements:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.invoke
"""
try:
invoke_payload = payload.get("Payload", {})

if not isinstance(invoke_payload, dict):
invoke_payload = json.loads(invoke_payload)

tracer.inject(span.context, Format.HTTP_HEADERS, invoke_payload)
payload["Payload"] = json.dumps(invoke_payload)
except Exception:
logger.debug("non-fatal lambda_inject_context: ", exc_info=True)

@wrapt.patch_function_wrapper("botocore.auth", "SigV4Auth.add_auth")
def emit_add_auth_with_instana(
wrapped: Callable[..., None],
instance: "SigV4Auth",
args: Tuple[object],
kwargs: Dict[str, Any],
) -> Callable[..., None]:
current_span = get_current_span()
if not tracing_is_off() and current_span and current_span.is_recording():
extract_custom_headers(current_span, args[0].headers)
return wrapped(*args, **kwargs)

@wrapt.patch_function_wrapper("botocore.client", "BaseClient._make_api_call")
def make_api_call_with_instana(
wrapped: Callable[..., Dict[str, Any]],
instance: Type["BaseClient"],
args: Sequence[Dict[str, Any]],
kwargs: Dict[str, Any],
) -> Dict[str, Any]:
# If we're not tracing, just return
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()

parent_context = parent_span.get_span_context() if parent_span else None

try:
if instance:
if instance.meta.service_model.service_name == "dynamodb":
collect_dynamodb_attributes(
wrapped, instance, args, kwargs, parent_context
)
elif instance.meta.service_model.service_name == "s3":
collect_s3_attributes(
wrapped, instance, args, kwargs, parent_context
)
except Exception:
logger.debug("make_api_call_with_instana: collect error", exc_info=True)
else:
return wrapped(*args, **kwargs)

except ImportError:
pass
40 changes: 40 additions & 0 deletions src/instana/instrumentation/aws/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# (c) Copyright IBM Corp. 2025

from typing import TYPE_CHECKING, Any, Callable, Dict, Sequence, Type

if TYPE_CHECKING:
from botocore.client import BaseClient

from instana.log import logger
from instana.singletons import tracer
from instana.span_context import SpanContext

try:

def collect_dynamodb_attributes(
wrapped: Callable[..., Dict[str, Any]],
instance: Type["BaseClient"],
args: Sequence[Dict[str, Any]],
kwargs: Dict[str, Any],
parent_context: SpanContext,
) -> None:
with tracer.start_as_current_span(
"dynamodb", span_context=parent_context
) as span:
try:
span.set_attribute("dynamodb.op", args[0])
span.set_attribute(
"dynamodb.region", instance._client_config.region_name
)
if "TableName" in args[1].keys():
span.set_attribute("dynamodb.table", args[1]["TableName"])
except Exception as exc:
span.record_exception(exc)
logger.debug(
"collect_dynamodb_attributes: collect error", exc_info=True
)

logger.debug("Instrumenting DynamoDB")

except ImportError:
pass
146 changes: 75 additions & 71 deletions src/instana/instrumentation/aws/lambda_inst.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,89 +5,93 @@
Instrumentation for AWS Lambda functions
"""

import sys
import traceback
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple

import wrapt
from opentelemetry.semconv.trace import SpanAttributes

from instana import get_aws_lambda_handler
from instana.instrumentation.aws.triggers import enrich_lambda_span, get_context
from instana.log import logger
from instana.singletons import env_is_aws_lambda, get_agent, get_tracer
from instana.util.ids import define_server_timing

if TYPE_CHECKING:
from instana.agent.aws_lambda import AWSLambdaAgent

try:
import sys
import traceback

def lambda_handler_with_instana(
wrapped: Callable[..., object],
instance: object,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
event = args[0]
agent: "AWSLambdaAgent" = get_agent()
tracer = get_tracer()
import wrapt
from opentelemetry.semconv.trace import SpanAttributes

agent.collector.collect_snapshot(*args)
incoming_ctx = get_context(tracer, event)
from instana import get_aws_lambda_handler
from instana.instrumentation.aws.triggers import enrich_lambda_span, get_context
from instana.log import logger
from instana.singletons import env_is_aws_lambda, get_agent, get_tracer
from instana.util.ids import define_server_timing

result = None
with tracer.start_as_current_span(
"aws.lambda.entry", span_context=incoming_ctx
) as span:
enrich_lambda_span(agent, span, *args)
try:
result = wrapped(*args, **kwargs)
def lambda_handler_with_instana(
wrapped: Callable[..., object],
instance: object,
args: Tuple[object, ...],
kwargs: Dict[str, Any],
) -> object:
event = args[0]
agent: "AWSLambdaAgent" = get_agent()
tracer = get_tracer()

if isinstance(result, dict):
server_timing_value = define_server_timing(span.context.trace_id)
if "headers" in result:
result["headers"]["Server-Timing"] = server_timing_value
elif "multiValueHeaders" in result:
result["multiValueHeaders"]["Server-Timing"] = [server_timing_value]
if "statusCode" in result and result.get("statusCode"):
status_code = int(result["statusCode"])
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
if 500 <= status_code:
span.record_exception(f"HTTP status {status_code}")
except Exception as exc:
logger.debug(f"AWS Lambda lambda_handler_with_instana error: {exc}")
if span:
exc = traceback.format_exc()
span.record_exception(exc)
raise
finally:
agent.collector.shutdown()
agent.collector.collect_snapshot(*args)
incoming_ctx = get_context(tracer, event)

result = None
with tracer.start_as_current_span(
"aws.lambda.entry", span_context=incoming_ctx
) as span:
enrich_lambda_span(agent, span, *args)
try:
result = wrapped(*args, **kwargs)

if agent.collector.started:
agent.collector.shutdown()

return result
if isinstance(result, dict):
server_timing_value = define_server_timing(span.context.trace_id)
if "headers" in result:
result["headers"]["Server-Timing"] = server_timing_value
elif "multiValueHeaders" in result:
result["multiValueHeaders"]["Server-Timing"] = [
server_timing_value
]
if "statusCode" in result and result.get("statusCode"):
status_code = int(result["statusCode"])
span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
if 500 <= status_code:
span.record_exception(f"HTTP status {status_code}")
except Exception as exc:
logger.debug(f"AWS Lambda lambda_handler_with_instana error: {exc}")
if span:
exc = traceback.format_exc()
span.record_exception(exc)
raise
finally:
agent.collector.shutdown()

if agent.collector.started:
agent.collector.shutdown()

if env_is_aws_lambda:
handler_module, handler_function = get_aws_lambda_handler()
return result

if handler_module and handler_function:
try:
logger.debug(
f"Instrumenting AWS Lambda handler ({handler_module}.{handler_function})"
)
sys.path.insert(0, "/var/runtime")
sys.path.insert(0, "/var/task")
wrapt.wrap_function_wrapper(
handler_module, handler_function, lambda_handler_with_instana
)
except (ModuleNotFoundError, ImportError) as exc:
logger.debug(f"AWS Lambda error: {exc}")
if env_is_aws_lambda:
handler_module, handler_function = get_aws_lambda_handler()

if handler_module and handler_function:
try:
logger.debug(
f"Instrumenting AWS Lambda handler ({handler_module}.{handler_function})"
)
sys.path.insert(0, "/var/runtime")
sys.path.insert(0, "/var/task")
wrapt.wrap_function_wrapper(
handler_module, handler_function, lambda_handler_with_instana
)
except (ModuleNotFoundError, ImportError) as exc:
logger.debug(f"AWS Lambda error: {exc}")
logger.warning(
"Instana: Couldn't instrument AWS Lambda handler. Not monitoring."
)
else:
logger.warning(
"Instana: Couldn't instrument AWS Lambda handler. Not monitoring."
"Instana: Couldn't determine AWS Lambda Handler. Not monitoring."
)
else:
logger.warning(
"Instana: Couldn't determine AWS Lambda Handler. Not monitoring."
)
except ImportError:
pass
Loading

0 comments on commit 7e7b53c

Please sign in to comment.