Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make handler spans more accurate, re-add connection span #276

Merged
merged 6 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions logging/log.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ValueError } from '@sinclair/typebox/value';
import { OpaqueTransportMessage, ProtocolVersion } from '../transport/message';
import { context, trace } from '@opentelemetry/api';

const LoggingLevels = {
debug: -1,
Expand Down Expand Up @@ -27,6 +28,17 @@ export type Tags =

const cleanedLogFn = (log: LogFn) => {
return (msg: string, metadata?: MessageMetadata) => {
// try to infer telemetry
if (metadata && !metadata.telemetry) {
const span = trace.getSpan(context.active());
if (span) {
metadata.telemetry = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not expect an innocuous logging function to modify the metadata here. How are these metadata created? Can we just be more strict instead of trying to heal post-hoc? Removing optionality increases intelligibility

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this metadata are manually injected at logging callsites, theres a lot and we kept finding places where we didnt inject the right telemetry :(

i thought it would be easier to 'do the right thing' and follow the open telemetry suggestion of just always logging the active telemetry

this doesnt log anything if its not within an active span

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also means that any logging inside a user-written handler also gets the right span associations

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still wouldn't expect it, is the concern that they're too expensive to preallocate or what?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no more so its hard to remember to inject the telemetry info at every place we care about it so i thought it made sense to do it implicitly because the implicit approach will always have the right telemetry

is the worry here that we are modifying the param or that its just doing patching? we do some cleaning here already below this (stripping the payload)

Copy link

@blast-hardcheese blast-hardcheese Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the underlying concern is the implicit complexity. In a logging method, we take an optional parameter with an optional member, then we might write that field before returning back.

A logging function definitely doesn't seem to be the right place to be doing any of that work.

Ultimately it's not the end of the world if we go this way, but I'd much rather solve this by making it easier to create and manage the MessageMetadata instead of offering important functionality for free invisibly just by calling log. If that's easy, then the span fields could even be marked required and defensive complexity can be delegated to where it belongs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of note, this is exactly how most third-party logging libraries operate: they add a global hook where they perform this same operation of massaging the metadata, only to avoid callers having to reason about adding metadata explicitly so that all logs get the tracing metadata.

traceId: span.spanContext().traceId,
spanId: span.spanContext().spanId,
};
}
}

// skip cloning object if metadata has no transportMessage
if (!metadata?.transportMessage) {
log(msg, metadata);
Expand All @@ -37,6 +49,7 @@ const cleanedLogFn = (log: LogFn) => {
// clone metadata and clean transportMessage
const { payload, ...rest } = metadata.transportMessage;
metadata.transportMessage = rest;

log(msg, metadata);
};
};
Expand Down
49 changes: 20 additions & 29 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@
"@sinclair/typebox": "~0.32.8"
},
"devDependencies": {
"@opentelemetry/context-async-hooks": "^1.26.0",
"@opentelemetry/core": "^1.7.0",
"@opentelemetry/sdk-trace-base": "^1.24.1",
"@opentelemetry/sdk-trace-web": "^1.24.1",
"@stylistic/eslint-plugin": "^2.6.4",
"@types/ws": "^8.5.5",
"@typescript-eslint/eslint-plugin": "^7.8.0",
Expand Down
2 changes: 1 addition & 1 deletion router/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ function handleProc(
const procClosesWithInit = procType === 'rpc' || procType === 'subscription';
const streamId = generateId();
const { span, ctx } = createProcTelemetryInfo(
transport,
session,
procType,
serviceName,
procedureName,
Expand Down
182 changes: 81 additions & 101 deletions router/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ export interface Server<Services extends AnyServiceSchemaMap> {
close: () => Promise<void>;
}

type ProcHandlerReturn = Promise<(() => void) | void>;

interface StreamInitProps {
// msg derived
streamId: StreamId;
Expand Down Expand Up @@ -199,10 +197,17 @@ class RiverServer<Services extends AnyServiceSchemaMap>
}

// if its not a cancelled stream, validate and create a new stream
this.createNewProcStream({
...newStreamProps,
...message,
});
createHandlerSpan(
newStreamProps.initialSession,
newStreamProps.procedure.type,
newStreamProps.serviceName,
newStreamProps.procedureName,
newStreamProps.streamId,
newStreamProps.tracingCtx,
(span) => {
this.createNewProcStream(span, newStreamProps);
},
);
};

const handleSessionStatus = (evt: EventMap['sessionStatus']) => {
Expand Down Expand Up @@ -241,7 +246,7 @@ class RiverServer<Services extends AnyServiceSchemaMap>
this.transport.addEventListener('transportStatus', handleTransportStatus);
}

private createNewProcStream(props: StreamInitProps) {
private createNewProcStream(span: Span, props: StreamInitProps) {
const {
streamId,
initialSession,
Expand All @@ -251,7 +256,6 @@ class RiverServer<Services extends AnyServiceSchemaMap>
sessionMetadata,
serviceContext,
initPayload,
tracingCtx,
procClosesWithInit,
passInitAsDataForBackwardsCompat,
} = props;
Expand All @@ -263,6 +267,12 @@ class RiverServer<Services extends AnyServiceSchemaMap>
id: sessionId,
} = initialSession;

// dont use the session span here, we want to create a new span for the procedure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there still a way to link them (maybe starting an async span or using links)? or is it not desirable to begin with? (if it's the latter, maybe it's worth expanding the code comment explaining the rationale / consequences)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! i can add a comment but we add the link already in span itself, see createProcTelemetryInfo

loggingMetadata.telemetry = {
traceId: span.spanContext().traceId,
spanId: span.spanContext().spanId,
};

let cleanClose = true;
const onMessage = (msg: OpaqueTransportMessage) => {
if (msg.from !== from) {
Expand Down Expand Up @@ -558,108 +568,78 @@ class RiverServer<Services extends AnyServiceSchemaMap>

switch (procedure.type) {
case 'rpc':
void createHandlerSpan(
procedure.type,
serviceName,
procedureName,
streamId,
tracingCtx,
async (span): ProcHandlerReturn => {
try {
const responsePayload = await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
});

if (resWritable.isClosed()) {
// A disconnect happened
return;
}

resWritable.write(responsePayload);
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
void (async () => {
try {
const responsePayload = await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
});

if (resWritable.isClosed()) {
// A disconnect happened
return;
}
},
);

resWritable.write(responsePayload);
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
})();
break;
case 'stream':
void createHandlerSpan(
procedure.type,
serviceName,
procedureName,
streamId,
tracingCtx,
async (span): ProcHandlerReturn => {
try {
await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
reqReadable,
resWritable,
});
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
},
);

void (async () => {
try {
await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
reqReadable,
resWritable,
});
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
})();
break;
case 'subscription':
void createHandlerSpan(
procedure.type,
serviceName,
procedureName,
streamId,
tracingCtx,
async (span): ProcHandlerReturn => {
try {
await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
resWritable: resWritable,
});
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
},
);
void (async () => {
try {
await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
resWritable: resWritable,
});
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
})();
break;
case 'upload':
void createHandlerSpan(
procedure.type,
serviceName,
procedureName,
streamId,
tracingCtx,
async (span): ProcHandlerReturn => {
try {
const responsePayload = await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
reqReadable: reqReadable,
});

if (resWritable.isClosed()) {
// A disconnect happened
return;
}

resWritable.write(responsePayload);
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
void (async () => {
try {
const responsePayload = await procedure.handler({
ctx: handlerContextWithSpan(span),
reqInit: initPayload,
reqReadable: reqReadable,
});

if (resWritable.isClosed()) {
// A disconnect happened
return;
}
},
);

resWritable.write(responsePayload);
} catch (err) {
onHandlerError(err, span);
} finally {
span.end();
}
})();
break;
}

Expand Down
Loading
Loading