From e340ca91ebdfe1ba8f9df60bbd106ccfc22671db Mon Sep 17 00:00:00 2001 From: Michael Sloan Date: Fri, 24 Jan 2025 02:36:22 -0700 Subject: [PATCH] Fix terminal memory leak by deduping alacritty events on background thread --- crates/repl/src/outputs/plain.rs | 16 +-- crates/terminal/src/terminal.rs | 184 ++++++++++++++++++++++--------- 2 files changed, 134 insertions(+), 66 deletions(-) diff --git a/crates/repl/src/outputs/plain.rs b/crates/repl/src/outputs/plain.rs index 93eeb20794f384..c885c59aa2ebd7 100644 --- a/crates/repl/src/outputs/plain.rs +++ b/crates/repl/src/outputs/plain.rs @@ -16,6 +16,7 @@ //! use alacritty_terminal::{ + event::VoidListener, grid::Dimensions as _, index::{Column, Line, Point}, term::Config, @@ -24,8 +25,6 @@ use alacritty_terminal::{ use gpui::{canvas, size, ClipboardItem, FontStyle, Model, TextStyle, WhiteSpace}; use language::Buffer; use settings::Settings as _; -use std::mem; -use terminal::ZedListener; use terminal_view::terminal_element::TerminalElement; use theme::ThemeSettings; use ui::{prelude::*, IntoElement}; @@ -50,7 +49,7 @@ pub struct TerminalOutput { /// ANSI escape sequence processor for parsing input text. parser: Processor, /// Alacritty terminal instance that manages the terminal state and content. - handler: alacritty_terminal::Term, + handler: alacritty_terminal::Term, } const DEFAULT_NUM_LINES: usize = 32; @@ -124,14 +123,9 @@ impl TerminalOutput { /// and sets up the necessary components for handling terminal events and rendering. /// pub fn new(cx: &mut WindowContext) -> Self { - let (events_tx, events_rx) = futures::channel::mpsc::unbounded(); - let term = alacritty_terminal::Term::new( - Config::default(), - &terminal_size(cx), - terminal::ZedListener(events_tx.clone()), - ); - - mem::forget(events_rx); + let term = + alacritty_terminal::Term::new(Config::default(), &terminal_size(cx), VoidListener); + Self { parser: Processor::new(), handler: term, diff --git a/crates/terminal/src/terminal.rs b/crates/terminal/src/terminal.rs index 955578d5e9c928..24e4df2529224b 100644 --- a/crates/terminal/src/terminal.rs +++ b/crates/terminal/src/terminal.rs @@ -28,7 +28,7 @@ use anyhow::{bail, Result}; use futures::{ channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, - FutureExt, + FutureExt, SinkExt, }; use mappings::mouse::{ @@ -53,7 +53,7 @@ use std::{ ops::{Deref, Index, RangeInclusive}, path::PathBuf, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use thiserror::Error; @@ -482,61 +482,59 @@ impl TerminalBuilder { }) } - pub fn subscribe(mut self, cx: &ModelContext) -> Terminal { - //Event loop - cx.spawn(|terminal, mut cx| async move { - while let Some(event) = self.events_rx.next().await { - terminal.update(&mut cx, |terminal, cx| { - //Process the first event immediately for lowered latency - terminal.process_event(&event, cx); - })?; - - 'outer: loop { - let mut events = Vec::new(); - let mut timer = cx - .background_executor() - .timer(Duration::from_millis(4)) - .fuse(); - let mut wakeup = false; - loop { - futures::select_biased! { - _ = timer => break, - event = self.events_rx.next() => { - if let Some(event) = event { - if matches!(event, AlacTermEvent::Wakeup) { - wakeup = true; - } else { - events.push(event); + pub fn subscribe(self, cx: &ModelContext) -> Terminal { + // Accumulate the effects of events on a background thread in order to keep up with the + // events from alacritty even when it is emitting events rapidly. + let (mut accumulated_events_tx, mut accumulated_events_rx) = unbounded(); + let mut events_rx = self.events_rx; + let background_executor = cx.background_executor().clone(); + cx.background_executor() + .spawn(async move { + while let Some(event) = events_rx.next().await { + // Process the first event immediately to reduce latency + accumulated_events_tx + .feed(EventOrAccumulator::Event(event)) + .await?; + 'outer: loop { + let start_time = Instant::now(); + let mut timer = background_executor.timer(Duration::from_millis(4)).fuse(); + let mut event_accumulator = EventAccumulator::new(); + loop { + futures::select_biased! { + // Events are no longer coming in at a high rate, so go back to just + // awaiting the next event. + _ = timer => break 'outer, + event = events_rx.next() => { + let Some(event) = event else { + break; + }; + event_accumulator.add(event); + if event_accumulator.events.len() > 100 { + break; } - - if events.len() > 100 { + let elapsed = Instant::now().duration_since(start_time); + if elapsed > Duration::from_millis(20) { break; } - } else { - break; - } - }, + }, + } } + accumulated_events_tx + .feed(EventOrAccumulator::Accumulator(event_accumulator)) + .await?; } - - if events.is_empty() && !wakeup { - smol::future::yield_now().await; - break 'outer; - } - - terminal.update(&mut cx, |this, cx| { - if wakeup { - this.process_event(&AlacTermEvent::Wakeup, cx); - } - - for event in events { - this.process_event(&event, cx); - } - })?; - smol::future::yield_now().await; } - } + anyhow::Ok(()) + }) + .detach(); + // On the foreground thread, apply the accumulated effects of events. + cx.spawn(|terminal, mut cx| async move { + while let Some(event_or_accumulator) = accumulated_events_rx.next().await { + terminal.update(&mut cx, |terminal, cx| { + event_or_accumulator.apply(terminal, cx) + })?; + } anyhow::Ok(()) }) .detach(); @@ -545,6 +543,83 @@ impl TerminalBuilder { } } +enum EventOrAccumulator { + Event(AlacTermEvent), + Accumulator(EventAccumulator), +} + +impl EventOrAccumulator { + fn apply(self, terminal: &mut Terminal, cx: &mut ModelContext) { + match self { + EventOrAccumulator::Event(event) => terminal.process_event(event, cx), + EventOrAccumulator::Accumulator(accumulator) => { + accumulator.process_events(terminal, cx) + } + } + } +} + +struct EventAccumulator { + wakeup: bool, + cursor_blinking_changed: bool, + bell: bool, + title: Option, + /// Events that can't be deduplicated. + events: Vec, +} + +impl EventAccumulator { + fn new() -> Self { + EventAccumulator { + wakeup: false, + cursor_blinking_changed: false, + bell: false, + title: None, + events: Vec::new(), + } + } + + fn add(&mut self, event: AlacTermEvent) { + match event { + // Events that can have their effects deduplicated. + AlacTermEvent::Title(title) => self.title = Some(title), + AlacTermEvent::ResetTitle => self.title = Some(String::new()), + AlacTermEvent::CursorBlinkingChange => self.cursor_blinking_changed = true, + AlacTermEvent::Wakeup => self.wakeup = true, + AlacTermEvent::Bell => self.bell = true, + // Events that have handlers involving writing text to the terminal or interacting with + // clipboard, and so must be kept in order. + AlacTermEvent::ClipboardStore(_, _) => self.events.push(event), + AlacTermEvent::ClipboardLoad(_, _) => self.events.push(event), + AlacTermEvent::PtyWrite(_) => self.events.push(event), + AlacTermEvent::TextAreaSizeRequest(_) => self.events.push(event), + AlacTermEvent::ColorRequest(_, _) => self.events.push(event), + AlacTermEvent::Exit => self.events.push(event), + AlacTermEvent::ChildExit(_) => self.events.push(event), + // Handled in render so no need to handle here. + AlacTermEvent::MouseCursorDirty => {} + } + } + + fn process_events(self, terminal: &mut Terminal, cx: &mut ModelContext) { + if self.wakeup { + terminal.process_event(AlacTermEvent::Wakeup, cx); + } + if self.cursor_blinking_changed { + terminal.process_event(AlacTermEvent::CursorBlinkingChange, cx); + } + if self.bell { + terminal.process_event(AlacTermEvent::Bell, cx); + } + if let Some(title) = self.title { + terminal.process_event(AlacTermEvent::Title(title), cx); + } + for event in self.events { + terminal.process_event(event, cx); + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct IndexedCell { pub point: AlacPoint, @@ -674,7 +749,7 @@ impl TaskStatus { } impl Terminal { - fn process_event(&mut self, event: &AlacTermEvent, cx: &mut ModelContext) { + fn process_event(&mut self, event: AlacTermEvent, cx: &mut ModelContext) { match event { AlacTermEvent::Title(title) => { self.breadcrumb_text = title.to_string(); @@ -728,13 +803,12 @@ impl Terminal { // Instead of locking, we could store the colors in `self.last_content`. But then // we might respond with out of date value if a "set color" sequence is immediately // followed by a color request sequence. - let color = self.term.lock().colors()[*index].unwrap_or_else(|| { - to_alac_rgb(get_color_at_index(*index, cx.theme().as_ref())) - }); + let color = self.term.lock().colors()[index] + .unwrap_or_else(|| to_alac_rgb(get_color_at_index(index, cx.theme().as_ref()))); self.write_to_pty(format(color)); } AlacTermEvent::ChildExit(error_code) => { - self.register_task_finished(Some(*error_code), cx); + self.register_task_finished(Some(error_code), cx); } } }