From a247617d6f8b5a0b527db0636a413fdf9a06dfee Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 17 Jan 2025 15:06:10 -0700 Subject: [PATCH] Revert "lsp: Parse LSP messages on background thread - again (#23122)" (#23301) This reverts commit 1b3b825c7f646f2fb34e2bceb6be33f310dfeb59. When debugging git diffs we found that this introduced a re-ordering of messages sent to the LSP: * User hits "format" * Zed adjusts spacing, and sends "spaces changed" to the LSP * Zed sends "format" to LSP With the async approach here, the format request can now arrive before the space changed request. You can reproduce this with `test_strip_whitespace_and_format_via_lsp` under some conditions. Release Notes: - N/A --- crates/collab/src/tests/integration_tests.rs | 1 - .../src/test/editor_lsp_test_context.rs | 4 +- crates/lsp/src/lsp.rs | 140 ++++++++---------- 3 files changed, 62 insertions(+), 83 deletions(-) diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 5abb17799b3ac9..988ac3bcd9eb00 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -4197,7 +4197,6 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering( }], }, ); - executor.run_until_parked(); } fake_language_server.notify::(&lsp::ProgressParams { token: lsp::NumberOrString::String("the-disk-based-token".to_string()), diff --git a/crates/editor/src/test/editor_lsp_test_context.rs b/crates/editor/src/test/editor_lsp_test_context.rs index 87be96afc7f1cc..23e37a1267bdbf 100644 --- a/crates/editor/src/test/editor_lsp_test_context.rs +++ b/crates/editor/src/test/editor_lsp_test_context.rs @@ -315,12 +315,12 @@ impl EditorLspTestContext { pub fn handle_request( &self, - handler: F, + mut handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, T::Params: 'static + Send, - F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut, + F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut, Fut: 'static + Send + Future>, { let url = self.buffer_lsp_url.clone(); diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index b445c3bb95960a..e9fa1caac23989 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: "; const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2); const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); -type NotificationHandler = Arc, Value, AsyncAppContext)>; +type NotificationHandler = Box, Value, AsyncAppContext)>; type ResponseHandler = Box)>; type IoHandler = Box; @@ -890,7 +890,7 @@ impl LanguageServer { pub fn on_notification(&self, f: F) -> Subscription where T: notification::Notification, - F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext), + F: 'static + Send + FnMut(T::Params, AsyncAppContext), { self.on_custom_notification(T::METHOD, f) } @@ -903,7 +903,7 @@ impl LanguageServer { where T: request::Request, T::Params: 'static + Send, - F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync, + F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send, Fut: 'static + Future>, { self.on_custom_request(T::METHOD, f) @@ -939,27 +939,17 @@ impl LanguageServer { } #[must_use] - fn on_custom_notification(&self, method: &'static str, f: F) -> Subscription + fn on_custom_notification(&self, method: &'static str, mut f: F) -> Subscription where - F: 'static + Fn(Params, AsyncAppContext) + Send + Sync, - Params: DeserializeOwned + Send + 'static, + F: 'static + FnMut(Params, AsyncAppContext) + Send, + Params: DeserializeOwned, { - let callback = Arc::new(f); let prev_handler = self.notification_handlers.lock().insert( method, - Arc::new(move |_, params, cx| { - let callback = callback.clone(); - - cx.spawn(move |cx| async move { - if let Some(params) = cx - .background_executor() - .spawn(async move { serde_json::from_value(params).log_err() }) - .await - { - callback(params, cx); - } - }) - .detach(); + Box::new(move |_, params, cx| { + if let Some(params) = serde_json::from_value(params).log_err() { + f(params, cx); + } }), ); assert!( @@ -973,74 +963,64 @@ impl LanguageServer { } #[must_use] - fn on_custom_request(&self, method: &'static str, f: F) -> Subscription + fn on_custom_request(&self, method: &'static str, mut f: F) -> Subscription where - F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync, + F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send, Fut: 'static + Future>, Params: DeserializeOwned + Send + 'static, Res: Serialize, { let outbound_tx = self.outbound_tx.clone(); - let f = Arc::new(f); let prev_handler = self.notification_handlers.lock().insert( method, - Arc::new(move |id, params, cx| { + Box::new(move |id, params, cx| { if let Some(id) = id { - let f = f.clone(); - let deserialized_params = cx - .background_executor() - .spawn(async move { serde_json::from_value(params) }); - - cx.spawn({ - let outbound_tx = outbound_tx.clone(); - move |cx| async move { - match deserialized_params.await { - Ok(params) => { - let response = f(params, cx.clone()); - let response = match response.await { - Ok(result) => Response { - jsonrpc: JSON_RPC_VERSION, - id, - value: LspResult::Ok(Some(result)), - }, - Err(error) => Response { - jsonrpc: JSON_RPC_VERSION, - id, - value: LspResult::Error(Some(Error { - message: error.to_string(), - })), - }, - }; - if let Some(response) = - serde_json::to_string(&response).log_err() - { - outbound_tx.try_send(response).ok(); - } - } - Err(error) => { - log::error!( - "error deserializing {} request: {:?}", - method, - error - ); - let response = AnyResponse { - jsonrpc: JSON_RPC_VERSION, - id, - result: None, - error: Some(Error { - message: error.to_string(), - }), - }; - if let Some(response) = - serde_json::to_string(&response).log_err() - { - outbound_tx.try_send(response).ok(); + match serde_json::from_value(params) { + Ok(params) => { + let response = f(params, cx.clone()); + cx.foreground_executor() + .spawn({ + let outbound_tx = outbound_tx.clone(); + async move { + let response = match response.await { + Ok(result) => Response { + jsonrpc: JSON_RPC_VERSION, + id, + value: LspResult::Ok(Some(result)), + }, + Err(error) => Response { + jsonrpc: JSON_RPC_VERSION, + id, + value: LspResult::Error(Some(Error { + message: error.to_string(), + })), + }, + }; + if let Some(response) = + serde_json::to_string(&response).log_err() + { + outbound_tx.try_send(response).ok(); + } } - } + }) + .detach(); + } + + Err(error) => { + log::error!("error deserializing {} request: {:?}", method, error); + let response = AnyResponse { + jsonrpc: JSON_RPC_VERSION, + id, + result: None, + error: Some(Error { + message: error.to_string(), + }), + }; + if let Some(response) = serde_json::to_string(&response).log_err() { + outbound_tx.try_send(response).ok(); } } - }) - .detach(); + } } }), ); @@ -1445,12 +1425,12 @@ impl FakeLanguageServer { /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type. pub fn handle_request( &self, - handler: F, + mut handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + request::Request, T::Params: 'static + Send, - F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut, + F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut, Fut: 'static + Send + Future>, { let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded(); @@ -1474,12 +1454,12 @@ impl FakeLanguageServer { /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type. pub fn handle_notification( &self, - handler: F, + mut handler: F, ) -> futures::channel::mpsc::UnboundedReceiver<()> where T: 'static + notification::Notification, T::Params: 'static + Send, - F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext), + F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext), { let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded(); self.server.remove_notification_handler::();