From 0af64b2c4690d8844557eddb3d36d73fa165c199 Mon Sep 17 00:00:00 2001 From: Vaibhav Gupta Date: Fri, 31 May 2024 15:05:27 -0700 Subject: [PATCH] Fixes issues with BamlSpan for async vs sync thread. Now .finish_span is always sync on everything but WASM as it only pushes items into a queue. On WASM builds, we still wait for the logs to be sent to complete the function --- engine/baml-runtime/src/lib.rs | 59 +++++++++++++++-- engine/baml-runtime/src/runtime_interface.rs | 22 ++++++- engine/baml-runtime/src/tracing/mod.rs | 64 +++++++++++++++++++ .../src/tracing/threaded_tracer.rs | 2 +- engine/baml-runtime/src/types/stream.rs | 6 ++ .../python_src/baml_py/async_context_vars.py | 15 ++--- .../python_src/baml_py/baml_py.pyi | 19 +++--- .../language_client_python/src/types/image.rs | 37 ++++------- .../language_client_python/src/types/span.rs | 56 +++------------- .../async_context_vars.d.ts | 3 +- .../async_context_vars.d.ts.map | 2 +- .../async_context_vars.js | 20 ++---- engine/language_client_typescript/native.d.ts | 3 +- .../src/types/span.rs | 62 ++++-------------- .../typescript_src/async_context_vars.ts | 21 ++---- integ-tests/python/app/test_functions.py | 18 ++++-- 16 files changed, 219 insertions(+), 190 deletions(-) diff --git a/engine/baml-runtime/src/lib.rs b/engine/baml-runtime/src/lib.rs index 35f5f1ccf..7548de77c 100644 --- a/engine/baml-runtime/src/lib.rs +++ b/engine/baml-runtime/src/lib.rs @@ -22,7 +22,6 @@ pub mod type_builder; mod types; use std::collections::HashMap; -use std::env; use std::path::PathBuf; use std::sync::Arc; @@ -158,6 +157,12 @@ impl BamlRuntime { let mut target_id = None; if let Some(span) = span { + #[cfg(not(target_arch = "wasm32"))] + match self.tracer.finish_span(span, ctx, None) { + Ok(id) => target_id = id, + Err(e) => log::debug!("Error during logging: {}", e), + } + #[cfg(target_arch = "wasm32")] match self.tracer.finish_span(span, ctx, None).await { Ok(id) => target_id = id, Err(e) => log::debug!("Error during logging: {}", e), @@ -184,6 +189,12 @@ impl BamlRuntime { let mut target_id = None; if let Some(span) = span { + #[cfg(not(target_arch = "wasm32"))] + match self.tracer.finish_baml_span(span, ctx, &response) { + Ok(id) => target_id = id, + Err(e) => log::debug!("Error during logging: {}", e), + } + #[cfg(target_arch = "wasm32")] match self.tracer.finish_baml_span(span, ctx, &response).await { Ok(id) => target_id = id, Err(e) => log::debug!("Error during logging: {}", e), @@ -259,22 +270,60 @@ impl ExperimentalTracingInterface for BamlRuntime { self.tracer.start_span(function_name, ctx, None, params) } + #[cfg(not(target_arch = "wasm32"))] + fn finish_function_span( + &self, + span: Option, + result: &Result, + ctx: &RuntimeContextManager, + ) -> Result> { + if let Some(span) = span { + self.tracer.finish_baml_span(span, ctx, result) + } else { + Ok(None) + } + } + + #[cfg(target_arch = "wasm32")] async fn finish_function_span( &self, - span: TracingSpan, + span: Option, result: &Result, ctx: &RuntimeContextManager, ) -> Result> { - self.tracer.finish_baml_span(span, ctx, result).await + if let Some(span) = span { + self.tracer.finish_baml_span(span, ctx, result).await + } else { + Ok(None) + } + } + + #[cfg(not(target_arch = "wasm32"))] + fn finish_span( + &self, + span: Option, + result: Option, + ctx: &RuntimeContextManager, + ) -> Result> { + if let Some(span) = span { + self.tracer.finish_span(span, ctx, result) + } else { + Ok(None) + } } + #[cfg(target_arch = "wasm32")] async fn finish_span( &self, - span: TracingSpan, + span: Option, result: Option, ctx: &RuntimeContextManager, ) -> Result> { - self.tracer.finish_span(span, ctx, result).await + if let Some(span) = span { + self.tracer.finish_span(span, ctx, result).await + } else { + Ok(None) + } } fn flush(&self) -> Result<()> { diff --git a/engine/baml-runtime/src/runtime_interface.rs b/engine/baml-runtime/src/runtime_interface.rs index 59ec7895f..d74b664f1 100644 --- a/engine/baml-runtime/src/runtime_interface.rs +++ b/engine/baml-runtime/src/runtime_interface.rs @@ -59,18 +59,36 @@ pub trait ExperimentalTracingInterface { ctx: &RuntimeContextManager, ) -> (Option, RuntimeContext); + #[cfg(target_arch = "wasm32")] #[allow(async_fn_in_trait)] async fn finish_function_span( &self, - span: TracingSpan, + span: Option, result: &Result, ctx: &RuntimeContextManager, ) -> Result>; + #[cfg(not(target_arch = "wasm32"))] + fn finish_function_span( + &self, + span: Option, + result: &Result, + ctx: &RuntimeContextManager, + ) -> Result>; + + #[cfg(target_arch = "wasm32")] #[allow(async_fn_in_trait)] async fn finish_span( &self, - span: TracingSpan, + span: Option, + result: Option, + ctx: &RuntimeContextManager, + ) -> Result>; + + #[cfg(not(target_arch = "wasm32"))] + fn finish_span( + &self, + span: Option, result: Option, ctx: &RuntimeContextManager, ) -> Result>; diff --git a/engine/baml-runtime/src/tracing/mod.rs b/engine/baml-runtime/src/tracing/mod.rs index 39144bfcd..687b88b4f 100644 --- a/engine/baml-runtime/src/tracing/mod.rs +++ b/engine/baml-runtime/src/tracing/mod.rs @@ -100,6 +100,7 @@ impl BamlTracer { (Some(span), ctx.create_ctx(tb)) } + #[cfg(target_arch = "wasm32")] pub(crate) async fn finish_span( &self, span: TracingSpan, @@ -128,6 +129,34 @@ impl BamlTracer { } } + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn finish_span( + &self, + span: TracingSpan, + ctx: &RuntimeContextManager, + response: Option, + ) -> Result> { + let Some((span_id, event_chain, tags)) = ctx.exit() else { + anyhow::bail!( + "Attempting to finish a span {:#?} without first starting one. Current context {:#?}", + span, + ctx + ); + }; + + if span.span_id != span_id { + anyhow::bail!("Span ID mismatch: {} != {}", span.span_id, span_id); + } + + if let Some(tracer) = &self.tracer { + tracer.submit(response.to_log_schema(&self.options, event_chain, tags, span))?; + Ok(Some(span_id)) + } else { + Ok(None) + } + } + + #[cfg(target_arch = "wasm32")] pub(crate) async fn finish_baml_span( &self, span: TracingSpan, @@ -163,6 +192,41 @@ impl BamlTracer { Ok(None) } } + + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn finish_baml_span( + &self, + span: TracingSpan, + ctx: &RuntimeContextManager, + response: &Result, + ) -> Result> { + let Some((span_id, event_chain, tags)) = ctx.exit() else { + anyhow::bail!("Attempting to finish a span without first starting one"); + }; + + if span.span_id != span_id { + anyhow::bail!("Span ID mismatch: {} != {}", span.span_id, span_id); + } + + if let Ok(response) = &response { + let name = event_chain.last().map(|s| s.name.as_str()); + let is_ok = response.parsed().as_ref().is_some_and(|r| r.is_ok()); + log::log!( + target: "baml_events", + if is_ok { log::Level::Info } else { log::Level::Warn }, + "{}{}", + name.map(|s| format!("Function {}:\n", s)).unwrap_or_default().purple(), + response + ); + } + + if let Some(tracer) = &self.tracer { + tracer.submit(response.to_log_schema(&self.options, event_chain, tags, span))?; + Ok(Some(span_id)) + } else { + Ok(None) + } + } } // Function to convert web_time::SystemTime to ISO 8601 string diff --git a/engine/baml-runtime/src/tracing/threaded_tracer.rs b/engine/baml-runtime/src/tracing/threaded_tracer.rs index dbaecc69f..7871cd677 100644 --- a/engine/baml-runtime/src/tracing/threaded_tracer.rs +++ b/engine/baml-runtime/src/tracing/threaded_tracer.rs @@ -143,7 +143,7 @@ impl ThreadedTracer { } } - pub async fn submit(&self, event: LogSchema) -> Result<()> { + pub fn submit(&self, event: LogSchema) -> Result<()> { log::info!("Submitting work {}", event.event_id); let tx = self .tx diff --git a/engine/baml-runtime/src/types/stream.rs b/engine/baml-runtime/src/types/stream.rs index dee66c64a..135b1b6e4 100644 --- a/engine/baml-runtime/src/types/stream.rs +++ b/engine/baml-runtime/src/types/stream.rs @@ -98,6 +98,12 @@ impl FunctionResultStream { let mut target_id = None; if let Some(span) = span { + #[cfg(not(target_arch = "wasm32"))] + match self.tracer.finish_baml_span(span, ctx, &res) { + Ok(id) => target_id = id, + Err(e) => log::debug!("Error during logging: {}", e), + } + #[cfg(target_arch = "wasm32")] match self.tracer.finish_baml_span(span, ctx, &res).await { Ok(id) => target_id = id, Err(e) => log::debug!("Error during logging: {}", e), diff --git a/engine/language_client_python/python_src/baml_py/async_context_vars.py b/engine/language_client_python/python_src/baml_py/async_context_vars.py index 03af719a4..34b1c9253 100644 --- a/engine/language_client_python/python_src/baml_py/async_context_vars.py +++ b/engine/language_client_python/python_src/baml_py/async_context_vars.py @@ -41,11 +41,8 @@ def start_trace_async( self.ctx.set(cln) return BamlSpan.new(self.rt, name, args, cln) - async def end_trace(self, span: BamlSpan, response: typing.Any) -> None: - await span.finish(response, self.ctx.get()) - - def end_trace_sync(self, span: BamlSpan, response: typing.Any) -> None: - span.finish_sync(response, self.ctx.get()) + def end_trace(self, span: BamlSpan, response: typing.Any) -> None: + span.finish(response, self.ctx.get()) def flush(self) -> None: self.rt.flush() @@ -69,10 +66,10 @@ async def async_wrapper( span = self.start_trace_async(func_name, params) try: response = await func(*args, **kwargs) - await self.end_trace(span, response) + self.end_trace(span, response) return response except Exception as e: - await self.end_trace(span, e) + self.end_trace(span, e) raise e return typing.cast(F, async_wrapper) @@ -89,10 +86,10 @@ def wrapper(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: span = self.start_trace_sync(func_name, params) try: response = func(*args, **kwargs) - self.end_trace_sync(span, response) + self.end_trace(span, response) return response except Exception as e: - self.end_trace_sync(span, e) + self.end_trace(span, e) raise e return typing.cast(F, wrapper) diff --git a/engine/language_client_python/python_src/baml_py/baml_py.pyi b/engine/language_client_python/python_src/baml_py/baml_py.pyi index 3d4cac7f5..da736291a 100644 --- a/engine/language_client_python/python_src/baml_py/baml_py.pyi +++ b/engine/language_client_python/python_src/baml_py/baml_py.pyi @@ -1,4 +1,4 @@ -from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, Optional, Tuple class FunctionResult: """The result of a BAML function call. @@ -32,16 +32,14 @@ class FunctionResultStream: async def done(self, ctx: RuntimeContextManager) -> FunctionResult: ... class BamlImagePy: + @staticmethod def from_url(url: str) -> BamlImagePy: ... + @staticmethod def from_base64(base64: str, media_type: str) -> BamlImagePy: ... - @property - def url(self) -> Optional[str]: ... - @url.setter - def url(self, value: Optional[str]) -> None: ... - @property - def base64(self) -> Optional[str]: ... - @base64.setter - def base64(self, value: Optional[str]) -> None: ... + def is_url(self) -> bool: ... + def is_base64(self) -> bool: ... + def as_url(self) -> str: ... + def as_base64(self) -> Tuple[str, str]: ... class RuntimeContextManager: def upsert_tags(self, tags: Dict[str, Any]) -> None: ... @@ -80,8 +78,7 @@ class BamlSpan: args: Dict[str, Any], ctx: RuntimeContextManager, ) -> BamlSpan: ... - async def finish(self, result: Any, ctx: RuntimeContextManager) -> str | None: ... - def finish_sync(self, result: Any, ctx: RuntimeContextManager) -> str | None: ... + def finish(self, result: Any, ctx: RuntimeContextManager) -> str | None: ... class TypeBuilder: def __init__(self) -> None: ... diff --git a/engine/language_client_python/src/types/image.rs b/engine/language_client_python/src/types/image.rs index 56286a58a..207fbcf1e 100644 --- a/engine/language_client_python/src/types/image.rs +++ b/engine/language_client_python/src/types/image.rs @@ -20,33 +20,24 @@ impl BamlImagePy { } } - #[getter] - pub fn get_url(&self) -> PyResult> { - Ok(match &self.inner { - baml_types::BamlImage::Url(url) => Some(url.url.clone()), - _ => None, - }) - } - - #[getter] - pub fn get_base64(&self) -> PyResult> { - Ok(match &self.inner { - baml_types::BamlImage::Base64(base64) => { - Some((base64.base64.clone(), base64.media_type.clone())) - } - _ => None, - }) + pub fn is_url(&self) -> bool { + matches!(&self.inner, baml_types::BamlImage::Url(_)) } - #[setter] - pub fn set_url(&mut self, url: String) { - self.inner = baml_types::BamlImage::Url(baml_types::ImageUrl::new(url)); + pub fn as_url(&self) -> PyResult { + match &self.inner { + baml_types::BamlImage::Url(url) => Ok(url.url.clone()), + _ => Err(crate::BamlError::new_err("Image is not a URL")), + } } - #[setter] - pub fn set_base64(&mut self, base64: (String, String)) { - self.inner = - baml_types::BamlImage::Base64(baml_types::ImageBase64::new(base64.0, base64.1)); + pub fn as_base64(&self) -> PyResult> { + match &self.inner { + baml_types::BamlImage::Base64(base64) => { + Ok(vec![base64.base64.clone(), base64.media_type.clone()]) + } + _ => Err(crate::BamlError::new_err("Image is not base64")), + } } pub fn __repr__(&self) -> String { diff --git a/engine/language_client_python/src/types/span.rs b/engine/language_client_python/src/types/span.rs index fd84a4798..191631d84 100644 --- a/engine/language_client_python/src/types/span.rs +++ b/engine/language_client_python/src/types/span.rs @@ -10,7 +10,7 @@ use super::runtime::BamlRuntime; use super::runtime_ctx_manager::RuntimeContextManager; crate::lang_wrapper!(BamlSpan, - Option, + Option>, no_from, rt: std::sync::Arc ); @@ -34,8 +34,10 @@ impl BamlSpan { let (span, _) = runtime .inner .start_span(function_name, &args_map, &ctx.inner); + + log::trace!("Starting span: {:#?} for {:?}\n", span, function_name); Ok(Self { - inner: span, + inner: Some(span), rt: runtime.inner.clone(), }) } @@ -46,7 +48,8 @@ impl BamlSpan { py: Python<'_>, result: PyObject, ctx: &RuntimeContextManager, - ) -> PyResult { + ) -> PyResult> { + log::info!("Finishing span: {:?}", self.inner); let result = parse_py_type(result.into_bound(py).to_object(py), true)?; let span = self @@ -54,50 +57,9 @@ impl BamlSpan { .take() .ok_or_else(|| BamlError::new_err("Span already finished"))?; - let runtime = self.rt.clone(); - let ctx = ctx.inner.clone(); - pyo3_asyncio::tokio::future_into_py(py, async move { - let result = runtime - .finish_span(span, result, &ctx) - .await - .map_err(BamlError::from_anyhow) - .map(|u| u.map(|id| id.to_string()))?; - Ok(result) - }) - .map(|f| f.into()) - } - - fn finish_sync( - &mut self, - py: Python<'_>, - result: PyObject, - ctx: &RuntimeContextManager, - ) -> PyResult { - let result = parse_py_type(result, true)?; - - // Acquire the span from the internal storage - let span = self - .inner - .take() - .ok_or_else(|| BamlError::new_err("Span already finished"))?; - - // Using block_on to run the asynchronous function in a synchronous way - // You need an instance of Runtime to call block_on - let tokio_runtime = tokio::runtime::Runtime::new().unwrap(); - let runtime = self.rt.clone(); - let ctx = ctx.inner.clone(); - - let finish_span_future = runtime.finish_span(span, result, &ctx); - - // Block the current thread until the asynchronous code completes - let result = tokio_runtime - .block_on(finish_span_future) + self.rt + .finish_span(span, result, &ctx.inner) .map_err(BamlError::from_anyhow) - .and_then(|u| { - u.map(|id| id.to_string()) - .ok_or_else(|| BamlError::new_err("No ID returned from finish_span")) - })?; - - Ok(result.to_object(py)) + .map(|u| u.map(|id| id.to_string())) } } diff --git a/engine/language_client_typescript/async_context_vars.d.ts b/engine/language_client_typescript/async_context_vars.d.ts index f86424310..36f3f1ebb 100644 --- a/engine/language_client_typescript/async_context_vars.d.ts +++ b/engine/language_client_typescript/async_context_vars.d.ts @@ -7,8 +7,7 @@ export declare class CtxManager { get(): RuntimeContextManager; startTraceSync(name: string, args: Record): BamlSpan; startTraceAsync(name: string, args: Record): BamlSpan; - endTrace(span: BamlSpan, response: any): Promise; - endTraceSync(span: BamlSpan, response: any): void; + endTrace(span: BamlSpan, response: any): void; flush(): void; traceFnSync ReturnType>(name: string, func: F): F; traceFnAync Promise>(name: string, func: F): F; diff --git a/engine/language_client_typescript/async_context_vars.d.ts.map b/engine/language_client_typescript/async_context_vars.d.ts.map index 28d43e5cc..7642518da 100644 --- a/engine/language_client_typescript/async_context_vars.d.ts.map +++ b/engine/language_client_typescript/async_context_vars.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"async_context_vars.d.ts","sourceRoot":"","sources":["typescript_src/async_context_vars.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,QAAQ,EAAE,qBAAqB,EAAE,WAAW,EAAE,MAAM,UAAU,CAAA;AAGvE,qBAAa,UAAU;IACrB,OAAO,CAAC,EAAE,CAAa;IACvB,OAAO,CAAC,GAAG,CAA0C;gBAEzC,EAAE,EAAE,WAAW;IAS3B,UAAU,CAAC,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,GAAG,IAAI;IAK9C,GAAG,IAAI,qBAAqB;IAS5B,cAAc,CAAC,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,GAAG,QAAQ;IAOjE,eAAe,CAAC,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,GAAG,QAAQ;IAO5D,QAAQ,CAAC,IAAI,EAAE,QAAQ,EAAE,QAAQ,EAAE,GAAG,GAAG,OAAO,CAAC,IAAI,CAAC;IAS5D,YAAY,CAAC,IAAI,EAAE,QAAQ,EAAE,QAAQ,EAAE,GAAG,GAAG,IAAI;IASjD,KAAK,IAAI,IAAI;IAIb,WAAW,CAAC,UAAU,EAAE,CAAC,SAAS,CAAC,GAAG,IAAI,EAAE,GAAG,EAAE,KAAK,UAAU,EAAE,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,CAAC,GAAG,CAAC;IAsB3F,WAAW,CAAC,UAAU,EAAE,CAAC,SAAS,CAAC,GAAG,IAAI,EAAE,GAAG,EAAE,KAAK,OAAO,CAAC,UAAU,CAAC,EAAE,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,CAAC,GAAG,CAAC;CAqBrG"} \ No newline at end of file +{"version":3,"file":"async_context_vars.d.ts","sourceRoot":"","sources":["typescript_src/async_context_vars.ts"],"names":[],"mappings":"AAAA,OAAO,EAAE,QAAQ,EAAE,qBAAqB,EAAE,WAAW,EAAE,MAAM,UAAU,CAAA;AAGvE,qBAAa,UAAU;IACrB,OAAO,CAAC,EAAE,CAAa;IACvB,OAAO,CAAC,GAAG,CAA0C;gBAEzC,EAAE,EAAE,WAAW;IAS3B,UAAU,CAAC,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,MAAM,CAAC,GAAG,IAAI;IAK9C,GAAG,IAAI,qBAAqB;IAS5B,cAAc,CAAC,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,GAAG,QAAQ;IAOjE,eAAe,CAAC,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,GAAG,QAAQ;IAOlE,QAAQ,CAAC,IAAI,EAAE,QAAQ,EAAE,QAAQ,EAAE,GAAG,GAAG,IAAI;IAS7C,KAAK,IAAI,IAAI;IAIb,WAAW,CAAC,UAAU,EAAE,CAAC,SAAS,CAAC,GAAG,IAAI,EAAE,GAAG,EAAE,KAAK,UAAU,EAAE,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,CAAC,GAAG,CAAC;IAsB3F,WAAW,CAAC,UAAU,EAAE,CAAC,SAAS,CAAC,GAAG,IAAI,EAAE,GAAG,EAAE,KAAK,OAAO,CAAC,UAAU,CAAC,EAAE,IAAI,EAAE,MAAM,EAAE,IAAI,EAAE,CAAC,GAAG,CAAC;CAqBrG"} \ No newline at end of file diff --git a/engine/language_client_typescript/async_context_vars.js b/engine/language_client_typescript/async_context_vars.js index 0ea15c59f..62cc8a2f1 100644 --- a/engine/language_client_typescript/async_context_vars.js +++ b/engine/language_client_typescript/async_context_vars.js @@ -38,21 +38,13 @@ class CtxManager { this.ctx.enterWith(clone); return native_1.BamlSpan.new(this.rt, name, args, clone); } - async endTrace(span, response) { + endTrace(span, response) { const manager = this.ctx.getStore(); if (!manager) { console.error('Context lost before span could be finished\n'); return; } - await span.finish(response, manager); - } - endTraceSync(span, response) { - const manager = this.ctx.getStore(); - if (!manager) { - console.error('Context lost before span could be finished\n'); - return; - } - span.finishSync(response, manager); + span.finish(response, manager); } flush() { this.rt.flush(); @@ -66,11 +58,11 @@ class CtxManager { const span = this.startTraceSync(name, params); try { const response = func(...args); - this.endTraceSync(span, response); + this.endTrace(span, response); return response; } catch (e) { - this.endTraceSync(span, e); + this.endTrace(span, e); throw e; } }); @@ -85,11 +77,11 @@ class CtxManager { const span = this.startTraceAsync(funcName, params); try { const response = await func(...args); - await this.endTrace(span, response); + this.endTrace(span, response); return response; } catch (e) { - await this.endTrace(span, e); + this.endTrace(span, e); throw e; } }); diff --git a/engine/language_client_typescript/native.d.ts b/engine/language_client_typescript/native.d.ts index befff595a..8a52d05a6 100644 --- a/engine/language_client_typescript/native.d.ts +++ b/engine/language_client_typescript/native.d.ts @@ -20,8 +20,7 @@ export class BamlRuntime { export class BamlSpan { static new(runtime: BamlRuntime, functionName: string, args: any, ctx: RuntimeContextManager): BamlSpan - finish(result: any, ctx: RuntimeContextManager): Promise - finishSync(result: any, ctx: RuntimeContextManager): any + finish(result: any, ctx: RuntimeContextManager): any } export class ClassBuilder { diff --git a/engine/language_client_typescript/src/types/span.rs b/engine/language_client_typescript/src/types/span.rs index 11a20207a..8ddc48bce 100644 --- a/engine/language_client_typescript/src/types/span.rs +++ b/engine/language_client_typescript/src/types/span.rs @@ -7,9 +7,8 @@ use super::runtime::BamlRuntime; use super::runtime_ctx_manager::RuntimeContextManager; crate::lang_wrapper!(BamlSpan, - Option, + Option>, no_from, - thread_safe, rt: std::sync::Arc ); @@ -36,71 +35,32 @@ impl BamlSpan { .start_span(&function_name, &args_map, &ctx.inner); log::trace!("Starting span: {:#?} for {:?}\n", span, function_name); Ok(Self { - inner: std::sync::Arc::new(tokio::sync::Mutex::new(span)), + inner: span.into(), rt: runtime.inner.clone(), }) } // mthod to finish #[napi] - pub async fn finish( - &self, + pub fn finish( + &mut self, result: serde_json::Value, ctx: &RuntimeContextManager, ) -> napi::Result { + log::info!("Finishing span: {:?}", self.inner); let result: BamlValue = serde_json::from_value(result) .map_err(|e| napi::Error::new(napi::Status::GenericFailure, e.to_string()))?; // log::info!("Finishing span: {:#?}\n", self.inner.lock().await); - let span = { - self.inner.lock().await.take().ok_or_else(|| { - napi::Error::new(napi::Status::GenericFailure, "Already used span") - })? - }; - - let result = self - .rt - .finish_span(span, Some(result), &ctx.inner) - .await - .map_err(|e| napi::Error::new(napi::Status::GenericFailure, e.to_string())) - .map(|u| u.map(|id| id.to_string()))?; - - Ok(serde_json::json!(result)) - } - - #[napi] - pub fn finish_sync( - &self, - result: serde_json::Value, - ctx: &RuntimeContextManager, - ) -> napi::Result { - let result: BamlValue = serde_json::from_value(result) - .map_err(|e| napi::Error::new(napi::Status::GenericFailure, e.to_string()))?; - // log::info!("Finishing span: {:#?}\n", self.inner.lock()); - - // Acquire the lock synchronously - let mut guard = block_on(self.inner.lock()); - - // log::trace!("Finishing span: {:#?}", guard); - - // Take the span safely from the guard - let span = guard + let span = self + .inner .take() .ok_or_else(|| napi::Error::new(napi::Status::GenericFailure, "Already used span"))?; - // Finish the span synchronously - let future = self.rt.finish_span(span, Some(result), &ctx.inner); - let result = block_on(future) + self.rt + .finish_span(span, Some(result), &ctx.inner) + .map(|u| u.map(|id| id.to_string())) + .map(|u| serde_json::json!(u)) .map_err(|e| napi::Error::new(napi::Status::GenericFailure, e.to_string())) - .and_then(|u| { - u.map(|id| id.to_string()).ok_or_else(|| { - napi::Error::new( - napi::Status::GenericFailure, - "No ID returned from finish_span", - ) - }) - })?; - - Ok(serde_json::json!(result)) } } diff --git a/engine/language_client_typescript/typescript_src/async_context_vars.ts b/engine/language_client_typescript/typescript_src/async_context_vars.ts index 895bae126..c2a31d129 100644 --- a/engine/language_client_typescript/typescript_src/async_context_vars.ts +++ b/engine/language_client_typescript/typescript_src/async_context_vars.ts @@ -42,22 +42,13 @@ export class CtxManager { return BamlSpan.new(this.rt, name, args, clone) } - async endTrace(span: BamlSpan, response: any): Promise { + endTrace(span: BamlSpan, response: any): void { const manager = this.ctx.getStore() if (!manager) { console.error('Context lost before span could be finished\n') return } - await span.finish(response, manager) - } - - endTraceSync(span: BamlSpan, response: any): void { - const manager = this.ctx.getStore() - if (!manager) { - console.error('Context lost before span could be finished\n') - return - } - span.finishSync(response, manager) + span.finish(response, manager) } flush(): void { @@ -77,10 +68,10 @@ export class CtxManager { try { const response = func(...args) - this.endTraceSync(span, response) + this.endTrace(span, response) return response } catch (e) { - this.endTraceSync(span, e) + this.endTrace(span, e) throw e } }) @@ -99,10 +90,10 @@ export class CtxManager { const span = this.startTraceAsync(funcName, params) try { const response = await func(...args) - await this.endTrace(span, response) + this.endTrace(span, response) return response } catch (e) { - await this.endTrace(span, e) + this.endTrace(span, e) throw e } }) diff --git a/integ-tests/python/app/test_functions.py b/integ-tests/python/app/test_functions.py index 8610ed394..8f87f0c17 100644 --- a/integ-tests/python/app/test_functions.py +++ b/integ-tests/python/app/test_functions.py @@ -7,6 +7,7 @@ from baml_client.type_builder import TypeBuilder import datetime + class MyCustomClass(NamedArgsSingleClass): date: datetime.datetime @@ -53,15 +54,13 @@ async def test_should_work_for_all_inputs(): res = await b.TestFnNamedArgsSingleInt(3566) assert "3566" in res + @pytest.mark.asyncio async def test_custom_types(): print("calling with class") res = await b.TestFnNamedArgsSingleClass( myArg=MyCustomClass( - key="key", - key_two=True, - key_three=52, - date=datetime.datetime.now() + key="key", key_two=True, key_three=52, date=datetime.datetime.now() ) ) @@ -94,10 +93,13 @@ async def test_should_work_for_all_outputs(): @pytest.mark.asyncio async def test_should_work_with_image(): res = await b.TestImageInput( - img=baml_py.Image.from_url('https://upload.wikimedia.org/wikipedia/en/4/4d/Shrek_%28character%29.png') + img=baml_py.Image.from_url( + "https://upload.wikimedia.org/wikipedia/en/4/4d/Shrek_%28character%29.png" + ) ) assert "green" in res.lower() + @pytest.mark.asyncio async def test_works_with_retries2(): try: @@ -173,13 +175,15 @@ async def test_streaming_claude(): @pytest.mark.asyncio async def test_tracing_async(): + # sync_dummy_func("second-dummycall-arg") res = await parent_async("first-arg-value") - + # sync_dummy_func("second-dummycall-arg") res2 = await parent_async2("second-arg-value") + # sync_dummy_func("second-dummycall-arg") def test_tracing_sync(): - res = parent_sync("first-arg-value") + # res = parent_sync("first-arg-value") res2 = sync_dummy_func("second-dummycall-arg")