diff --git a/README.md b/README.md index 2b82b91..4c7e80a 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,22 @@ Online at: https://parquet-viewer.xiangpeng.systems ![screenshot](doc/parquet-viewer.gif) +### Headless mode +Despite being a batteries-included web app, it can also be used as a headless tool, which is especially helpful when working with LLM tools. + +Like command-line arguments for CLI tools, you can pass URL parameters to control the behavior of the app. + +For example, [this link](https://parquet-viewer.xiangpeng.systems/?tab=url&url=https%3A%2F%2Fraw.githubusercontent.com%2Ftobilg%2Fpublic-cloud-provider-ip-ranges%2Fmain%2Fdata%2Fproviders%2Fall.parquet&query=SELECT%20%22cloud_provider%22%2C%20%22region%22%2C%20SUM(%22ip_address_cnt%22)%20AS%20%22ip_count%22%20FROM%20%22all%22%20GROUP%20BY%20%22cloud_provider%22%2C%20%22region%22%20ORDER%20BY%20%22ip_count%22%20DESC%3B&export=csv) tells parquet-viewer to: + +(1) load a file from a URL, + +(2) run a SQL query (or a natural-language query, if you like), + +(3) export the results to a CSV file. + +Most of the query parameters are reflected in the URL as you use the app, so you can simply share the URL with others to reproduce your findings. + +Adding `export=csv` or `export=parquet` to the URL will export the query result to a CSV/Parquet file. 
## Development diff --git a/src/bin/parquet_viewer.py b/src/bin/parquet_viewer.py deleted file mode 100644 index dc4f8ff..0000000 --- a/src/bin/parquet_viewer.py +++ /dev/null @@ -1,164 +0,0 @@ -import asyncio -import websockets -import logging -import json -import argparse -import sys -import os -from http.server import HTTPServer, SimpleHTTPRequestHandler -import threading -import socket -import time - -# Move logging configuration to after argument parsing -logging.getLogger().setLevel(logging.WARNING) # Set default to WARNING (less verbose) - -def start_http_server(directory, port): - """Start HTTP server in a separate thread and wait until it's ready""" - class Handler(SimpleHTTPRequestHandler): - def __init__(self, *args, **kwargs): - super().__init__(*args, directory=directory, **kwargs) - - def end_headers(self): - self.send_header('Access-Control-Allow-Origin', '*') - self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') - self.send_header('Access-Control-Allow-Headers', '*') - super().end_headers() - - def do_OPTIONS(self): - self.send_response(200) - self.end_headers() - - httpd = HTTPServer(('0.0.0.0', port), Handler) - thread = threading.Thread(target=httpd.serve_forever) - thread.daemon = True - thread.start() - - return httpd - -def find_free_port(): - """Find a free port to use for the HTTP server""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - s.listen(1) - port = s.getsockname()[1] - return port - -async def handle_connection(websocket, sql_query, parquet_file, server): - try: - logging.debug(f"New client connected from {websocket.remote_address[0]}:{websocket.remote_address[1]}") - - # If SQL query is provided, send it in the expected format - if sql_query: - query_message = { - "sql": { - "query": sql_query - } - } - await websocket.send(json.dumps(query_message)) - logging.debug("SQL query sent") - - # If parquet file is provided, start HTTP server and send file info - if parquet_file: - if 
not os.path.exists(parquet_file): - logging.error(f"Parquet file not found: {parquet_file}") - await websocket.close() - return - - # Start HTTP server in the directory containing the parquet file - file_dir = os.path.dirname(os.path.abspath(parquet_file)) - if not file_dir: - file_dir = '.' - - http_port = find_free_port() - http_server = start_http_server(file_dir, http_port) - - # Send file information - file_message = { - "parquet_file": { - "file_name": os.path.basename(parquet_file), - "server_address": f"http://localhost:{http_port}", - } - } - await websocket.send(json.dumps(file_message)) - logging.debug(f"File information sent for {parquet_file}") - - # Wait for acknowledgment from client - while True: - try: - response = await websocket.recv() - logging.debug(f"Received response: {response}") - response_json = json.loads(response) - if response_json.get("message_type") == "ack": - logging.debug("Received acknowledgment from client") - break # Exit the loop when we get valid JSON with correct message type - else: - logging.warning(f"Received unexpected message type: {response_json.get('message_type')}") - # Continue waiting for correct message type - except json.JSONDecodeError: - logging.warning("Received invalid JSON response, waiting for valid response...") - # Continue waiting for valid JSON - except websockets.exceptions.ConnectionClosed: - raise - - # Close the connection gracefully - await websocket.close() - - # Shutdown HTTP server if it was started - if parquet_file: - http_server.shutdown() - http_server.server_close() - - # Close the websocket server after handling one connection - server.close() - - except websockets.exceptions.ConnectionClosed: - logging.debug("Client disconnected") - except Exception as e: - logging.error(f"Error handling connection: {e}") - -async def main(sql_query, parquet_file): - # Create a done event to signal when to stop the server - done = asyncio.Event() - - async def connection_handler(websocket): - server = 
ws_server - await handle_connection(websocket, sql_query, parquet_file, server) - done.set() # Signal that we're done after handling one connection - - ws_server = await websockets.serve(connection_handler, "0.0.0.0", 12306) - logging.debug("WebSocket server started on ws://0.0.0.0:12306") - - # Wait for the done event - await done.wait() - - # Clean up - ws_server.close() - await ws_server.wait_closed() - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='WebSocket server for SQL queries') - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-q', '--sql', type=str, help='SQL query to send to the client') - group.add_argument('-f', '--file', type=str, help='Path to parquet file to send to the client') - parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging') - - args = parser.parse_args() - - # Configure logging based on verbose flag - if args.verbose: - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s - %(levelname)s - %(message)s' - ) - else: - logging.basicConfig( - level=logging.WARNING, - format='%(asctime)s - %(levelname)s - %(message)s' - ) - - try: - asyncio.run(main(args.sql, args.file)) - except KeyboardInterrupt: - logging.info("Server stopped by user") - sys.exit(0) diff --git a/src/main.rs b/src/main.rs index d0924c0..73066cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,12 @@ mod schema; -use codee::string::FromToStringCodec; use datafusion::physical_plan::ExecutionPlan; use file_reader::{get_stored_value, FileReader}; -use leptos_router::{components::Router, hooks::query_signal}; -use leptos_use::{ - use_interval_fn, use_timestamp, use_websocket_with_options, ReconnectLimit, - UseWebSocketOptions, UseWebSocketReturn, +use leptos_router::{ + components::Router, + hooks::{query_signal, use_query_map}, }; -use opendal::{services::Http, Operator}; -use query_results::QueryResults; + +use query_results::{export_to_csv_inner, 
export_to_parquet_inner, QueryResults}; use schema::SchemaSection; mod file_reader; @@ -33,9 +31,7 @@ mod query_input; use query_input::{execute_query_inner, QueryInput}; mod settings; -use settings::{Settings, ANTHROPIC_API_KEY, WS_ENDPOINT_KEY}; - -use std::fmt::Display; +use settings::{Settings, ANTHROPIC_API_KEY}; #[derive(Debug, Clone, PartialEq)] struct ParquetInfo { @@ -169,90 +165,19 @@ async fn execute_query_async( Ok((results, physical_plan)) } -#[derive(Clone)] -pub struct WebsocketContext { - pub message: Signal>, - send: Arc, -} - -impl WebsocketContext { - pub fn new(message: Signal>, send: Arc) -> Self { - Self { message, send } - } - - pub fn send(&self, message: &str) { - (self.send)(&message.to_string()) - } -} - -#[derive(Debug, Clone, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -enum WebSocketMessage { - Sql { - query: String, - }, - ParquetFile { - file_name: String, - server_address: String, - }, -} - -#[derive(Debug, Clone, serde::Serialize)] -struct AckMessage { - message_type: String, -} - -impl AckMessage { - fn new() -> Self { - Self { - message_type: "ack".to_string(), - } - } - - fn new_json() -> String { - serde_json::to_string(&Self::new()).unwrap() - } -} - -#[derive(Clone, Debug)] -pub struct ConnectionInfo { - pub last_message_time: Option, - pub display_time: RwSignal, -} - -impl ConnectionInfo { - pub fn new() -> Self { - Self { - last_message_time: None, - display_time: RwSignal::new("never".to_string()), - } - } -} - -impl Default for ConnectionInfo { - fn default() -> Self { - Self::new() - } -} - -impl Display for ConnectionInfo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.display_time.get()) - } -} - #[component] fn App() -> impl IntoView { let (error_message, set_error_message) = signal(Option::::None); let (file_bytes, set_file_bytes) = signal(None::); let (user_input, set_user_input) = query_signal::("query"); + let export_to = use_query_map().with(|map| 
map.get("export").map(|v| v.to_string())); + let (sql_query, set_sql_query) = signal(String::new()); let (query_result, set_query_result) = signal(Vec::::new()); let (file_name, set_file_name) = signal(String::from("uploaded")); let (physical_plan, set_physical_plan) = signal(None::>); let (show_settings, set_show_settings) = signal(false); - let (connection_info, set_connection_info) = signal(ConnectionInfo::new()); let api_key = get_stored_value(ANTHROPIC_API_KEY, ""); let parquet_info = Memo::new(move |_| { @@ -261,72 +186,6 @@ fn App() -> impl IntoView { .and_then(|bytes| get_parquet_info(bytes.clone()).ok()) }); - let ws_url = get_stored_value(WS_ENDPOINT_KEY, "ws://localhost:12306"); - let UseWebSocketReturn { message, send, .. } = - use_websocket_with_options::( - &ws_url, - UseWebSocketOptions::default() - .reconnect_limit(ReconnectLimit::Infinite) - .reconnect_interval(1000), - ); - - let send = Arc::new(send); - Effect::watch( - message, - move |message, _, _| { - if let Some(message) = message { - set_connection_info.update(|info| { - info.last_message_time = Some(use_timestamp()()); - info.display_time.set("0s ago".to_string()); - }); - - let message = serde_json::from_str::(message).unwrap(); - match message { - WebSocketMessage::Sql { query } => { - // Send acknowledgment - let ack_json = AckMessage::new_json(); - send(&ack_json); - set_user_input.set(Some(query.clone())); - } - WebSocketMessage::ParquetFile { - file_name, - server_address, - } => { - logging::log!( - "Received file: {}, server_address: {}", - file_name, - server_address - ); - let builder = Http::default().endpoint(&server_address); - let Ok(op) = Operator::new(builder) else { - set_error_message.set(Some("Failed to create HTTP operator".into())); - return; - }; - let op = op.finish(); - let send_inner = send.clone(); - leptos::task::spawn_local(async move { - loop { - match op.read(&file_name).await { - Ok(bs) => { - send_inner(&AckMessage::new_json()); - 
set_file_bytes.set(Some(bs.to_bytes())); - set_file_name.set(file_name.clone()); - logging::log!("read file success"); - return; - } - Err(e) => { - logging::log!("read file failed: {}", e); - } - } - } - }); - } - } - } - }, - true, - ); - Effect::watch( parquet_info, move |info, _, _| { @@ -406,12 +265,20 @@ fn App() -> impl IntoView { }; let query = query.clone(); + let export_to = export_to.clone(); leptos::task::spawn_local(async move { match execute_query_async(query.clone(), bytes, table_name, parquet_info).await { Ok((results, physical_plan)) => { set_physical_plan.set(Some(physical_plan)); + if let Some(export_to) = export_to { + if export_to == "csv" { + export_to_csv_inner(&results); + } else if export_to == "parquet" { + export_to_parquet_inner(&results); + } + } set_query_result.set(results); } Err(e) => set_error_message.set(Some(e)), @@ -424,20 +291,6 @@ fn App() -> impl IntoView { true, ); - // Set up the interval in the component - use_interval_fn( - move || { - set_connection_info.update(|info| { - if let Some(last_time) = info.last_message_time { - let current = use_timestamp()(); - let seconds = ((current - last_time) / 1000.0).round() as i64; - info.display_time.set(format!("{}s ago", seconds)); - } - }); - }, - 1000, - ); - view! {

@@ -583,7 +436,6 @@ fn App() -> impl IntoView {

} diff --git a/src/query_results.rs b/src/query_results.rs index 9505346..a3421f9 100644 --- a/src/query_results.rs +++ b/src/query_results.rs @@ -15,7 +15,7 @@ use parquet::arrow::ArrowWriter; use web_sys::js_sys; use web_sys::wasm_bindgen::JsCast; -fn export_to_csv_inner(query_result: &[RecordBatch]) { +pub(crate) fn export_to_csv_inner(query_result: &[RecordBatch]) { let mut csv_data = String::new(); let headers: Vec = query_result[0] .schema() @@ -53,7 +53,7 @@ fn export_to_csv_inner(query_result: &[RecordBatch]) { web_sys::Url::revoke_object_url(&url).unwrap(); } -fn export_to_parquet_inner(query_result: &[RecordBatch]) { +pub(crate) fn export_to_parquet_inner(query_result: &[RecordBatch]) { // Create an in-memory buffer to write the parquet data let mut buf = Vec::new(); diff --git a/src/settings.rs b/src/settings.rs index 36e914d..1977446 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,9 +1,7 @@ -use crate::ConnectionInfo; use leptos::html::*; use leptos::prelude::*; use leptos::*; -pub(crate) const WS_ENDPOINT_KEY: &str = "ws_endpoint"; pub(crate) const ANTHROPIC_API_KEY: &str = "claude_api_key"; pub(crate) const S3_ENDPOINT_KEY: &str = "s3_endpoint"; pub(crate) const S3_ACCESS_KEY_ID_KEY: &str = "s3_access_key_id"; @@ -30,11 +28,8 @@ fn save_to_storage(key: &str, value: &str) { pub fn Settings( show: ReadSignal, set_show: WriteSignal, - connection_info: ReadSignal, ) -> impl IntoView { - let (ws_endpoint, set_ws_endpoint) = - signal(get_stored_value(WS_ENDPOINT_KEY, "ws://localhost:12306")); - let (anthropic_key, set_anthropic_key) = signal(get_stored_value(ANTHROPIC_API_KEY, "")); + let (anthropic_key, set_anthropic_key) = signal(get_stored_value(ANTHROPIC_API_KEY, "")); let (s3_endpoint, set_s3_endpoint) = signal(get_stored_value( S3_ENDPOINT_KEY, "https://s3.amazonaws.com", @@ -98,28 +93,7 @@ pub fn Settings( class="w-full px-3 py-2 border border-gray-300 rounded-md" /> -
- -
- -
- - "last msg: "{move || connection_info.get().to_string()} - -
-
-
+ // S3 Configuration Section