From 16f2fc7eb3bc6745f1b0c0679cd5b6231c98a739 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 13 Sep 2024 09:59:03 -0400 Subject: [PATCH 01/11] Start setting up pagination --- src/app/app_execution.rs | 178 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index a94951b..9f8aa14 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -21,6 +21,8 @@ use crate::app::state::tabs::sql::Query; use crate::app::AppEvent; use crate::execution::ExecutionContext; use color_eyre::eyre::Result; +use datafusion::arrow::array::RecordBatch; +use datafusion::execution::SendableRecordBatchStream; use futures::StreamExt; use log::{error, info}; use std::sync::Arc; @@ -108,3 +110,179 @@ impl AppExecution { Ok(()) } } + +/// A stream of [`RecordBatch`]es that can be paginated for display in the TUI. +pub struct PaginatingRecordBatchStream { + // currently executing stream + inner: SendableRecordBatchStream, + // any batches that have been buffered so far + batches: Vec, + // current batch being shown + current_batch: Option, +} + +impl PaginatingRecordBatchStream { + pub fn new(inner: SendableRecordBatchStream) -> Self { + Self { + inner, + batches: Vec::new(), + current_batch: None, + } + } + + /// Return the batch at the current index + pub fn current_batch(&self) -> Option<&RecordBatch> { + if let Some(idx) = self.current_batch { + self.batches.get(idx) + } else { + None + } + } + + /// Return the next batch + /// TBD on logic for handling the end + pub async fn next_batch(&mut self) -> Result> { + if let Some(b) = self.inner.next().await { + match b { + Ok(batch) => { + self.batches.push(batch); + self.current_batch = Some(self.batches.len() - 1); + Ok(self.current_batch()) + } + Err(e) => Err(e.into()), + } + } else { + Ok(None) + } + } + + /// Return the previous batch + /// TBD on logic for handling the beginning + pub fn previous_batch(&mut self) -> 
Option<&RecordBatch> { + if let Some(idx) = self.current_batch { + if idx > 0 { + self.current_batch = Some(idx - 1); + } + } + self.current_batch() + } +} + +#[cfg(test)] +mod tests { + use super::PaginatingRecordBatchStream; + use datafusion::{ + arrow::array::{ArrayRef, Int32Array, RecordBatch}, + common::Result, + physical_plan::stream::RecordBatchStreamAdapter, + }; + use std::sync::Arc; + + #[tokio::test] + async fn test_paginating_record_batch_stream() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); + + let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap(); + + let schema = record_batch1.schema(); + let batches: Vec> = + vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())]; + let stream = futures::stream::iter(batches); + let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); + + let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); + + assert_eq!(paginating_stream.current_batch(), None); + assert_eq!( + paginating_stream.next_batch().await.unwrap(), + Some(&record_batch1) + ); + assert_eq!( + paginating_stream.next_batch().await.unwrap(), + Some(&record_batch2) + ); + assert_eq!(paginating_stream.next_batch().await.unwrap(), None); + } + + #[tokio::test] + async fn test_paginating_record_batch_stream_previous() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); + + let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap(); + + let schema = record_batch1.schema(); + let batches: Vec> = + vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())]; + let stream = futures::stream::iter(batches); + let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); + + let mut 
paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); + + assert_eq!(paginating_stream.current_batch(), None); + assert_eq!( + paginating_stream.next_batch().await.unwrap(), + Some(&record_batch1) + ); + assert_eq!( + paginating_stream.next_batch().await.unwrap(), + Some(&record_batch2) + ); + assert_eq!(paginating_stream.next_batch().await.unwrap(), None); + assert_eq!(paginating_stream.current_batch(), Some(&record_batch2)); + assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1)); + assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1)); + } + + #[tokio::test] + async fn test_paginating_record_batch_stream_one_error() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + + let schema = record_batch1.schema(); + let batches: Vec> = vec![Err( + datafusion::error::DataFusionError::Execution("Error creating dataframe".to_string()), + )]; + let stream = futures::stream::iter(batches); + let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); + + let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); + + assert_eq!(paginating_stream.current_batch(), None); + + let res = paginating_stream.next_batch().await; + assert!(res.is_err()); + } + + #[tokio::test] + async fn test_paginating_record_batch_stream_successful_then_error() { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + + let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + + let schema = record_batch1.schema(); + let batches: Vec> = vec![ + Ok(record_batch1.clone()), + Err(datafusion::error::DataFusionError::Execution( + "Error creating dataframe".to_string(), + )), + ]; + let stream = futures::stream::iter(batches); + let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); + + let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); + + 
assert_eq!(paginating_stream.current_batch(), None); + assert_eq!( + paginating_stream.next_batch().await.unwrap(), + Some(&record_batch1) + ); + let res = paginating_stream.next_batch().await; + assert!(res.is_err()); + assert_eq!(paginating_stream.next_batch().await.unwrap(), None); + assert_eq!(paginating_stream.current_batch(), Some(&record_batch1)); + } +} From 93e6ba366058141b7271d92a487d5b9378560feb Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 13 Sep 2024 17:39:14 -0400 Subject: [PATCH 02/11] Playing around with execution structure to paginate --- src/app/app_execution.rs | 74 +++++++++++++++++++++++++++------------- src/app/handlers/sql.rs | 3 +- src/app/mod.rs | 6 ++-- src/app/ui/tabs/sql.rs | 7 ++-- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index 9f8aa14..68ab18a 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -22,23 +22,39 @@ use crate::app::AppEvent; use crate::execution::ExecutionContext; use color_eyre::eyre::Result; use datafusion::arrow::array::RecordBatch; -use datafusion::execution::SendableRecordBatchStream; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; use futures::StreamExt; use log::{error, info}; +use std::fmt::Debug; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::Mutex; /// Handles executing queries for the TUI application, formatting results /// and sending them to the UI. pub(crate) struct AppExecution { inner: Arc, + results: Arc>>, } impl AppExecution { /// Create a new instance of [`AppExecution`]. 
pub fn new(inner: Arc) -> Self { - Self { inner } + Self { + inner, + results: Arc::new(Mutex::new(None)), + } + } + + fn results(&self) -> Arc>> { + Arc::clone(&self.results) + } + + async fn set_results(&self, results: PaginatingRecordBatchStream) { + let mut r = self.results.lock().await; + *r = Some(results); } /// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender. @@ -63,26 +79,29 @@ impl AppExecution { if i == statement_count - 1 { info!("Executing last query and display results"); match self.inner.execute_sql(sql).await { - Ok(mut stream) => { - let mut batches = Vec::new(); - while let Some(maybe_batch) = stream.next().await { - match maybe_batch { - Ok(batch) => { - batches.push(batch); - } - Err(e) => { - let elapsed = start.elapsed(); - query.set_error(Some(e.to_string())); - query.set_execution_time(elapsed); - break; - } - } - } - let elapsed = start.elapsed(); - let rows: usize = batches.iter().map(|r| r.num_rows()).sum(); - query.set_results(Some(batches)); - query.set_num_rows(Some(rows)); - query.set_execution_time(elapsed); + Ok(stream) => { + let paginating_stream = PaginatingRecordBatchStream::new(stream); + self.set_results(paginating_stream).await; + + // let mut batches = Vec::new(); + // while let Some(maybe_batch) = stream.next().await { + // match maybe_batch { + // Ok(batch) => { + // batches.push(batch); + // } + // Err(e) => { + // let elapsed = start.elapsed(); + // query.set_error(Some(e.to_string())); + // query.set_execution_time(elapsed); + // break; + // } + // } + // } + // let elapsed = start.elapsed(); + // let rows: usize = batches.iter().map(|r| r.num_rows()).sum(); + // query.set_results(Some(batches)); + // query.set_num_rows(Some(rows)); + // query.set_execution_time(elapsed); } Err(e) => { error!("Error creating dataframe: {:?}", e); @@ -122,7 +141,7 @@ pub struct PaginatingRecordBatchStream { } impl PaginatingRecordBatchStream { - pub fn new(inner: SendableRecordBatchStream) 
-> Self { + pub fn new(inner: Pin>) -> Self { Self { inner, batches: Vec::new(), @@ -168,6 +187,15 @@ impl PaginatingRecordBatchStream { } } +impl Debug for PaginatingRecordBatchStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PaginatingRecordBatchStream") + .field("batches", &self.batches) + .field("current_batch", &self.current_batch) + .finish() + } +} + #[cfg(test)] mod tests { use super::PaginatingRecordBatchStream; diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 98a2730..251b9ea 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -62,9 +62,8 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { } KeyCode::Enter => { - info!("Run query"); let sql = app.state.sql_tab.editor().lines().join(""); - info!("SQL: {}", sql); + info!("Running query: {}", sql); let app_execution = AppExecution::new(Arc::clone(&app.execution)); let _event_tx = app.event_tx().clone(); // TODO: Maybe this should be on a separate runtime to prevent blocking main thread / diff --git a/src/app/mod.rs b/src/app/mod.rs index e35515f..28aafba 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -38,6 +38,7 @@ use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; +use self::app_execution::PaginatingRecordBatchStream; use self::handlers::{app_event_handler, crossterm_event_handler}; use self::state::tabs::sql::Query; use crate::execution::ExecutionContext; @@ -45,7 +46,7 @@ use crate::execution::ExecutionContext; #[cfg(feature = "flightsql")] use self::state::tabs::flightsql::FlightSQLQuery; -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum AppEvent { Key(event::KeyEvent), Error, @@ -61,6 +62,7 @@ pub enum AppEvent { Resize(u16, u16), ExecuteDDL(String), QueryResult(Query), + // PaginatedQueryResult(PaginatingRecordBatchStream), #[cfg(feature = "flightsql")] EstablishFlightSQLConnection, #[cfg(feature = "flightsql")] @@ -325,7 +327,7 @@ pub async fn 
run_app(state: state::AppState<'_>) -> Result<()> { loop { let event = app.next().await?; - if let AppEvent::Render = event.clone() { + if let AppEvent::Render = &event { terminal.draw(|f| f.render_widget(&app, f.area()))?; }; diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index 16ee0cb..0e25afd 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -20,7 +20,7 @@ use ratatui::{ layout::{Alignment, Constraint, Direction, Layout, Rect}, style::{palette::tailwind, Style, Stylize}, text::Span, - widgets::{Block, Borders, Paragraph, Row, StatefulWidget, Table, Widget}, + widgets::{block::Title, Block, Borders, Paragraph, Row, StatefulWidget, Table, Widget}, }; use crate::app::ui::convert::record_batches_to_table; @@ -44,7 +44,10 @@ pub fn render_sql_editor(area: Rect, buf: &mut Buffer, app: &App) { } pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { - let block = Block::default().title(" Results ").borders(Borders::ALL); + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + .title(Title::from(" Page ").alignment(Alignment::Right)); if let Some(q) = app.state.sql_tab.query() { if let Some(r) = q.results() { if let Some(s) = app.state.sql_tab.query_results_state() { From 52802d65ea078f3d150495abfacc622106e11458 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sat, 14 Sep 2024 10:06:01 -0400 Subject: [PATCH 03/11] SQL results displaying --- src/app/app_execution.rs | 11 ++- src/app/handlers/flightsql.rs | 136 +++++++++++++++++----------------- src/app/handlers/mod.rs | 54 ++++++++------ src/app/handlers/sql.rs | 5 +- src/app/mod.rs | 10 ++- src/app/ui/convert.rs | 2 +- src/app/ui/tabs/sql.rs | 75 ++++++++++++++----- src/execution/mod.rs | 6 ++ 8 files changed, 181 insertions(+), 118 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index 68ab18a..a1d112a 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -22,6 +22,7 @@ use crate::app::AppEvent; use 
crate::execution::ExecutionContext; use color_eyre::eyre::Result; use datafusion::arrow::array::RecordBatch; +use datafusion::execution::context::SessionContext; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; use futures::StreamExt; use log::{error, info}; @@ -34,6 +35,7 @@ use tokio::sync::Mutex; /// Handles executing queries for the TUI application, formatting results /// and sending them to the UI. +#[derive(Debug)] pub(crate) struct AppExecution { inner: Arc, results: Arc>>, @@ -48,7 +50,11 @@ impl AppExecution { } } - fn results(&self) -> Arc>> { + pub fn session_ctx(&self) -> &SessionContext { + self.inner.session_ctx() + } + + pub fn results(&self) -> Arc>> { Arc::clone(&self.results) } @@ -80,7 +86,8 @@ impl AppExecution { info!("Executing last query and display results"); match self.inner.execute_sql(sql).await { Ok(stream) => { - let paginating_stream = PaginatingRecordBatchStream::new(stream); + let mut paginating_stream = PaginatingRecordBatchStream::new(stream); + paginating_stream.next_batch().await?; self.set_results(paginating_stream).await; // let mut batches = Vec::new(); diff --git a/src/app/handlers/flightsql.rs b/src/app/handlers/flightsql.rs index 52ac9ed..f20f470 100644 --- a/src/app/handlers/flightsql.rs +++ b/src/app/handlers/flightsql.rs @@ -66,74 +66,74 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { } } - KeyCode::Enter => { - info!("Run FS query"); - let sql = app.state.flightsql_tab.editor().lines().join(""); - info!("SQL: {}", sql); - let execution = Arc::clone(&app.execution); - let _event_tx = app.event_tx(); - tokio::spawn(async move { - let client = execution.flightsql_client(); - let mut query = - FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); - let start = Instant::now(); - if let Some(ref mut c) = *client.lock().await { - info!("Sending query"); - match c.execute(sql, None).await { - Ok(flight_info) => { - for endpoint in flight_info.endpoint { - if let 
Some(ticket) = endpoint.ticket { - match c.do_get(ticket.into_request()).await { - Ok(mut stream) => { - let mut batches: Vec = Vec::new(); - // temporarily only show the first batch to avoid - // buffering massive result sets. Eventually there should - // be some sort of paging logic - // see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874 - // while let Some(maybe_batch) = stream.next().await { - if let Some(maybe_batch) = stream.next().await { - match maybe_batch { - Ok(batch) => { - info!("Batch rows: {}", batch.num_rows()); - batches.push(batch); - } - Err(e) => { - error!("Error getting batch: {:?}", e); - let elapsed = start.elapsed(); - query.set_error(Some(e.to_string())); - query.set_execution_time(elapsed); - } - } - } - let elapsed = start.elapsed(); - let rows: usize = - batches.iter().map(|r| r.num_rows()).sum(); - query.set_results(Some(batches)); - query.set_num_rows(Some(rows)); - query.set_execution_time(elapsed); - } - Err(e) => { - error!("Error getting response: {:?}", e); - let elapsed = start.elapsed(); - query.set_error(Some(e.to_string())); - query.set_execution_time(elapsed); - } - } - } - } - } - Err(e) => { - error!("Error getting response: {:?}", e); - let elapsed = start.elapsed(); - query.set_error(Some(e.to_string())); - query.set_execution_time(elapsed); - } - } - } - - let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query)); - }); - } + // KeyCode::Enter => { + // info!("Run FS query"); + // let sql = app.state.flightsql_tab.editor().lines().join(""); + // info!("SQL: {}", sql); + // let execution = Arc::clone(&app.execution); + // let _event_tx = app.event_tx(); + // tokio::spawn(async move { + // let client = execution.flightsql_client(); + // let mut query = + // FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); + // let start = Instant::now(); + // if let Some(ref mut c) = *client.lock().await { + // info!("Sending query"); + // match c.execute(sql, 
None).await { + // Ok(flight_info) => { + // for endpoint in flight_info.endpoint { + // if let Some(ticket) = endpoint.ticket { + // match c.do_get(ticket.into_request()).await { + // Ok(mut stream) => { + // let mut batches: Vec = Vec::new(); + // // temporarily only show the first batch to avoid + // // buffering massive result sets. Eventually there should + // // be some sort of paging logic + // // see https://github.com/datafusion-contrib/datafusion-tui/pull/133#discussion_r1756680874 + // // while let Some(maybe_batch) = stream.next().await { + // if let Some(maybe_batch) = stream.next().await { + // match maybe_batch { + // Ok(batch) => { + // info!("Batch rows: {}", batch.num_rows()); + // batches.push(batch); + // } + // Err(e) => { + // error!("Error getting batch: {:?}", e); + // let elapsed = start.elapsed(); + // query.set_error(Some(e.to_string())); + // query.set_execution_time(elapsed); + // } + // } + // } + // let elapsed = start.elapsed(); + // let rows: usize = + // batches.iter().map(|r| r.num_rows()).sum(); + // query.set_results(Some(batches)); + // query.set_num_rows(Some(rows)); + // query.set_execution_time(elapsed); + // } + // Err(e) => { + // error!("Error getting response: {:?}", e); + // let elapsed = start.elapsed(); + // query.set_error(Some(e.to_string())); + // query.set_execution_time(elapsed); + // } + // } + // } + // } + // } + // Err(e) => { + // error!("Error getting response: {:?}", e); + // let elapsed = start.elapsed(); + // query.set_error(Some(e.to_string())); + // query.set_execution_time(elapsed); + // } + // } + // } + // + // let _ = _event_tx.send(AppEvent::FlightSQLQueryResult(query)); + // }); + // } _ => {} } } diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 668dd92..32ec426 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -194,6 +194,16 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { app.state.history_tab.add_to_history(history_query); 
app.state.history_tab.refresh_history_table_state() } + AppEvent::QueryResultsNextPage => { + let results = app.execution().results(); + + tokio::spawn(async move { + let mut locked = results.lock().await; + if let Some(stream) = locked.as_mut() { + stream.next_batch().await; + } + }); + } #[cfg(feature = "flightsql")] AppEvent::FlightSQLQueryResult(r) => { app.state.flightsql_tab.set_query(r.clone()); @@ -207,28 +217,28 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { app.state.history_tab.add_to_history(history_query); app.state.history_tab.refresh_history_table_state() } - #[cfg(feature = "flightsql")] - AppEvent::EstablishFlightSQLConnection => { - let url = app.state.config.flightsql.connection_url.clone(); - info!("Connection to FlightSQL host: {}", url); - let url: &'static str = Box::leak(url.into_boxed_str()); - let execution = Arc::clone(&app.execution); - tokio::spawn(async move { - let client = execution.flightsql_client(); - let maybe_channel = Channel::from_static(url).connect().await; - info!("Created channel"); - match maybe_channel { - Ok(channel) => { - let flightsql_client = FlightSqlServiceClient::new(channel); - let mut locked_client = client.lock().await; - *locked_client = Some(flightsql_client); - } - Err(e) => { - info!("Error creating channel for FlightSQL: {:?}", e); - } - } - }); - } + // #[cfg(feature = "flightsql")] + // AppEvent::EstablishFlightSQLConnection => { + // let url = app.state.config.flightsql.connection_url.clone(); + // info!("Connection to FlightSQL host: {}", url); + // let url: &'static str = Box::leak(url.into_boxed_str()); + // let execution = Arc::clone(&app.execution); + // tokio::spawn(async move { + // let client = execution.flightsql_client(); + // let maybe_channel = Channel::from_static(url).connect().await; + // info!("Created channel"); + // match maybe_channel { + // Ok(channel) => { + // let flightsql_client = FlightSqlServiceClient::new(channel); + // let mut locked_client = 
client.lock().await; + // *locked_client = Some(flightsql_client); + // } + // Err(e) => { + // info!("Error creating channel for FlightSQL: {:?}", e); + // } + // } + // }); + // } _ => { match app.state.tabs.selected { SelectedTab::SQL => sql::app_event_handler(app, event), diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 251b9ea..b2e8de4 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -64,14 +64,15 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { KeyCode::Enter => { let sql = app.state.sql_tab.editor().lines().join(""); info!("Running query: {}", sql); - let app_execution = AppExecution::new(Arc::clone(&app.execution)); + // let app_execution = AppExecution::new(Arc::clone(&app.execution)); let _event_tx = app.event_tx().clone(); + let execution = Arc::clone(&app.execution); // TODO: Maybe this should be on a separate runtime to prevent blocking main thread / // runtime // TODO: Extract this into function to be used in both normal and editable handler tokio::spawn(async move { let sqls: Vec<&str> = sql.split(';').collect(); - let _ = app_execution.run_sqls(sqls, _event_tx).await; + let _ = execution.run_sqls(sqls, _event_tx).await; }); } _ => {} diff --git a/src/app/mod.rs b/src/app/mod.rs index de68c02..8474ec4 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -38,7 +38,7 @@ use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use self::app_execution::PaginatingRecordBatchStream; +use self::app_execution::{AppExecution, PaginatingRecordBatchStream}; use self::handlers::{app_event_handler, crossterm_event_handler}; use self::state::tabs::sql::Query; use crate::execution::ExecutionContext; @@ -63,6 +63,7 @@ pub enum AppEvent { ExecuteDDL(String), QueryResult(Query), // PaginatedQueryResult(PaginatingRecordBatchStream), + QueryResultsNextPage, #[cfg(feature = "flightsql")] EstablishFlightSQLConnection, #[cfg(feature = "flightsql")] @@ -71,7 +72,7 @@ pub enum 
AppEvent { pub struct App<'app> { state: state::AppState<'app>, - execution: Arc, + execution: Arc, event_tx: UnboundedSender, event_rx: UnboundedReceiver, cancellation_token: CancellationToken, @@ -83,6 +84,7 @@ impl<'app> App<'app> { let (event_tx, event_rx) = mpsc::unbounded_channel(); let cancellation_token = CancellationToken::new(); let task = tokio::spawn(async {}); + let app_execution = Arc::new(AppExecution::new(Arc::new(execution))); Self { state, @@ -90,7 +92,7 @@ impl<'app> App<'app> { event_rx, event_tx, cancellation_token, - execution: Arc::new(execution), + execution: app_execution, } } @@ -102,7 +104,7 @@ impl<'app> App<'app> { &mut self.event_rx } - pub fn execution(&self) -> Arc { + pub fn execution(&self) -> Arc { Arc::clone(&self.execution) } diff --git a/src/app/ui/convert.rs b/src/app/ui/convert.rs index 1d129df..d3a01a2 100644 --- a/src/app/ui/convert.rs +++ b/src/app/ui/convert.rs @@ -166,7 +166,7 @@ pub fn empty_results_table<'frame>() -> Table<'frame> { } pub fn record_batches_to_table<'frame, 'results>( - record_batches: &'results [RecordBatch], + record_batches: &'results [&RecordBatch], ) -> Result> where // The results come from sql_tab state which persists until the next query is run which is diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index 0e25afd..2a4773b 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -48,17 +48,17 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { .title(" Results ") .borders(Borders::ALL) .title(Title::from(" Page ").alignment(Alignment::Right)); - if let Some(q) = app.state.sql_tab.query() { - if let Some(r) = q.results() { - if let Some(s) = app.state.sql_tab.query_results_state() { - let stats = Span::from(format!( - " {} rows in {}ms ", - q.num_rows().unwrap_or(0), - q.execution_time().as_millis() - )) - .fg(tailwind::WHITE); - let block = block.title_bottom(stats).fg(tailwind::ORANGE.c500); - let maybe_table = record_batches_to_table(r); + + let 
results = app.execution().results(); + let locked = tokio::task::block_in_place(|| results.blocking_lock()); + let maybe_stream = locked.as_ref(); + if let Some(s) = app.state.sql_tab.query_results_state() { + if let Some(stream) = maybe_stream { + if let Some(batch) = stream.current_batch() { + let batches = vec![batch]; + let maybe_table = record_batches_to_table(&batches); + + let block = block.title_bottom("Stats").fg(tailwind::ORANGE.c500); match maybe_table { Ok(table) => { let table = table @@ -78,17 +78,54 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { } } } + } + } + + // let block = Block::default() + // .title(" Results ") + // .borders(Borders::ALL) + // .title(Title::from(" Page ").alignment(Alignment::Right)); + if let Some(q) = app.state.sql_tab.query() { + if let Some(r) = q.results() { + if let Some(s) = app.state.sql_tab.query_results_state() { + // let stats = Span::from(format!( + // " {} rows in {}ms ", + // q.num_rows().unwrap_or(0), + // q.execution_time().as_millis() + // )) + // .fg(tailwind::WHITE); + // let block = block.title_bottom(stats).fg(tailwind::ORANGE.c500); + // let maybe_table = record_batches_to_table(r); + // match maybe_table { + // Ok(table) => { + // let table = table + // .highlight_style( + // Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK), + // ) + // .block(block); + // + // let mut s = s.borrow_mut(); + // StatefulWidget::render(table, area, buf, &mut s); + // } + // Err(e) => { + // let row = Row::new(vec![e.to_string()]); + // let widths = vec![Constraint::Percentage(100)]; + // let table = Table::new(vec![row], widths).block(block); + // Widget::render(table, area, buf); + // } + // } + } } else if let Some(e) = q.error() { - let row = Row::new(vec![e.to_string()]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); + // let row = Row::new(vec![e.to_string()]); + // let widths = 
vec![Constraint::Percentage(100)]; + // let table = Table::new(vec![row], widths).block(block); + // Widget::render(table, area, buf); } } else { - let row = Row::new(vec!["Run a query to generate results"]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); + // let row = Row::new(vec!["Run a query to generate results"]); + // let widths = vec![Constraint::Percentage(100)]; + // let table = Table::new(vec![row], widths).block(block); + // Widget::render(table, area, buf); } } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 6eb212a..d1f6bc3 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -54,6 +54,12 @@ pub struct ExecutionContext { flightsql_client: Mutex>>, } +impl std::fmt::Debug for ExecutionContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ExecutionContext").finish() + } +} + impl ExecutionContext { /// Construct a new `ExecutionContext` with the specified configuration pub fn try_new(config: &ExecutionConfig) -> Result { From a9becce98834f41dd190f339ef2fb1e4db22840f Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Sun, 15 Sep 2024 10:17:54 -0400 Subject: [PATCH 04/11] New paginating model --- src/app/app_execution.rs | 44 ++++++++++++++++++++++++----------- src/app/handlers/mod.rs | 19 +++++++-------- src/app/mod.rs | 4 ++-- src/app/state/tabs/sql.rs | 16 +++++++++++++ src/app/ui/tabs/flightsql.rs | 38 +++++++++++++++--------------- src/app/ui/tabs/sql.rs | 45 +++++++++++++++++------------------- 6 files changed, 99 insertions(+), 67 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index a1d112a..afbd9b3 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -35,10 +35,9 @@ use tokio::sync::Mutex; /// Handles executing queries for the TUI application, formatting results /// and sending them to the UI. 
-#[derive(Debug)] pub(crate) struct AppExecution { inner: Arc, - results: Arc>>, + result_stream: Arc>>, // results: Arc>>, } impl AppExecution { @@ -46,7 +45,7 @@ impl AppExecution { pub fn new(inner: Arc) -> Self { Self { inner, - results: Arc::new(Mutex::new(None)), + result_stream: Arc::new(Mutex::new(None)), // results: Arc::new(Mutex::new(None)), } } @@ -54,15 +53,20 @@ impl AppExecution { self.inner.session_ctx() } - pub fn results(&self) -> Arc>> { - Arc::clone(&self.results) - } + // pub fn results(&self) -> Arc>> { + // Arc::clone(&self.results) + // } - async fn set_results(&self, results: PaginatingRecordBatchStream) { - let mut r = self.results.lock().await; - *r = Some(results); + pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) { + let mut s = self.result_stream.lock().await; + *s = Some(stream) } + // async fn set_results(&self, results: PaginatingRecordBatchStream) { + // let mut r = self.results.lock().await; + // *r = Some(results); + // } + /// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender. /// /// All queries except the last one will have their results discarded. 
@@ -86,10 +90,22 @@ impl AppExecution { info!("Executing last query and display results"); match self.inner.execute_sql(sql).await { Ok(stream) => { - let mut paginating_stream = PaginatingRecordBatchStream::new(stream); - paginating_stream.next_batch().await?; - self.set_results(paginating_stream).await; - + // let mut paginating_stream = PaginatingRecordBatchStream::new(stream); + // paginating_stream.next_batch().await?; + self.set_result_stream(stream).await; + let mut stream = self.result_stream.lock().await; + if let Some(s) = stream.as_mut() { + if let Some(b) = s.next().await { + match b { + Ok(b) => { + sender.send(AppEvent::QueryResultsNextPage(b)); + } + Err(e) => { + error!("Error getting RecordBatch: {:?}", e); + } + } + } + } // let mut batches = Vec::new(); // while let Some(maybe_batch) = stream.next().await { // match maybe_batch { @@ -138,6 +154,8 @@ impl AppExecution { } /// A stream of [`RecordBatch`]es that can be paginated for display in the TUI. +/// +/// Since `SendabkeRecordBatchStream` is not `Sync` we can't send it as an [`AppEvent`] pub struct PaginatingRecordBatchStream { // currently executing stream inner: SendableRecordBatchStream, diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 32ec426..dfc2b30 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -194,15 +194,16 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { app.state.history_tab.add_to_history(history_query); app.state.history_tab.refresh_history_table_state() } - AppEvent::QueryResultsNextPage => { - let results = app.execution().results(); - - tokio::spawn(async move { - let mut locked = results.lock().await; - if let Some(stream) = locked.as_mut() { - stream.next_batch().await; - } - }); + AppEvent::QueryResultsNextPage(b) => { + app.state.sql_tab.add_batch(b); + // let results = app.execution().results(); + // + // tokio::spawn(async move { + // let mut locked = results.lock().await; + // if let Some(stream) = 
locked.as_mut() { + // stream.next_batch().await; + // } + // }); } #[cfg(feature = "flightsql")] AppEvent::FlightSQLQueryResult(r) => { diff --git a/src/app/mod.rs b/src/app/mod.rs index 8474ec4..ed063c9 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -23,6 +23,7 @@ pub mod ui; use color_eyre::eyre::eyre; use color_eyre::Result; use crossterm::event as ct; +use datafusion::arrow::array::RecordBatch; use futures::FutureExt; use log::{debug, error, info, trace}; use ratatui::backend::CrosstermBackend; @@ -62,8 +63,7 @@ pub enum AppEvent { Resize(u16, u16), ExecuteDDL(String), QueryResult(Query), - // PaginatedQueryResult(PaginatingRecordBatchStream), - QueryResultsNextPage, + QueryResultsNextPage(RecordBatch), #[cfg(feature = "flightsql")] EstablishFlightSQLConnection, #[cfg(feature = "flightsql")] diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index 7bc633d..7f0283e 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -107,6 +107,8 @@ pub struct SQLTabState<'app> { editor_editable: bool, query: Option, query_results_state: Option>, + result_batches: Option>, + results_page: Option, } impl<'app> SQLTabState<'app> { @@ -120,6 +122,8 @@ impl<'app> SQLTabState<'app> { editor_editable: false, query: None, query_results_state: None, + result_batches: None, + results_page: None, } } @@ -192,4 +196,16 @@ impl<'app> SQLTabState<'app> { pub fn delete_word(&mut self) { self.editor.delete_word(); } + + pub fn add_batch(&mut self, batch: RecordBatch) { + if let Some(batches) = self.result_batches.as_mut() { + batches.push(batch); + } else { + self.result_batches = Some(vec![batch]); + } + } + + pub fn current_batch(&self) -> Option<&RecordBatch> { + self.result_batches.as_ref().and_then(|b| b.get(0)) + } } diff --git a/src/app/ui/tabs/flightsql.rs b/src/app/ui/tabs/flightsql.rs index daf9ba5..b8bdc9a 100644 --- a/src/app/ui/tabs/flightsql.rs +++ b/src/app/ui/tabs/flightsql.rs @@ -54,25 +54,25 @@ pub fn render_sql_results(area: Rect, 
buf: &mut Buffer, app: &App) { )) .fg(tailwind::WHITE); let block = block.title_bottom(stats).fg(tailwind::ORANGE.c500); - let maybe_table = record_batches_to_table(r); - match maybe_table { - Ok(table) => { - let table = table - .highlight_style( - Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK), - ) - .block(block); - - let mut s = s.borrow_mut(); - StatefulWidget::render(table, area, buf, &mut s); - } - Err(e) => { - let row = Row::new(vec![e.to_string()]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); - } - } + // let maybe_table = record_batches_to_table(r); + // match maybe_table { + // Ok(table) => { + // let table = table + // .highlight_style( + // Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK), + // ) + // .block(block); + // + // let mut s = s.borrow_mut(); + // StatefulWidget::render(table, area, buf, &mut s); + // } + // Err(e) => { + // let row = Row::new(vec![e.to_string()]); + // let widths = vec![Constraint::Percentage(100)]; + // let table = Table::new(vec![row], widths).block(block); + // Widget::render(table, area, buf); + // } + // } } } else if let Some(e) = q.error() { let row = Row::new(vec![e.to_string()]); diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index 2a4773b..a2bf51e 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -49,33 +49,30 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { .borders(Borders::ALL) .title(Title::from(" Page ").alignment(Alignment::Right)); - let results = app.execution().results(); - let locked = tokio::task::block_in_place(|| results.blocking_lock()); - let maybe_stream = locked.as_ref(); + // let results = app.execution().results(); + // let locked = tokio::task::block_in_place(|| results.blocking_lock()); + // let maybe_stream = locked.as_ref(); + // TODO: Change this to a match on state and batch if let Some(s) = 
app.state.sql_tab.query_results_state() { - if let Some(stream) = maybe_stream { - if let Some(batch) = stream.current_batch() { - let batches = vec![batch]; - let maybe_table = record_batches_to_table(&batches); + if let Some(batch) = app.state.sql_tab.current_batch() { + let batches = vec![batch]; + let maybe_table = record_batches_to_table(&batches); - let block = block.title_bottom("Stats").fg(tailwind::ORANGE.c500); - match maybe_table { - Ok(table) => { - let table = table - .highlight_style( - Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK), - ) - .block(block); + let block = block.title_bottom("Stats").fg(tailwind::ORANGE.c500); + match maybe_table { + Ok(table) => { + let table = table + .highlight_style(Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK)) + .block(block); - let mut s = s.borrow_mut(); - StatefulWidget::render(table, area, buf, &mut s); - } - Err(e) => { - let row = Row::new(vec![e.to_string()]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); - } + let mut s = s.borrow_mut(); + StatefulWidget::render(table, area, buf, &mut s); + } + Err(e) => { + let row = Row::new(vec![e.to_string()]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); } } } From f35351901b10dd91a5d0294f67ef644132603607 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 17 Sep 2024 00:06:54 -0400 Subject: [PATCH 05/11] More refactoring for pagination --- src/app/app_execution.rs | 322 +++++++++------------------------- src/app/handlers/mod.rs | 33 ++-- src/app/handlers/sql.rs | 6 +- src/app/mod.rs | 29 ++- src/app/state/tabs/history.rs | 3 + src/app/state/tabs/sql.rs | 18 ++ src/app/ui/tabs/sql.rs | 48 ++--- src/execution/mod.rs | 10 ++ 8 files changed, 194 insertions(+), 275 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index 
afbd9b3..b8d1446 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -18,12 +18,13 @@ //! [`AppExecution`]: Handles executing queries for the TUI application. use crate::app::state::tabs::sql::Query; -use crate::app::AppEvent; +use crate::app::{AppEvent, ExecutionError}; use crate::execution::ExecutionContext; use color_eyre::eyre::Result; use datafusion::arrow::array::RecordBatch; use datafusion::execution::context::SessionContext; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; +use datafusion::physical_plan::{execute_stream, ExecutionPlan}; use futures::StreamExt; use log::{error, info}; use std::fmt::Debug; @@ -37,7 +38,7 @@ use tokio::sync::Mutex; /// and sending them to the UI. pub(crate) struct AppExecution { inner: Arc, - result_stream: Arc>>, // results: Arc>>, + result_stream: Arc>>, } impl AppExecution { @@ -45,7 +46,7 @@ impl AppExecution { pub fn new(inner: Arc) -> Self { Self { inner, - result_stream: Arc::new(Mutex::new(None)), // results: Arc::new(Mutex::new(None)), + result_stream: Arc::new(Mutex::new(None)), } } @@ -53,20 +54,11 @@ impl AppExecution { self.inner.session_ctx() } - // pub fn results(&self) -> Arc>> { - // Arc::clone(&self.results) - // } - pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) { let mut s = self.result_stream.lock().await; *s = Some(stream) } - // async fn set_results(&self, results: PaginatingRecordBatchStream) { - // let mut r = self.results.lock().await; - // *r = Some(results); - // } - /// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender. /// /// All queries except the last one will have their results discarded. 
@@ -88,51 +80,94 @@ impl AppExecution { let start = std::time::Instant::now(); if i == statement_count - 1 { info!("Executing last query and display results"); - match self.inner.execute_sql(sql).await { - Ok(stream) => { - // let mut paginating_stream = PaginatingRecordBatchStream::new(stream); - // paginating_stream.next_batch().await?; - self.set_result_stream(stream).await; - let mut stream = self.result_stream.lock().await; - if let Some(s) = stream.as_mut() { - if let Some(b) = s.next().await { - match b { - Ok(b) => { - sender.send(AppEvent::QueryResultsNextPage(b)); - } - Err(e) => { - error!("Error getting RecordBatch: {:?}", e); + sender.send(AppEvent::NewExecution)?; + match self.inner.create_physical_plan(sql).await { + Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) { + Ok(stream) => { + self.set_result_stream(stream).await; + let mut stream = self.result_stream.lock().await; + if let Some(s) = stream.as_mut() { + if let Some(b) = s.next().await { + match b { + Ok(b) => { + sender.send(AppEvent::ExecutionResultsNextPage(b))?; + } + Err(e) => { + error!("Error getting RecordBatch: {:?}", e); + } } } } } - // let mut batches = Vec::new(); - // while let Some(maybe_batch) = stream.next().await { - // match maybe_batch { - // Ok(batch) => { - // batches.push(batch); - // } - // Err(e) => { - // let elapsed = start.elapsed(); - // query.set_error(Some(e.to_string())); - // query.set_execution_time(elapsed); - // break; - // } - // } - // } - // let elapsed = start.elapsed(); - // let rows: usize = batches.iter().map(|r| r.num_rows()).sum(); - // query.set_results(Some(batches)); - // query.set_num_rows(Some(rows)); - // query.set_execution_time(elapsed); - } - Err(e) => { - error!("Error creating dataframe: {:?}", e); + Err(stream_err) => { + error!("Error creating physical plan: {:?}", stream_err); + let elapsed = start.elapsed(); + let e = ExecutionError { + query: sql.to_string(), + error: stream_err.to_string(), + duration: 
elapsed, + }; + sender.send(AppEvent::ExecutionResultsError(e))?; + } + }, + Err(plan_err) => { + error!("Error creating physical plan: {:?}", plan_err); let elapsed = start.elapsed(); - query.set_error(Some(e.to_string())); - query.set_execution_time(elapsed); + let e = ExecutionError { + query: sql.to_string(), + error: plan_err.to_string(), + duration: elapsed, + }; + sender.send(AppEvent::ExecutionResultsError(e))?; } } + // match self.inner.execute_sql(sql).await { + // Ok(stream) => { + // // self.set_result_stream(stream).await; + // // let mut stream = self.result_stream.lock().await; + // // if let Some(s) = stream.as_mut() { + // // if let Some(b) = s.next().await { + // // match b { + // // Ok(b) => { + // // sender.send(AppEvent::ExecutionResultsNextPage(b))?; + // // } + // // Err(e) => { + // // error!("Error getting RecordBatch: {:?}", e); + // // } + // // } + // // } + // // } + // // let mut batches = Vec::new(); + // // while let Some(maybe_batch) = stream.next().await { + // // match maybe_batch { + // // Ok(batch) => { + // // batches.push(batch); + // // } + // // Err(e) => { + // // let elapsed = start.elapsed(); + // // query.set_error(Some(e.to_string())); + // // query.set_execution_time(elapsed); + // // break; + // // } + // // } + // // } + // // let elapsed = start.elapsed(); + // // let rows: usize = batches.iter().map(|r| r.num_rows()).sum(); + // // query.set_results(Some(batches)); + // // query.set_num_rows(Some(rows)); + // // query.set_execution_time(elapsed); + // } + // Err(e) => { + // error!("Error creating dataframe: {:?}", e); + // let elapsed = start.elapsed(); + // let e = ExecutionError { + // query: sql.to_string(), + // error: e.to_string(), + // duration: elapsed, + // }; + // sender.send(AppEvent::ExecutionResultsError(e))?; + // } + // } } else { match self.inner.execute_sql_and_discard_results(sql).await { Ok(_) => { @@ -152,190 +187,3 @@ impl AppExecution { Ok(()) } } - -/// A stream of [`RecordBatch`]es that can 
be paginated for display in the TUI. -/// -/// Since `SendabkeRecordBatchStream` is not `Sync` we can't send it as an [`AppEvent`] -pub struct PaginatingRecordBatchStream { - // currently executing stream - inner: SendableRecordBatchStream, - // any batches that have been buffered so far - batches: Vec, - // current batch being shown - current_batch: Option, -} - -impl PaginatingRecordBatchStream { - pub fn new(inner: Pin>) -> Self { - Self { - inner, - batches: Vec::new(), - current_batch: None, - } - } - - /// Return the batch at the current index - pub fn current_batch(&self) -> Option<&RecordBatch> { - if let Some(idx) = self.current_batch { - self.batches.get(idx) - } else { - None - } - } - - /// Return the next batch - /// TBD on logic for handling the end - pub async fn next_batch(&mut self) -> Result> { - if let Some(b) = self.inner.next().await { - match b { - Ok(batch) => { - self.batches.push(batch); - self.current_batch = Some(self.batches.len() - 1); - Ok(self.current_batch()) - } - Err(e) => Err(e.into()), - } - } else { - Ok(None) - } - } - - /// Return the previous batch - /// TBD on logic for handling the beginning - pub fn previous_batch(&mut self) -> Option<&RecordBatch> { - if let Some(idx) = self.current_batch { - if idx > 0 { - self.current_batch = Some(idx - 1); - } - } - self.current_batch() - } -} - -impl Debug for PaginatingRecordBatchStream { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PaginatingRecordBatchStream") - .field("batches", &self.batches) - .field("current_batch", &self.current_batch) - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::PaginatingRecordBatchStream; - use datafusion::{ - arrow::array::{ArrayRef, Int32Array, RecordBatch}, - common::Result, - physical_plan::stream::RecordBatchStreamAdapter, - }; - use std::sync::Arc; - - #[tokio::test] - async fn test_paginating_record_batch_stream() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let b: 
ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); - - let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap(); - - let schema = record_batch1.schema(); - let batches: Vec> = - vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())]; - let stream = futures::stream::iter(batches); - let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); - - let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); - - assert_eq!(paginating_stream.current_batch(), None); - assert_eq!( - paginating_stream.next_batch().await.unwrap(), - Some(&record_batch1) - ); - assert_eq!( - paginating_stream.next_batch().await.unwrap(), - Some(&record_batch2) - ); - assert_eq!(paginating_stream.next_batch().await.unwrap(), None); - } - - #[tokio::test] - async fn test_paginating_record_batch_stream_previous() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); - - let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap(); - - let schema = record_batch1.schema(); - let batches: Vec> = - vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())]; - let stream = futures::stream::iter(batches); - let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); - - let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); - - assert_eq!(paginating_stream.current_batch(), None); - assert_eq!( - paginating_stream.next_batch().await.unwrap(), - Some(&record_batch1) - ); - assert_eq!( - paginating_stream.next_batch().await.unwrap(), - Some(&record_batch2) - ); - assert_eq!(paginating_stream.next_batch().await.unwrap(), None); - assert_eq!(paginating_stream.current_batch(), Some(&record_batch2)); - assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1)); - 
assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1)); - } - - #[tokio::test] - async fn test_paginating_record_batch_stream_one_error() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - - let schema = record_batch1.schema(); - let batches: Vec> = vec![Err( - datafusion::error::DataFusionError::Execution("Error creating dataframe".to_string()), - )]; - let stream = futures::stream::iter(batches); - let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); - - let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); - - assert_eq!(paginating_stream.current_batch(), None); - - let res = paginating_stream.next_batch().await; - assert!(res.is_err()); - } - - #[tokio::test] - async fn test_paginating_record_batch_stream_successful_then_error() { - let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - - let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - - let schema = record_batch1.schema(); - let batches: Vec> = vec![ - Ok(record_batch1.clone()), - Err(datafusion::error::DataFusionError::Execution( - "Error creating dataframe".to_string(), - )), - ]; - let stream = futures::stream::iter(batches); - let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); - - let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream); - - assert_eq!(paginating_stream.current_batch(), None); - assert_eq!( - paginating_stream.next_batch().await.unwrap(), - Some(&record_batch1) - ); - let res = paginating_stream.next_batch().await; - assert!(res.is_err()); - assert_eq!(paginating_stream.next_batch().await.unwrap(), None); - assert_eq!(paginating_stream.current_batch(), Some(&record_batch1)); - } -} diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 81210d3..2208b97 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -148,8 +148,6 @@ fn 
context_tab_app_event_handler(app: &mut App, event: AppEvent) { } pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { - // TODO: AppEvent::QueryResult can probably be handled here rather than duplicating in - // each tab trace!("Tui::Event: {:?}", event); let now = std::time::Instant::now(); match event { @@ -180,19 +178,33 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { } }); } - AppEvent::QueryResult(r) => { - app.state.sql_tab.set_query(r.clone()); - app.state.sql_tab.refresh_query_results_state(); + AppEvent::NewExecution => { + app.state.sql_tab.reset_execution_results(); + } + AppEvent::ExecutionResultsError(e) => { + app.state.sql_tab.set_execution_error(e.clone()); let history_query = HistoryQuery::new( Context::Local, - r.sql().clone(), - *r.execution_time(), - r.execution_stats().clone(), + e.query().to_string(), + *e.duration(), + None, + Some(e.error().to_string()), ); app.state.history_tab.add_to_history(history_query); - app.state.history_tab.refresh_history_table_state() } - AppEvent::QueryResultsNextPage(b) => { + // AppEvent::QueryResult(r) => { + // app.state.sql_tab.set_query(r.clone()); + // app.state.sql_tab.refresh_query_results_state(); + // let history_query = HistoryQuery::new( + // Context::Local, + // r.sql().clone(), + // *r.execution_time(), + // r.execution_stats().clone(), + // ); + // app.state.history_tab.add_to_history(history_query); + // app.state.history_tab.refresh_history_table_state() + // } + AppEvent::ExecutionResultsNextPage(b) => { app.state.sql_tab.add_batch(b); // let results = app.execution().results(); // @@ -212,6 +224,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { r.sql().clone(), *r.execution_time(), r.execution_stats().clone(), + None, ); app.state.history_tab.add_to_history(history_query); app.state.history_tab.refresh_history_table_state() diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 94ac3db..fae1ad2 100644 
--- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -64,12 +64,10 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { KeyCode::Enter => { let sql = app.state.sql_tab.editor().lines().join(""); info!("Running query: {}", sql); - // let app_execution = AppExecution::new(Arc::clone(&app.execution)); let _event_tx = app.event_tx().clone(); let execution = Arc::clone(&app.execution); - // TODO: Maybe this should be on a separate runtime to prevent blocking main thread / - // runtime - // TODO: Extract this into function to be used in both normal and editable handler + // TODO: Extract this into function to be used in both normal and editable handler. + // Only useful if we get Ctrl / Cmd + Enter to work in editable mode though. tokio::spawn(async move { let sqls: Vec<&str> = sql.split(';').collect(); let _ = execution.run_sqls(sqls, _event_tx).await; diff --git a/src/app/mod.rs b/src/app/mod.rs index 62313d2..af37c98 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -33,13 +33,14 @@ use ratatui::crossterm::{ }; use ratatui::{prelude::*, style::palette::tailwind, widgets::*}; use std::sync::Arc; +use std::time::Duration; use strum::IntoEnumIterator; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; -use self::app_execution::{AppExecution, PaginatingRecordBatchStream}; +use self::app_execution::AppExecution; use self::handlers::{app_event_handler, crossterm_event_handler}; use self::state::tabs::sql::Query; use crate::execution::ExecutionContext; @@ -47,6 +48,27 @@ use crate::execution::ExecutionContext; #[cfg(feature = "flightsql")] use self::state::tabs::flightsql::FlightSQLQuery; +#[derive(Clone, Debug)] +pub struct ExecutionError { + query: String, + error: String, + duration: Duration, +} + +impl ExecutionError { + pub fn query(&self) -> &str { + &self.query + } + + pub fn error(&self) -> &str { + &self.error + } + + pub fn 
duration(&self) -> &Duration { + &self.duration + } +} + #[derive(Debug)] pub enum AppEvent { Key(event::KeyEvent), @@ -61,8 +83,11 @@ pub enum AppEvent { Mouse(event::MouseEvent), Resize(u16, u16), ExecuteDDL(String), + NewExecution, QueryResult(Query), - QueryResultsNextPage(RecordBatch), + ExecutionResultsNextPage(RecordBatch), + ExecutionResultsPreviousPage, + ExecutionResultsError(ExecutionError), #[cfg(feature = "flightsql")] EstablishFlightSQLConnection, #[cfg(feature = "flightsql")] diff --git a/src/app/state/tabs/history.rs b/src/app/state/tabs/history.rs index 7646120..4c9c02b 100644 --- a/src/app/state/tabs/history.rs +++ b/src/app/state/tabs/history.rs @@ -43,6 +43,7 @@ pub struct HistoryQuery { sql: String, execution_time: Duration, execution_stats: Option, + error: Option, } impl HistoryQuery { @@ -51,12 +52,14 @@ impl HistoryQuery { sql: String, execution_time: Duration, execution_stats: Option, + error: Option, ) -> Self { Self { context, sql, execution_time, execution_stats, + error, } } pub fn sql(&self) -> &String { diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index 7f0283e..c0fc196 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -25,6 +25,7 @@ use ratatui::style::Style; use ratatui::widgets::TableState; use tui_textarea::TextArea; +use crate::app::ExecutionError; use crate::execution::ExecutionStats; #[derive(Clone, Debug)] @@ -109,6 +110,7 @@ pub struct SQLTabState<'app> { query_results_state: Option>, result_batches: Option>, results_page: Option, + execution_error: Option, } impl<'app> SQLTabState<'app> { @@ -124,6 +126,7 @@ impl<'app> SQLTabState<'app> { query_results_state: None, result_batches: None, results_page: None, + execution_error: None, } } @@ -135,6 +138,13 @@ impl<'app> SQLTabState<'app> { self.query_results_state = Some(RefCell::new(TableState::default())); } + pub fn reset_execution_results(&mut self) { + self.result_batches = None; + self.results_page = None; + 
self.execution_error = None; + self.refresh_query_results_state(); + } + pub fn editor(&self) -> TextArea { // TODO: Figure out how to do this without clone. Probably need logic in handler to make // updates to the Widget and then pass a ref @@ -208,4 +218,12 @@ impl<'app> SQLTabState<'app> { pub fn current_batch(&self) -> Option<&RecordBatch> { self.result_batches.as_ref().and_then(|b| b.get(0)) } + + pub fn execution_error(&self) -> &Option { + &self.execution_error + } + + pub fn set_execution_error(&mut self, error: ExecutionError) { + self.execution_error = Some(error); + } } diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index a2bf51e..01ccf2a 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -44,17 +44,13 @@ pub fn render_sql_editor(area: Rect, buf: &mut Buffer, app: &App) { } pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { - let block = Block::default() - .title(" Results ") - .borders(Borders::ALL) - .title(Title::from(" Page ").alignment(Alignment::Right)); - - // let results = app.execution().results(); - // let locked = tokio::task::block_in_place(|| results.blocking_lock()); - // let maybe_stream = locked.as_ref(); // TODO: Change this to a match on state and batch - if let Some(s) = app.state.sql_tab.query_results_state() { - if let Some(batch) = app.state.sql_tab.current_batch() { + if let Some(batch) = app.state.sql_tab.current_batch() { + if let Some(s) = app.state.sql_tab.query_results_state() { + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + .title(Title::from(" Page ").alignment(Alignment::Right)); let batches = vec![batch]; let maybe_table = record_batches_to_table(&batches); @@ -76,12 +72,28 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { } } } + } else if let Some(e) = app.state.sql_tab.execution_error() { + let dur = e.duration().as_millis(); + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + 
.title(Title::from(" Page ").alignment(Alignment::Right)) + .title_bottom(format!(" {}ms ", dur)); + let row = Row::new(vec![e.error().to_string()]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); + } else { + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + .title(Title::from(" Page ").alignment(Alignment::Right)); + let row = Row::new(vec!["Run a query to generate results"]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); } - // let block = Block::default() - // .title(" Results ") - // .borders(Borders::ALL) - // .title(Title::from(" Page ").alignment(Alignment::Right)); if let Some(q) = app.state.sql_tab.query() { if let Some(r) = q.results() { if let Some(s) = app.state.sql_tab.query_results_state() { @@ -113,16 +125,8 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { // } } } else if let Some(e) = q.error() { - // let row = Row::new(vec![e.to_string()]); - // let widths = vec![Constraint::Percentage(100)]; - // let table = Table::new(vec![row], widths).block(block); - // Widget::render(table, area, buf); } } else { - // let row = Row::new(vec!["Run a query to generate results"]); - // let widths = vec![Constraint::Percentage(100)]; - // let table = Table::new(vec![row], widths).block(block); - // Widget::render(table, area, buf); } } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index e344790..58d2b34 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -113,6 +113,16 @@ impl ExecutionContext { Ok(()) } + /// Create a physical plan from the specified SQL string. This is useful if you want to store + /// the plan and collect metrics from it. 
+ pub async fn create_physical_plan( + &self, + sql: &str, + ) -> datafusion::error::Result> { + let df = self.session_ctx.sql(sql).await?; + df.create_physical_plan().await + } + /// Executes the specified sql string, returning the resulting /// [`SendableRecordBatchStream`] of results pub async fn execute_sql( From 8af0c8adbb25a3a0bd667e08abdbd018ccf81b3c Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 17 Sep 2024 09:13:36 -0400 Subject: [PATCH 06/11] Better history and pagination handling --- src/app/app_execution.rs | 102 +++++++++++++++++++++++++++++++- src/app/handlers/mod.rs | 37 +++++------- src/app/handlers/sql.rs | 68 ++++----------------- src/app/mod.rs | 9 ++- src/app/state/tabs/flightsql.rs | 2 +- src/app/state/tabs/history.rs | 16 ++--- src/app/state/tabs/sql.rs | 20 ++++++- src/app/ui/tabs/history.rs | 16 ++++- src/app/ui/tabs/sql.rs | 88 ++++++++++----------------- src/execution/mod.rs | 66 +-------------------- 10 files changed, 207 insertions(+), 217 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index b8d1446..a5ccde3 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -18,13 +18,15 @@ //! [`AppExecution`]: Handles executing queries for the TUI application. 
use crate::app::state::tabs::sql::Query; -use crate::app::{AppEvent, ExecutionError}; +use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch}; use crate::execution::ExecutionContext; use color_eyre::eyre::Result; use datafusion::arrow::array::RecordBatch; use datafusion::execution::context::SessionContext; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream}; -use datafusion::physical_plan::{execute_stream, ExecutionPlan}; +use datafusion::physical_plan::{ + execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor, +}; use futures::StreamExt; use log::{error, info}; use std::fmt::Debug; @@ -90,7 +92,15 @@ impl AppExecution { if let Some(b) = s.next().await { match b { Ok(b) => { - sender.send(AppEvent::ExecutionResultsNextPage(b))?; + let duration = start.elapsed(); + let results = ExecutionResultsBatch { + query: sql.to_string(), + batch: b, + duration, + }; + sender.send(AppEvent::ExecutionResultsNextPage( + results, + ))?; } Err(e) => { error!("Error getting RecordBatch: {:?}", e); @@ -186,4 +196,90 @@ impl AppExecution { } Ok(()) } + + pub async fn next_batch(&self, sql: String, sender: UnboundedSender) { + let mut stream = self.result_stream.lock().await; + if let Some(s) = stream.as_mut() { + let start = std::time::Instant::now(); + if let Some(b) = s.next().await { + match b { + Ok(b) => { + let duration = start.elapsed(); + let results = ExecutionResultsBatch { + query: sql, + batch: b, + duration, + }; + sender.send(AppEvent::ExecutionResultsNextPage(results)); + } + Err(e) => { + error!("Error getting RecordBatch: {:?}", e); + } + } + } + } + } +} + +// #[derive(Debug, Clone)] +// pub struct ExecMetrics { +// name: String, +// bytes_scanned: usize, +// } + +#[derive(Clone, Debug)] +pub struct ExecutionStats { + // bytes_scanned: usize, + // exec_metrics: Vec, +} + +// impl ExecutionStats { +// pub fn bytes_scanned(&self) -> usize { +// self.bytes_scanned +// } +// } + +#[derive(Default)] +struct PlanVisitor 
{ + total_bytes_scanned: usize, + // exec_metrics: Vec, +} + +impl From for ExecutionStats { + fn from(value: PlanVisitor) -> Self { + Self { + // bytes_scanned: value.total_bytes_scanned, + } + } +} + +impl ExecutionPlanVisitor for PlanVisitor { + type Error = datafusion_common::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + match plan.metrics() { + Some(metrics) => match metrics.sum_by_name("bytes_scanned") { + Some(bytes_scanned) => { + info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize()); + self.total_bytes_scanned += bytes_scanned.as_usize(); + } + None => { + info!("No bytes_scanned for {}", plan.name()) + } + }, + None => { + info!("No MetricsSet for {}", plan.name()) + } + } + Ok(true) + } +} + +pub fn collect_plan_stats(plan: Arc) -> Option { + let mut visitor = PlanVisitor::default(); + if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { + Some(visitor.into()) + } else { + None + } } diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 2208b97..47d4fc4 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -25,6 +25,7 @@ use ratatui::crossterm::event::{self, KeyCode, KeyEvent}; use tui_logger::TuiWidgetEvent; use crate::app::state::tabs::history::Context; +use crate::app::ExecutionResultsBatch; #[cfg(feature = "flightsql")] use arrow_flight::sql::client::FlightSqlServiceClient; @@ -190,30 +191,22 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { None, Some(e.error().to_string()), ); + info!("Adding to history: {:?}", history_query); app.state.history_tab.add_to_history(history_query); + app.state.history_tab.refresh_history_table_state(); } - // AppEvent::QueryResult(r) => { - // app.state.sql_tab.set_query(r.clone()); - // app.state.sql_tab.refresh_query_results_state(); - // let history_query = HistoryQuery::new( - // Context::Local, - // r.sql().clone(), - // *r.execution_time(), - // r.execution_stats().clone(), - // ); - // 
app.state.history_tab.add_to_history(history_query); - // app.state.history_tab.refresh_history_table_state() - // } - AppEvent::ExecutionResultsNextPage(b) => { - app.state.sql_tab.add_batch(b); - // let results = app.execution().results(); - // - // tokio::spawn(async move { - // let mut locked = results.lock().await; - // if let Some(stream) = locked.as_mut() { - // stream.next_batch().await; - // } - // }); + AppEvent::ExecutionResultsNextPage(r) => { + let ExecutionResultsBatch { + query, + duration, + batch, + } = r; + app.state.sql_tab.add_batch(batch); + app.state.sql_tab.next_page(); + let history_query = + HistoryQuery::new(Context::Local, query.to_string(), duration, None, None); + app.state.history_tab.add_to_history(history_query); + app.state.history_tab.refresh_history_table_state(); } #[cfg(feature = "flightsql")] AppEvent::FlightSQLQueryResult(r) => { diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index fae1ad2..7e149d9 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -15,17 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-use std::{sync::Arc, time::Instant}; +use std::sync::Arc; -use datafusion::{arrow::array::RecordBatch, physical_plan::execute_stream}; -use log::{error, info}; +use log::info; use ratatui::crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use tokio_stream::StreamExt; use super::App; -use crate::app::app_execution::AppExecution; -use crate::app::{handlers::tab_navigation_handler, state::tabs::sql::Query, AppEvent}; -use crate::execution::collect_plan_stats; +use crate::app::{handlers::tab_navigation_handler, AppEvent}; pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { match key.code { @@ -73,6 +69,16 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { let _ = execution.run_sqls(sqls, _event_tx).await; }); } + KeyCode::Right => { + let _event_tx = app.event_tx().clone(); + if let Some(p) = app.state.history_tab.history().last() { + let execution = Arc::clone(&app.execution); + let sql = p.sql().clone(); + tokio::spawn(async move { + execution.next_batch(sql, _event_tx).await; + }); + } + } _ => {} } } @@ -83,54 +89,6 @@ pub fn editable_handler(app: &mut App, key: KeyEvent) { (KeyCode::Right, KeyModifiers::ALT) => app.state.sql_tab.next_word(), (KeyCode::Backspace, KeyModifiers::ALT) => app.state.sql_tab.delete_word(), (KeyCode::Esc, _) => app.state.sql_tab.exit_edit(), - (KeyCode::Enter, KeyModifiers::CONTROL) => { - let query = app.state.sql_tab.editor().lines().join(""); - let ctx = app.execution.session_ctx().clone(); - let _event_tx = app.event_tx(); - // TODO: Maybe this should be on a separate runtime to prevent blocking main thread / - // runtime - tokio::spawn(async move { - // TODO: Turn this into a match and return the error somehow - let start = Instant::now(); - if let Ok(df) = ctx.sql(&query).await { - let plan = df.create_physical_plan().await; - match plan { - Ok(p) => { - let task_ctx = ctx.task_ctx(); - let stream = execute_stream(Arc::clone(&p), task_ctx); - let mut batches: Vec = Vec::new(); - match stream { - Ok(mut s) => { 
- while let Some(b) = s.next().await { - match b { - Ok(b) => batches.push(b), - Err(e) => { - error!("Error getting RecordBatch: {:?}", e) - } - } - } - - let elapsed = start.elapsed(); - let stats = collect_plan_stats(p); - info!("Got stats: {:?}", stats); - let query = - Query::new(query, Some(batches), None, None, elapsed, None); - let _ = _event_tx.send(AppEvent::QueryResult(query)); - } - Err(e) => { - error!("Error creating RecordBatchStream: {:?}", e) - } - } - } - Err(e) => { - error!("Error creating physical plan: {:?}", e) - } - } - } else { - error!("Error creating dataframe") - } - }); - } _ => app.state.sql_tab.update_editor_content(key), } } diff --git a/src/app/mod.rs b/src/app/mod.rs index af37c98..411d56e 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -55,6 +55,13 @@ pub struct ExecutionError { duration: Duration, } +#[derive(Clone, Debug)] +pub struct ExecutionResultsBatch { + query: String, + batch: RecordBatch, + duration: Duration, +} + impl ExecutionError { pub fn query(&self) -> &str { &self.query @@ -85,7 +92,7 @@ pub enum AppEvent { ExecuteDDL(String), NewExecution, QueryResult(Query), - ExecutionResultsNextPage(RecordBatch), + ExecutionResultsNextPage(ExecutionResultsBatch), ExecutionResultsPreviousPage, ExecutionResultsError(ExecutionError), #[cfg(feature = "flightsql")] diff --git a/src/app/state/tabs/flightsql.rs b/src/app/state/tabs/flightsql.rs index a49d92c..0aa2b04 100644 --- a/src/app/state/tabs/flightsql.rs +++ b/src/app/state/tabs/flightsql.rs @@ -25,7 +25,7 @@ use ratatui::style::Style; use ratatui::widgets::TableState; use tui_textarea::TextArea; -use crate::execution::ExecutionStats; +use crate::app::app_execution::ExecutionStats; #[derive(Clone, Debug)] pub struct FlightSQLQuery { diff --git a/src/app/state/tabs/history.rs b/src/app/state/tabs/history.rs index 4c9c02b..5881870 100644 --- a/src/app/state/tabs/history.rs +++ b/src/app/state/tabs/history.rs @@ -20,7 +20,7 @@ use std::time::Duration; use 
ratatui::widgets::TableState; -use crate::execution::ExecutionStats; +use crate::app::app_execution::ExecutionStats; #[derive(Debug)] pub enum Context { @@ -74,13 +74,13 @@ impl HistoryQuery { &self.execution_stats } - pub fn scanned_bytes(&self) -> usize { - if let Some(stats) = &self.execution_stats { - stats.bytes_scanned() - } else { - 0 - } - } + // pub fn scanned_bytes(&self) -> usize { + // if let Some(stats) = &self.execution_stats { + // stats.bytes_scanned() + // } else { + // 0 + // } + // } pub fn context(&self) -> &Context { &self.context diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index c0fc196..cdcd046 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -19,14 +19,15 @@ use core::cell::RefCell; use std::time::Duration; use datafusion::arrow::array::RecordBatch; +use log::info; use ratatui::crossterm::event::KeyEvent; use ratatui::style::palette::tailwind; use ratatui::style::Style; use ratatui::widgets::TableState; use tui_textarea::TextArea; +use crate::app::app_execution::ExecutionStats; use crate::app::ExecutionError; -use crate::execution::ExecutionStats; #[derive(Clone, Debug)] pub struct Query { @@ -216,7 +217,10 @@ impl<'app> SQLTabState<'app> { } pub fn current_batch(&self) -> Option<&RecordBatch> { - self.result_batches.as_ref().and_then(|b| b.get(0)) + match (self.results_page, self.result_batches.as_ref()) { + (Some(page), Some(batches)) => batches.get(page), + _ => None, + } } pub fn execution_error(&self) -> &Option { @@ -226,4 +230,16 @@ impl<'app> SQLTabState<'app> { pub fn set_execution_error(&mut self, error: ExecutionError) { self.execution_error = Some(error); } + + pub fn results_page(&self) -> Option { + self.results_page + } + + pub fn next_page(&mut self) { + if let Some(page) = self.results_page { + self.results_page = Some(page + 1); + } else { + self.results_page = Some(0); + } + } } diff --git a/src/app/ui/tabs/history.rs b/src/app/ui/tabs/history.rs index 3f0988b..a9ee3aa 100644 
--- a/src/app/ui/tabs/history.rs +++ b/src/app/ui/tabs/history.rs @@ -62,7 +62,9 @@ pub fn render_query_history(area: Rect, buf: &mut Buffer, app: &App) { .title(" Query History ") .borders(Borders::ALL); let history = app.state.history_tab.history(); + info!("History: {:?}", history); let history_table_state = app.state.history_tab.history_table_state(); + info!("History Table State: {:?}", history_table_state); match (history.is_empty(), history_table_state) { (true, _) | (_, None) => { let row = Row::new(vec!["Your query history will show here"]); @@ -85,7 +87,14 @@ pub fn render_query_history(area: Rect, buf: &mut Buffer, app: &App) { Cell::from(q.context().as_str()), Cell::from(q.sql().as_str()), Cell::from(q.execution_time().as_millis().to_string()), - Cell::from(q.scanned_bytes().to_string()), + // Not sure showing scanned_bytes is useful anymore in the context of + // paginated queries. Hard coding to zero for now but this will need to be + // revisited. One option I have is removing these types of stats from the + // query history table (so we only show execution time) and then + // _anything_ ExecutionPlan statistics related is shown in the lower pane + // and there is an `analyze` mode that runs the query to completion and + // collects all stats to show in a table next to the query. + Cell::from(0.to_string()), ]) }) .collect(); @@ -96,8 +105,9 @@ pub fn render_query_history(area: Rect, buf: &mut Buffer, app: &App) { Cell::from("Execution Time(ms)"), Cell::from("Scanned Bytes"), ]) - .bg(tailwind::WHITE) - .fg(tailwind::BLACK); + .bg(tailwind::ORANGE.c300) + .fg(tailwind::BLACK) + .bold(); let table = Table::new(rows, widths).header(header).block(block.clone()); let table = table diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index 01ccf2a..3a50f11 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License.
+use log::info; use ratatui::{ buffer::Buffer, layout::{Alignment, Constraint, Direction, Layout, Rect}, @@ -45,12 +46,18 @@ pub fn render_sql_editor(area: Rect, buf: &mut Buffer, app: &App) { pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { // TODO: Change this to a match on state and batch - if let Some(batch) = app.state.sql_tab.current_batch() { - if let Some(s) = app.state.sql_tab.query_results_state() { + let sql_tab = &app.state.sql_tab; + match ( + sql_tab.current_batch(), + sql_tab.results_page(), + sql_tab.query_results_state(), + sql_tab.execution_error(), + ) { + (Some(batch), Some(p), Some(s), None) => { let block = Block::default() .title(" Results ") .borders(Borders::ALL) - .title(Title::from(" Page ").alignment(Alignment::Right)); + .title(Title::from(format!(" Page {p} ")).alignment(Alignment::Right)); let batches = vec![batch]; let maybe_table = record_batches_to_table(&batches); @@ -72,61 +79,28 @@ pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { } } } - } else if let Some(e) = app.state.sql_tab.execution_error() { - let dur = e.duration().as_millis(); - let block = Block::default() - .title(" Results ") - .borders(Borders::ALL) - .title(Title::from(" Page ").alignment(Alignment::Right)) - .title_bottom(format!(" {}ms ", dur)); - let row = Row::new(vec![e.error().to_string()]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); - } else { - let block = Block::default() - .title(" Results ") - .borders(Borders::ALL) - .title(Title::from(" Page ").alignment(Alignment::Right)); - let row = Row::new(vec!["Run a query to generate results"]); - let widths = vec![Constraint::Percentage(100)]; - let table = Table::new(vec![row], widths).block(block); - Widget::render(table, area, buf); - } - - if let Some(q) = app.state.sql_tab.query() { - if let Some(r) = q.results() { - if let Some(s) = 
app.state.sql_tab.query_results_state() { - // let stats = Span::from(format!( - // " {} rows in {}ms ", - // q.num_rows().unwrap_or(0), - // q.execution_time().as_millis() - // )) - // .fg(tailwind::WHITE); - // let block = block.title_bottom(stats).fg(tailwind::ORANGE.c500); - // let maybe_table = record_batches_to_table(r); - // match maybe_table { - // Ok(table) => { - // let table = table - // .highlight_style( - // Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK), - // ) - // .block(block); - // - // let mut s = s.borrow_mut(); - // StatefulWidget::render(table, area, buf, &mut s); - // } - // Err(e) => { - // let row = Row::new(vec![e.to_string()]); - // let widths = vec![Constraint::Percentage(100)]; - // let table = Table::new(vec![row], widths).block(block); - // Widget::render(table, area, buf); - // } - // } - } - } else if let Some(e) = q.error() { + (_, _, _, Some(e)) => { + let dur = e.duration().as_millis(); + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + .title(Title::from(" Page ").alignment(Alignment::Right)) + .title_bottom(format!(" {}ms ", dur)); + let row = Row::new(vec![e.error().to_string()]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); + } + _ => { + let block = Block::default() + .title(" Results ") + .borders(Borders::ALL) + .title(Title::from(" Page ").alignment(Alignment::Right)); + let row = Row::new(vec!["Run a query to generate results"]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); } - } else { } } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 58d2b34..4cdfe2c 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -21,10 +21,9 @@ use std::sync::Arc; use color_eyre::eyre::Result; use datafusion::execution::SendableRecordBatchStream; -use 
datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; use datafusion::sql::parser::Statement; -use log::info; use tokio_stream::StreamExt; #[cfg(feature = "flightsql")] use { @@ -150,66 +149,3 @@ impl ExecutionContext { .await } } - -// #[derive(Debug, Clone)] -// pub struct ExecMetrics { -// name: String, -// bytes_scanned: usize, -// } - -#[derive(Clone, Debug)] -pub struct ExecutionStats { - bytes_scanned: usize, - // exec_metrics: Vec, -} - -impl ExecutionStats { - pub fn bytes_scanned(&self) -> usize { - self.bytes_scanned - } -} - -#[derive(Default)] -struct PlanVisitor { - total_bytes_scanned: usize, - // exec_metrics: Vec, -} - -impl From for ExecutionStats { - fn from(value: PlanVisitor) -> Self { - Self { - bytes_scanned: value.total_bytes_scanned, - } - } -} - -impl ExecutionPlanVisitor for PlanVisitor { - type Error = datafusion_common::DataFusionError; - - fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { - match plan.metrics() { - Some(metrics) => match metrics.sum_by_name("bytes_scanned") { - Some(bytes_scanned) => { - info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize()); - self.total_bytes_scanned += bytes_scanned.as_usize(); - } - None => { - info!("No bytes_scanned for {}", plan.name()) - } - }, - None => { - info!("No MetricsSet for {}", plan.name()) - } - } - Ok(true) - } -} - -pub fn collect_plan_stats(plan: Arc) -> Option { - let mut visitor = PlanVisitor::default(); - if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { - Some(visitor.into()) - } else { - None - } -} From 07ba7500653f4e117925c4509d75873b17d42a8e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Tue, 17 Sep 2024 11:16:15 -0400 Subject: [PATCH 07/11] Cleanup --- src/app/app_execution.rs | 47 --------------------------------------- src/extensions/builder.rs | 2 +- 2 files changed, 1 insertion(+), 48 deletions(-) diff --git 
a/src/app/app_execution.rs b/src/app/app_execution.rs index a5ccde3..73d6110 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -131,53 +131,6 @@ impl AppExecution { sender.send(AppEvent::ExecutionResultsError(e))?; } } - // match self.inner.execute_sql(sql).await { - // Ok(stream) => { - // // self.set_result_stream(stream).await; - // // let mut stream = self.result_stream.lock().await; - // // if let Some(s) = stream.as_mut() { - // // if let Some(b) = s.next().await { - // // match b { - // // Ok(b) => { - // // sender.send(AppEvent::ExecutionResultsNextPage(b))?; - // // } - // // Err(e) => { - // // error!("Error getting RecordBatch: {:?}", e); - // // } - // // } - // // } - // // } - // // let mut batches = Vec::new(); - // // while let Some(maybe_batch) = stream.next().await { - // // match maybe_batch { - // // Ok(batch) => { - // // batches.push(batch); - // // } - // // Err(e) => { - // // let elapsed = start.elapsed(); - // // query.set_error(Some(e.to_string())); - // // query.set_execution_time(elapsed); - // // break; - // // } - // // } - // // } - // // let elapsed = start.elapsed(); - // // let rows: usize = batches.iter().map(|r| r.num_rows()).sum(); - // // query.set_results(Some(batches)); - // // query.set_num_rows(Some(rows)); - // // query.set_execution_time(elapsed); - // } - // Err(e) => { - // error!("Error creating dataframe: {:?}", e); - // let elapsed = start.elapsed(); - // let e = ExecutionError { - // query: sql.to_string(), - // error: e.to_string(), - // duration: elapsed, - // }; - // sender.send(AppEvent::ExecutionResultsError(e))?; - // } - // } } else { match self.inner.execute_sql_and_discard_results(sql).await { Ok(_) => { diff --git a/src/extensions/builder.rs b/src/extensions/builder.rs index ccad3c6..69953c3 100644 --- a/src/extensions/builder.rs +++ b/src/extensions/builder.rs @@ -71,7 +71,7 @@ impl DftSessionStateBuilder { pub fn new() -> Self { let session_config = SessionConfig::default() // TODO 
why is batch size 1? - .with_batch_size(1) + .with_batch_size(100) .with_information_schema(true); Self { From cb050d98872b2a1535151271decd653746cdaf1d Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 18 Sep 2024 08:45:35 -0400 Subject: [PATCH 08/11] Next and previous page working for sql tab --- src/app/app_execution.rs | 2 +- src/app/handlers/mod.rs | 1 + src/app/handlers/sql.rs | 4 ++++ src/app/state/tabs/sql.rs | 8 ++++++++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index 73d6110..a4eff4a 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -163,7 +163,7 @@ impl AppExecution { batch: b, duration, }; - sender.send(AppEvent::ExecutionResultsNextPage(results)); + let _ = sender.send(AppEvent::ExecutionResultsNextPage(results)); } Err(e) => { error!("Error getting RecordBatch: {:?}", e); diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index 47d4fc4..c77bba6 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -203,6 +203,7 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { } = r; app.state.sql_tab.add_batch(batch); app.state.sql_tab.next_page(); + app.state.sql_tab.refresh_query_results_state(); let history_query = HistoryQuery::new(Context::Local, query.to_string(), duration, None, None); app.state.history_tab.add_to_history(history_query); diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 7e149d9..70be199 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -79,6 +79,10 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { }); } } + KeyCode::Left => { + app.state.sql_tab.previous_page(); + app.state.sql_tab.refresh_query_results_state(); + } _ => {} } } diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index cdcd046..5dc6386 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -242,4 +242,12 @@ impl<'app> SQLTabState<'app> { 
self.results_page = Some(0); } } + + pub fn previous_page(&mut self) { + if let Some(page) = self.results_page { + if page > 0 { + self.results_page = Some(page - 1); + } + } + } } From edef60920ce61abca9e10c29086f1184f4be458b Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 18 Sep 2024 14:12:20 -0400 Subject: [PATCH 09/11] Start paginating flightsql --- src/app/app_execution.rs | 17 +++++++++++++ src/app/handlers/flightsql.rs | 33 +++++++++++++++++++++---- src/app/handlers/mod.rs | 46 +++++++++++++++++------------------ 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index a4eff4a..72bbf58 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -20,6 +20,7 @@ use crate::app::state::tabs::sql::Query; use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch}; use crate::execution::ExecutionContext; +use arrow_flight::decode::FlightRecordBatchStream; use color_eyre::eyre::Result; use datafusion::arrow::array::RecordBatch; use datafusion::execution::context::SessionContext; @@ -36,11 +37,15 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; +#[cfg(feature = "flightsql")] +use {arrow_flight::sql::client::FlightSqlServiceClient, tonic::transport::Channel}; + /// Handles executing queries for the TUI application, formatting results /// and sending them to the UI. 
pub(crate) struct AppExecution { inner: Arc, result_stream: Arc>>, + flight_result_stream: Arc>>, } impl AppExecution { @@ -49,6 +54,7 @@ impl AppExecution { Self { inner, result_stream: Arc::new(Mutex::new(None)), + flight_result_stream: Arc::new(Mutex::new(None)), } } @@ -56,11 +62,22 @@ impl AppExecution { self.inner.session_ctx() } + #[cfg(feature = "flightsql")] + pub fn flightsql_client(&self) -> &Mutex>> { + self.inner.flightsql_client() + } + pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) { let mut s = self.result_stream.lock().await; *s = Some(stream) } + #[cfg(feature = "flightsql")] + pub async fn set_flight_result_stream(&self, stream: FlightRecordBatchStream) { + let mut s = self.flight_result_stream.lock().await; + *s = Some(stream) + } + /// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender. /// /// All queries except the last one will have their results discarded. diff --git a/src/app/handlers/flightsql.rs b/src/app/handlers/flightsql.rs index f3b06f2..9b0a683 100644 --- a/src/app/handlers/flightsql.rs +++ b/src/app/handlers/flightsql.rs @@ -65,6 +65,34 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { s.select_previous(); } } + KeyCode::Enter => { + info!("Run FS query"); + let sql = app.state.flightsql_tab.editor().lines().join(""); + info!("SQL: {}", sql); + let execution = Arc::clone(&app.execution); + let _event_tx = app.event_tx(); + tokio::spawn(async move { + let client = execution.flightsql_client(); + let mut query = + FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); + let start = Instant::now(); + if let Some(ref mut c) = *client.lock().await { + match c.execute(sql, None).await { + Ok(flight_info) => { + for endpoint in flight_info.endpoint { + if let Some(ticket) = endpoint.ticket { + match c.do_get(ticket.into_request()).await { + Ok(mut stream) => {} + Err(e) => {} + } + } + } + } + Err(e) => {} + } + } + }); + } // 
KeyCode::Enter => { // info!("Run FS query"); @@ -154,11 +182,6 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) { true => editable_handler(app, key), false => normal_mode_handler(app, key), }, - AppEvent::FlightSQLQueryResult(r) => { - info!("Query results: {:?}", r); - app.state.flightsql_tab.set_query(r); - app.state.flightsql_tab.refresh_query_results_state(); - } AppEvent::Error => {} _ => {} }; diff --git a/src/app/handlers/mod.rs b/src/app/handlers/mod.rs index c77bba6..5036d6a 100644 --- a/src/app/handlers/mod.rs +++ b/src/app/handlers/mod.rs @@ -223,29 +223,29 @@ pub fn app_event_handler(app: &mut App, event: AppEvent) -> Result<()> { app.state.history_tab.add_to_history(history_query); app.state.history_tab.refresh_history_table_state() } - // #[cfg(feature = "flightsql")] - // AppEvent::EstablishFlightSQLConnection => { - // let url = app.state.config.flightsql.connection_url.clone(); - // info!("Connection to FlightSQL host: {}", url); - // let url: &'static str = Box::leak(url.into_boxed_str()); - // let execution = Arc::clone(&app.execution); - // tokio::spawn(async move { - // let client = execution.flightsql_client(); - // let maybe_channel = Channel::from_static(url).connect().await; - // info!("Created channel"); - // match maybe_channel { - // Ok(channel) => { - // let flightsql_client = FlightSqlServiceClient::new(channel); - // let mut locked_client = client.lock().await; - // *locked_client = Some(flightsql_client); - // info!("Connected to FlightSQL host"); - // } - // Err(e) => { - // info!("Error creating channel for FlightSQL: {:?}", e); - // } - // } - // }); - // } + #[cfg(feature = "flightsql")] + AppEvent::EstablishFlightSQLConnection => { + let url = app.state.config.flightsql.connection_url.clone(); + info!("Connection to FlightSQL host: {}", url); + let url: &'static str = Box::leak(url.into_boxed_str()); + let execution = Arc::clone(&app.execution); + tokio::spawn(async move { + let client = execution.flightsql_client(); 
+ let maybe_channel = Channel::from_static(url).connect().await; + info!("Created channel"); + match maybe_channel { + Ok(channel) => { + let flightsql_client = FlightSqlServiceClient::new(channel); + let mut locked_client = client.lock().await; + *locked_client = Some(flightsql_client); + info!("Connected to FlightSQL host"); + } + Err(e) => { + info!("Error creating channel for FlightSQL: {:?}", e); + } + } + }); + } _ => { match app.state.tabs.selected { SelectedTab::SQL => sql::app_event_handler(app, event), From 4b3e8aaeb2493750e26bd7aa48f8a8bb63121c58 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Thu, 19 Sep 2024 10:44:58 -0400 Subject: [PATCH 10/11] More flightsql --- src/app/app_execution.rs | 70 ++++++++++++++++++++++++++++++++- src/app/handlers/flightsql.rs | 65 ++++++++++++++++++++---------- src/app/handlers/sql.rs | 6 +++ src/app/mod.rs | 3 ++ src/app/state/tabs/flightsql.rs | 26 ++++++++++++ src/app/state/tabs/history.rs | 2 +- src/app/state/tabs/sql.rs | 1 - 7 files changed, 149 insertions(+), 24 deletions(-) diff --git a/src/app/app_execution.rs b/src/app/app_execution.rs index 72bbf58..b021024 100644 --- a/src/app/app_execution.rs +++ b/src/app/app_execution.rs @@ -33,9 +33,10 @@ use log::{error, info}; use std::fmt::Debug; use std::pin::Pin; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; +use tonic::IntoRequest; #[cfg(feature = "flightsql")] use {arrow_flight::sql::client::FlightSqlServiceClient, tonic::transport::Channel}; @@ -44,8 +45,11 @@ use {arrow_flight::sql::client::FlightSqlServiceClient, tonic::transport::Channe /// and sending them to the UI. 
pub(crate) struct AppExecution { inner: Arc, + // TODO: Store the SQL with the stream result_stream: Arc>>, + // TODO: Store the SQL with the stream flight_result_stream: Arc>>, + flight_results_current_row_start: Option, } impl AppExecution { @@ -55,6 +59,7 @@ impl AppExecution { inner, result_stream: Arc::new(Mutex::new(None)), flight_result_stream: Arc::new(Mutex::new(None)), + flight_results_current_row_start: None, } } @@ -167,6 +172,69 @@ impl AppExecution { Ok(()) } + pub async fn run_flightsqls(&self, sqls: Vec<&str>, sender: UnboundedSender) { + info!("Running sqls: {:?}", sqls); + let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect(); + info!("Non empty SQLs: {:?}", non_empty_sqls); + let statement_count = non_empty_sqls.len(); + for (i, sql) in non_empty_sqls.into_iter().enumerate() { + let client = self.flightsql_client(); + // let mut query = + // FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); + let start = Instant::now(); + if let Some(ref mut c) = *client.lock().await { + match c.execute(sql.to_string(), None).await { + Ok(flight_info) => { + for endpoint in flight_info.endpoint { + if let Some(ticket) = endpoint.ticket { + match c.do_get(ticket.into_request()).await { + Ok(stream) => { + self.set_flight_result_stream(stream).await; + let mut stream = self.flight_result_stream.lock().await; + if let Some(s) = stream.as_mut() { + if let Some(b) = s.next().await { + match b { + Ok(b) => { + let results = ExecutionResultsBatch { + query: sql.to_string(), + batch: b, + duration: start.elapsed(), + }; + let _ = sender.send(AppEvent::FlightSQLExecutionResultsNextPage(results)); + } + Err(e) => { + error!( + "Error getting RecordBatch: {:?}", + e + ); + let e = ExecutionError { + query: sql.to_string(), + error: e.to_string(), + duration: start.elapsed(), + }; + let _ = sender.send(AppEvent::FlightSQLExecutionResultsError(e)); + } + } + } + } + } + Err(e) => { + error!("Error getting 
RecordBatch: {:?}", e); + } + } + } else { + error!("No ticket in endpoint"); + } + } + } + Err(e) => { + error!("Error executing FlightSQL query: {:?}", e); + } + } + } + } + } + pub async fn next_batch(&self, sql: String, sender: UnboundedSender) { let mut stream = self.result_stream.lock().await; if let Some(s) = stream.as_mut() { diff --git a/src/app/handlers/flightsql.rs b/src/app/handlers/flightsql.rs index 9b0a683..a6b4733 100644 --- a/src/app/handlers/flightsql.rs +++ b/src/app/handlers/flightsql.rs @@ -25,6 +25,7 @@ use tokio_stream::StreamExt; use tonic::IntoRequest; use crate::app::state::tabs::flightsql::FlightSQLQuery; +use crate::app::state::tabs::history::Context; use crate::app::{handlers::tab_navigation_handler, AppEvent}; use super::App; @@ -67,32 +68,54 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { } KeyCode::Enter => { info!("Run FS query"); - let sql = app.state.flightsql_tab.editor().lines().join(""); - info!("SQL: {}", sql); + let full_text = app.state.flightsql_tab.editor().lines().join(""); let execution = Arc::clone(&app.execution); let _event_tx = app.event_tx(); tokio::spawn(async move { - let client = execution.flightsql_client(); - let mut query = - FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); - let start = Instant::now(); - if let Some(ref mut c) = *client.lock().await { - match c.execute(sql, None).await { - Ok(flight_info) => { - for endpoint in flight_info.endpoint { - if let Some(ticket) = endpoint.ticket { - match c.do_get(ticket.into_request()).await { - Ok(mut stream) => {} - Err(e) => {} - } - } - } - } - Err(e) => {} - } - } + let sqls = full_text.split(';').collect(); + execution.run_flightsqls(sqls, _event_tx).await; + // let client = execution.flightsql_client(); + // let mut query = + // FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None); + // let start = Instant::now(); + // if let Some(ref mut c) = *client.lock().await { + // match 
c.execute(sql, None).await { + // Ok(flight_info) => { + // for endpoint in flight_info.endpoint { + // if let Some(ticket) = endpoint.ticket { + // match c.do_get(ticket.into_request()).await { + // Ok(mut stream) => { + // execution.set_flight_result_stream(stream).await; + // exe + // } + // Err(e) => {} + // } + // } + // } + // } + // Err(e) => {} + // } + // } }); } + KeyCode::Right => { + if let Some(p) = app + .state + .history_tab + .history() + .iter() + .filter(|q| *q.context() == Context::FlightSQL) + .last() + { + let execution = Arc::clone(&app.execution); + let sql = p.sql().clone(); + let _event_tx = app.event_tx().clone(); + app.state.flightsql_tab.next_results_page(); + // tokio::spawn(async move { + // execution.flightsql_next_page().await; + // }); + } + } // KeyCode::Enter => { // info!("Run FS query"); diff --git a/src/app/handlers/sql.rs b/src/app/handlers/sql.rs index 70be199..b667add 100644 --- a/src/app/handlers/sql.rs +++ b/src/app/handlers/sql.rs @@ -71,10 +71,16 @@ pub fn normal_mode_handler(app: &mut App, key: KeyEvent) { } KeyCode::Right => { let _event_tx = app.event_tx().clone(); + // This won't work if you paginate the results, switch to FlightSQL tab, and then + // switch back to SQL tab, and paginate again. + // + // Need to decide if switching tabs should reset pagination. if let Some(p) = app.state.history_tab.history().last() { let execution = Arc::clone(&app.execution); let sql = p.sql().clone(); tokio::spawn(async move { + // TODO: Should be a call to `next_page` and `next_batch` is implementation + // detail. 
execution.next_batch(sql, _event_tx).await; }); } diff --git a/src/app/mod.rs b/src/app/mod.rs index 411d56e..a7fb7af 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -95,6 +95,9 @@ pub enum AppEvent { ExecutionResultsNextPage(ExecutionResultsBatch), ExecutionResultsPreviousPage, ExecutionResultsError(ExecutionError), + FlightSQLExecutionResultsNextPage(ExecutionResultsBatch), + FlightSQLExecutionResultsPreviousPage, + FlightSQLExecutionResultsError(ExecutionError), #[cfg(feature = "flightsql")] EstablishFlightSQLConnection, #[cfg(feature = "flightsql")] diff --git a/src/app/state/tabs/flightsql.rs b/src/app/state/tabs/flightsql.rs index 0aa2b04..ac35e5a 100644 --- a/src/app/state/tabs/flightsql.rs +++ b/src/app/state/tabs/flightsql.rs @@ -19,6 +19,7 @@ use core::cell::RefCell; use std::time::Duration; use datafusion::arrow::array::RecordBatch; +use deltalake::arrow::array::UInt32Array; use ratatui::crossterm::event::KeyEvent; use ratatui::style::palette::tailwind; use ratatui::style::Style; @@ -26,6 +27,7 @@ use ratatui::widgets::TableState; use tui_textarea::TextArea; use crate::app::app_execution::ExecutionStats; +use crate::app::ExecutionError; #[derive(Clone, Debug)] pub struct FlightSQLQuery { @@ -107,6 +109,9 @@ pub struct FlightSQLTabState<'app> { editor_editable: bool, query: Option, query_results_state: Option>, + result_batches: Option>, + results_page: Option, + execution_error: Option, } impl<'app> FlightSQLTabState<'app> { @@ -121,6 +126,9 @@ impl<'app> FlightSQLTabState<'app> { editor_editable: false, query: None, query_results_state: None, + result_batches: None, + results_page: None, + execution_error: None, } } @@ -193,4 +201,22 @@ impl<'app> FlightSQLTabState<'app> { pub fn delete_word(&mut self) { self.editor.delete_word(); } + + pub fn current_page_results(&self) -> Option { + match (self.results_page, &self.result_batches) { + (Some(p), Some(b)) => Some(get_records(p, b)), + _ => None, + } + } + + pub fn next_results_page(&mut self) {} +} + 
+fn get_records(page: usize, batches: &[RecordBatch]) -> RecordBatch { + let start = page * 100; + let end = start + 100; + let indices = ((start as u32)..(end as u32)).collect::>(); + let indices_array = UInt32Array::from(indices); + let taken = datafusion::arrow::compute::take_record_batch(&batches[0], &indices_array).unwrap(); + taken } diff --git a/src/app/state/tabs/history.rs b/src/app/state/tabs/history.rs index 5881870..b818d62 100644 --- a/src/app/state/tabs/history.rs +++ b/src/app/state/tabs/history.rs @@ -22,7 +22,7 @@ use ratatui::widgets::TableState; use crate::app::app_execution::ExecutionStats; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Context { Local, FlightSQL, diff --git a/src/app/state/tabs/sql.rs b/src/app/state/tabs/sql.rs index 5dc6386..92e59e7 100644 --- a/src/app/state/tabs/sql.rs +++ b/src/app/state/tabs/sql.rs @@ -19,7 +19,6 @@ use core::cell::RefCell; use std::time::Duration; use datafusion::arrow::array::RecordBatch; -use log::info; use ratatui::crossterm::event::KeyEvent; use ratatui::style::palette::tailwind; use ratatui::style::Style; From f726200eb84b84354a7ea43cf43da4b9e4ce6337 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 20 Sep 2024 09:13:48 -0400 Subject: [PATCH 11/11] Flightsql rendering updates --- src/app/ui/tabs/flightsql.rs | 38 +++++++++++++++++++++++++++++++++++- src/app/ui/tabs/sql.rs | 1 - 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/app/ui/tabs/flightsql.rs b/src/app/ui/tabs/flightsql.rs index b8bdc9a..10968a4 100644 --- a/src/app/ui/tabs/flightsql.rs +++ b/src/app/ui/tabs/flightsql.rs @@ -43,7 +43,43 @@ pub fn render_sql_editor(area: Rect, buf: &mut Buffer, app: &App) { } pub fn render_sql_results(area: Rect, buf: &mut Buffer, app: &App) { - let block = Block::default().title(" Results ").borders(Borders::ALL); + let flightsql_tab = &app.state.flightsql_tab; + match ( + flightsql_tab.query(), + flightsql_tab.current_page_results(), + flightsql_tab.query_results_state(), + 
) { + (None, _, _) => { + let block = Block::default().title(" Results ").borders(Borders::ALL); + let row = Row::new(vec!["Run a query to generate results"]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); + } + (Some(_), Some(b), Some(s)) => { + let block = Block::default().title(" Results ").borders(Borders::ALL); + let batches = vec![&b]; + let maybe_table = record_batches_to_table(&batches); + let block = block.title_bottom(" Stats "); + match maybe_table { + Ok(table) => { + let table = table + .highlight_style(Style::default().bg(tailwind::WHITE).fg(tailwind::BLACK)) + .block(block); + + let mut s = s.borrow_mut(); + StatefulWidget::render(table, area, buf, &mut s); + } + Err(e) => { + let row = Row::new(vec![e.to_string()]); + let widths = vec![Constraint::Percentage(100)]; + let table = Table::new(vec![row], widths).block(block); + Widget::render(table, area, buf); + } + } + } + _ => {} + } if let Some(q) = app.state.flightsql_tab.query() { if let Some(r) = q.results() { if let Some(s) = app.state.flightsql_tab.query_results_state() { diff --git a/src/app/ui/tabs/sql.rs b/src/app/ui/tabs/sql.rs index 3a50f11..10f656d 100644 --- a/src/app/ui/tabs/sql.rs +++ b/src/app/ui/tabs/sql.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use log::info; use ratatui::{ buffer::Buffer, layout::{Alignment, Constraint, Direction, Layout, Rect},