Skip to content

Commit

Permalink
DataSink additions (#7778)
Browse files Browse the repository at this point in the history
* File sink additions

* Fmt

* Clippy

* Update datafusion/physical-plan/src/insert.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Feedback

* Fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
Dandandan and alamb authored Oct 10, 2023
1 parent b6f87ed commit d2cc4d2
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 0 deletions.
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_plan::metrics::MetricsSet;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -484,6 +485,14 @@ impl CsvSink {

#[async_trait]
impl DataSink for CsvSink {
fn as_any(&self) -> &dyn Any {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion_common::DataFusionError;
use datafusion_common::FileType;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_plan::metrics::MetricsSet;
use rand::distributions::Alphanumeric;
use rand::distributions::DistString;
use std::fmt;
Expand Down Expand Up @@ -276,6 +277,14 @@ impl JsonSink {

#[async_trait]
impl DataSink for JsonSink {
fn as_any(&self) -> &dyn Any {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Parquet format abstractions
use datafusion_physical_plan::metrics::MetricsSet;
use parquet::column::writer::ColumnCloseResult;
use parquet::file::writer::SerializedFileWriter;
use rand::distributions::DistString;
Expand Down Expand Up @@ -757,6 +758,14 @@ impl ParquetSink {

#[async_trait]
impl DataSink for ParquetSink {
fn as_any(&self) -> &dyn Any {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
use datafusion_physical_plan::metrics::MetricsSet;
use futures::StreamExt;
use log::debug;
use std::any::Any;
Expand Down Expand Up @@ -259,6 +260,14 @@ impl MemSink {

#[async_trait]
impl DataSink for MemSink {
fn as_any(&self) -> &dyn Any {
self
}

fn metrics(&self) -> Option<MetricsSet> {
None
}

async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
Expand Down
21 changes: 21 additions & 0 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
use datafusion_common::{exec_err, internal_err, DataFusionError};
use datafusion_execution::TaskContext;
Expand All @@ -46,6 +47,16 @@ use datafusion_execution::TaskContext;
/// output.
#[async_trait]
pub trait DataSink: DisplayAs + Debug + Send + Sync {
/// Returns the data sink as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Return a snapshot of the [MetricsSet] for this
/// [DataSink].
///
/// See [ExecutionPlan::metrics()] for more details
fn metrics(&self) -> Option<MetricsSet>;

// TODO add desired input ordering
// How does this sink want its input ordered?

Expand Down Expand Up @@ -151,6 +162,16 @@ impl FileSinkExec {
}
Ok(streams)
}

/// Returns insert sink
pub fn sink(&self) -> &dyn DataSink {
self.sink.as_ref()
}

/// Returns the metrics of the underlying [DataSink]
pub fn metrics(&self) -> Option<MetricsSet> {
self.sink.metrics()
}
}

impl DisplayAs for FileSinkExec {
Expand Down

0 comments on commit d2cc4d2

Please sign in to comment.