cache server
XiangpengHao committed Nov 5, 2024
1 parent 07c347b commit e41cbee
Showing 7 changed files with 275 additions and 202 deletions.
122 changes: 0 additions & 122 deletions benchmarks/src/bin/cache_client.rs

This file was deleted.

10 changes: 7 additions & 3 deletions benchmarks/src/bin/cache_server.rs
@@ -60,6 +60,9 @@ macro_rules! status {
 struct Options {
     #[structopt(short, long)]
     path: PathBuf,
+
+    #[structopt(long)]
+    partitions: Option<usize>,
 }
 
 #[tokio::main]
@@ -82,7 +85,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let mut session_config = SessionConfig::from_env()
        .map_err(|e| Status::internal(format!("Error building plan: {e}")))?
-        .with_information_schema(true);
+        .with_information_schema(true)
+        .with_target_partitions(options.partitions.unwrap_or(num_cpus::get()));
 
     session_config
         .options_mut()
@@ -275,10 +279,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let execution_plan = self.get_result(&handle)?;
 
         let displayable =
-            datafusion::physical_plan::display::DisplayableExecutionPlan::with_metrics(
+            datafusion::physical_plan::display::DisplayableExecutionPlan::new(
                 execution_plan.as_ref(),
             );
-        info!("physical plan:\n{}", displayable.indent(true));
+        info!("physical plan:\n{}", displayable.indent(false));
 
         let ctx = self.get_ctx(&request)?;
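The server-side change is small: a new optional --partitions flag overrides DataFusion's target partition count (falling back to the machine's core count), and plan logging drops per-operator metrics (DisplayableExecutionPlan::new instead of with_metrics, indent(false) instead of indent(true)). Below is a minimal sketch, not part of the commit, of how the flag is expected to flow into the session configuration; SessionConfig::new() stands in for the server's from_env() call, and build_session_config is a hypothetical helper name.

use datafusion::prelude::SessionConfig;

// Sketch only: `partitions` would come from the parsed structopt Options.
fn build_session_config(partitions: Option<usize>) -> SessionConfig {
    SessionConfig::new()
        .with_information_schema(true)
        // Default to one partition per core when --partitions is not given.
        .with_target_partitions(partitions.unwrap_or(num_cpus::get()))
}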
99 changes: 65 additions & 34 deletions benchmarks/src/clickbench.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::env;
 use std::fs::File;
 use std::path::Path;
@@ -34,6 +35,9 @@ use datafusion::{
 };
 use datafusion_common::exec_datafusion_err;
 use datafusion_common::instant::Instant;
+use datafusion_flight_table::sql::FlightSqlDriver;
+use datafusion_flight_table::sql::USERNAME;
+use datafusion_flight_table::FlightTableFactory;
 use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_cache::ArrowArrayCache;
@@ -101,6 +105,9 @@ pub struct RunOpt {
     /// Predicate pushdown
     #[structopt(long)]
     pushdown_filters: bool,
+
+    #[structopt(long)]
+    flight_cache: Option<String>,
 }
 
 struct AllQueries {
@@ -171,7 +178,7 @@ impl RunOpt {
         config.options_mut().execution.parquet.pushdown_filters = self.pushdown_filters;
 
         let ctx = SessionContext::new_with_config(config);
-        self.register_hits(&ctx).await?;
+        self.register_hits(&ctx, &self.flight_cache).await?;
 
         let iterations = self.common.iterations;
         let mut benchmark_run = BenchmarkRun::new();
@@ -298,41 +305,65 @@ impl RunOpt {
         Ok(())
     }
 
-    /// Registrs the `hits.parquet` as a table named `hits`
-    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
+    /// Registers the `hits.parquet` as a table named `hits`
+    async fn register_hits(
+        &self,
+        ctx: &SessionContext,
+        flight_cache: &Option<String>,
+    ) -> Result<()> {
         let path = self.path.as_os_str().to_str().unwrap();
 
-        let object_store: Arc<dyn ObjectStore> = if path.starts_with("minio://") {
-            let url = Url::parse(path).unwrap();
-            let bucket_name = url.host_str().unwrap_or("parquet-oo");
-            let object_store = AmazonS3Builder::new()
-                .with_bucket_name(bucket_name)
-                .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
-                .with_allow_http(true)
-                .with_region("us-east-1")
-                .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
-                .with_secret_access_key(env::var("MINIO_SECRET_ACCESS_KEY").unwrap())
-                .build()?;
-            let object_store = Arc::new(object_store);
-            ctx.register_object_store(&url, object_store.clone());
-            object_store
-        } else {
-            let url = ObjectStoreUrl::local_filesystem();
-            let object_store = ctx.runtime_env().object_store(url).unwrap();
-            Arc::new(object_store)
-        };
-
-        let mut options: ParquetReadOptions<'_> = Default::default();
-        use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
-        options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
-
-        ctx.register_parquet("hits", &path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+        match flight_cache {
+            Some(flight_cache) => {
+                let flight_sql =
+                    FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
+                let table = flight_sql
+                    .open_table(
+                        flight_cache,
+                        HashMap::from([(USERNAME.into(), "whatever".into())]),
+                        "hits",
+                    )
+                    .await?;
+                ctx.register_table("hits", Arc::new(table))?;
+                Ok(())
+            }
+            None => {
+                let object_store: Arc<dyn ObjectStore> = if path.starts_with("minio://") {
+                    let url = Url::parse(path).unwrap();
+                    let bucket_name = url.host_str().unwrap_or("parquet-oo");
+                    let object_store = AmazonS3Builder::new()
+                        .with_bucket_name(bucket_name)
+                        .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
+                        .with_allow_http(true)
+                        .with_region("us-east-1")
+                        .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
+                        .with_secret_access_key(
+                            env::var("MINIO_SECRET_ACCESS_KEY").unwrap(),
+                        )
+                        .build()?;
+                    let object_store = Arc::new(object_store);
+                    ctx.register_object_store(&url, object_store.clone());
+                    object_store
+                } else {
+                    let url = ObjectStoreUrl::local_filesystem();
+                    let object_store = ctx.runtime_env().object_store(url).unwrap();
+                    Arc::new(object_store)
+                };
+
+                let mut options: ParquetReadOptions<'_> = Default::default();
+                use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
+                options.reader =
+                    Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
+
+                ctx.register_parquet("hits", &path, options)
+                    .await
+                    .map_err(|e| {
+                        DataFusionError::Context(
+                            format!("Registering 'hits' as {path}"),
+                            Box::new(e),
+                        )
+                    })
+            }
+        }
     }
 }
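On the benchmark side, the new --flight-cache <endpoint> option switches register_hits from reading parquet (locally or from MinIO) to querying a remote FlightSQL cache server. Below is a standalone usage sketch mirroring the datafusion_flight_table API exactly as it appears in the diff above; the function name register_hits_via_flight and the endpoint value are illustrative, not part of the commit.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_flight_table::sql::{FlightSqlDriver, USERNAME};
use datafusion_flight_table::FlightTableFactory;

// Sketch only: register `hits` against a running cache server instead of parquet.
async fn register_hits_via_flight(ctx: &SessionContext, endpoint: &str) -> Result<()> {
    let factory = FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
    let table = factory
        .open_table(
            endpoint, // value of --flight-cache, e.g. a grpc://host:port address
            // Placeholder credentials, matching the "whatever" username in the diff.
            HashMap::from([(USERNAME.into(), "whatever".into())]),
            "hits",
        )
        .await?;
    ctx.register_table("hits", Arc::new(table))?;
    Ok(())
}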