Skip to content

Commit

Permalink
Merge branch 'apache:main' into feature/internal_err_9164
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 authored Feb 15, 2024
2 parents 11b6709 + 85be1bc commit f481e67
Show file tree
Hide file tree
Showing 58 changed files with 2,409 additions and 375 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/audit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Security audit

# Allow at most one in-flight run per repository/ref/workflow; a newer
# trigger cancels any run still in progress for the same group.
concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

# Trigger only when dependency manifests change anywhere in the tree.
on:
  push:
    paths:
      - "**/Cargo.toml"
      - "**/Cargo.lock"

  pull_request:
    paths:
      - "**/Cargo.toml"
      - "**/Cargo.lock"

jobs:
  security_audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install cargo-audit
        run: cargo install cargo-audit
      - name: Run audit check
        run: cargo audit
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ jobs:
- name: Check function packages (encoding_expressions)
run: cargo check --no-default-features --features=encoding_expressions -p datafusion

- name: Check function packages (math_expressions)
run: cargo check --no-default-features --features=math_expressions -p datafusion

- name: Check function packages (array_expressions)
run: cargo check --no-default-features --features=array_expressions -p datafusion

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ arrow-string = { version = "50.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "0.4.1"
bytes = "1.4"
chrono = { version = "0.4.31", default-features = false }
chrono = { version = "0.4.34", default-features = false }
ctor = "0.2.0"
dashmap = "5.4.0"
datafusion = { path = "datafusion/core", version = "35.0.0" }
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ data_tpch() {
echo " tbl files exist ($FILE exists)."
else
echo " creating tbl files with tpch_dbgen..."
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s ${SCALE_FACTOR}
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s ${SCALE_FACTOR}
fi

# Copy expected answers into the ./data/answers directory if it does not already exist
Expand All @@ -288,7 +288,7 @@ data_tpch() {
else
echo " Copying answers to ${TPCH_DIR}/answers"
mkdir -p "${TPCH_DIR}/answers"
docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
fi

# Create 'parquet' files from tbl
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 91 additions & 33 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
Expand All @@ -24,10 +25,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use object_store::http::HttpBuilder;
use object_store::ObjectStore;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use url::Url;

Expand Down Expand Up @@ -155,27 +155,24 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// that name, try to treat it as a listing table
let state = self.state.upgrade()?.read().clone();
let table_url = ListingTableUrl::parse(name).ok()?;

// Assure the `http` store for this url is registered if this
// is an `http(s)` listing
// TODO: support for other types, e.g. `s3`, may need to be added
match table_url.scheme() {
"http" | "https" => {
let url: &Url = table_url.as_ref();
match state.runtime_env().object_store_registry.get_store(url) {
Ok(_) => {}
Err(_) => {
let store = Arc::new(
HttpBuilder::new()
.with_url(url.origin().ascii_serialization())
.build()
.ok()?,
) as Arc<dyn ObjectStore>;
state.runtime_env().register_object_store(url, store);
}
}
let url: &Url = table_url.as_ref();

// If the store is already registered for this URL then `get_store`
// will return `Ok` which means we don't need to register it again. However,
// if `get_store` returns an `Err` then it means the corresponding store is
// not registered yet and we need to register it
match state.runtime_env().object_store_registry.get_store(url) {
Ok(_) => { /*Nothing to do here, store for this URL is already registered*/ }
Err(_) => {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
.await
.unwrap();
state.runtime_env().register_object_store(url, store);
}
_ => {}
}

let config = ListingTableConfig::new(table_url)
Expand All @@ -198,15 +195,10 @@ impl SchemaProvider for DynamicFileSchemaProvider {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn query_http_location_test() -> Result<()> {
// Perhaps this could be changed to use an existing file but
// that will require a permanently availalble web resource
let domain = "example.com";
let location = format!("http://{domain}/file.parquet");

fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {
let mut ctx = SessionContext::new();
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
ctx.state().catalog_list(),
Expand All @@ -222,12 +214,23 @@ mod tests {
let schema = catalog
.schema(catalog.schema_names().first().unwrap())
.unwrap();
let none = schema.table(&location).await;
(ctx, schema)
}

// That's a non-existing location so expecting None here
assert!(none.is_none());
#[tokio::test]
async fn query_http_location_test() -> Result<()> {
// This is a unit test so not expecting a connection or a file to be
// available
let domain = "example.com";
let location = format!("http://{domain}/file.parquet");

// It should still create an object store for the location
let (ctx, schema) = setup_context();

// That's a non registered table so expecting None here
let table = schema.table(&location).await;
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
let store = ctx
.runtime_env()
.object_store(ListingTableUrl::parse(location)?)?;
Expand All @@ -240,4 +243,59 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn query_s3_location_test() -> Result<()> {
    // Unit test: no network access or real bucket is needed — we only
    // check that resolving the URL registers a matching object store.
    let bucket_name = "examples3bucket";
    let url = format!("s3://{}/file.parquet", bucket_name);

    let (ctx, schema) = setup_context();

    // The table was never registered, so the lookup must come back empty.
    assert!(schema.table(&url).await.is_none());

    // The lookup should nevertheless have registered an Amazon S3 store
    // for this URL in the session's runtime environment.
    let store = ctx
        .runtime_env()
        .object_store(ListingTableUrl::parse(url)?)?;
    assert_eq!(format!("{store}"), format!("AmazonS3({bucket_name})"));

    // The store's debug output must show it was configured for this bucket
    let expected = format!("bucket: \"{}\"", bucket_name);
    assert!(format!("{store:?}").contains(&expected));

    Ok(())
}

#[tokio::test]
async fn query_gs_location_test() -> Result<()> {
    // Unit test: no network access or real bucket is needed — we only
    // check that resolving the URL registers a matching object store.
    let bucket_name = "examplegsbucket";
    let url = format!("gs://{}/file.parquet", bucket_name);

    let (ctx, schema) = setup_context();

    // The table was never registered, so the lookup must come back empty.
    assert!(schema.table(&url).await.is_none());

    // The lookup should nevertheless have registered a Google Cloud
    // Storage store for this URL in the session's runtime environment.
    let store = ctx
        .runtime_env()
        .object_store(ListingTableUrl::parse(url)?)?;
    assert_eq!(format!("{store}"), format!("GoogleCloudStorage({bucket_name})"));

    // The store's debug output must show it was configured for this bucket
    let expected = format!("bucket_name_encoded: \"{}\"", bucket_name);
    assert!(format!("{store:?}").contains(&expected));

    Ok(())
}

#[tokio::test]
#[should_panic]
async fn query_invalid_location_test() {
    // "ts" is not a supported object-store scheme, so resolving the
    // table panics. We cannot surface this as an error because
    // `schema.table` returns an Option rather than a Result.
    let url = "ts://file.parquet";
    let (_ctx, schema) = setup_context();

    schema.table(url).await;
}
}
60 changes: 11 additions & 49 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,20 @@
//! Execution functions
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::time::Instant;
use std::{fs::File, sync::Arc};

use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
object_storage::{
get_gcs_object_store_builder, get_oss_object_store_builder,
get_s3_object_store_builder,
},
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};

use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
use datafusion::common::plan_datafusion_err;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::physical_plan::is_plan_streaming;
use datafusion::error::{DataFusionError, Result};
Expand All @@ -45,8 +42,6 @@ use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};

use datafusion::logical_expr::dml::CopyTo;
use datafusion::sql::parser::Statement;
use object_store::http::HttpBuilder;
use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;
Expand Down Expand Up @@ -280,8 +275,13 @@ async fn register_object_store(
copy_to: &mut CopyTo,
) -> Result<(), DataFusionError> {
let url = ListingTableUrl::parse(copy_to.output_url.as_str())?;
let store =
get_object_store(ctx, &mut HashMap::new(), url.scheme(), url.as_ref()).await?;
let store = get_object_store(
&ctx.state(),
&mut HashMap::new(),
url.scheme(),
url.as_ref(),
)
.await?;
ctx.runtime_env().register_object_store(url.as_ref(), store);
Ok(())
}
Expand All @@ -295,50 +295,12 @@ async fn create_external_table(
let url: &Url = table_path.as_ref();

// registering the cloud object store dynamically using cmd.options
let store = get_object_store(ctx, &mut cmd.options, scheme, url).await?;

let store = get_object_store(&ctx.state(), &mut cmd.options, scheme, url).await?;
ctx.runtime_env().register_object_store(url, store);

Ok(())
}

async fn get_object_store(
ctx: &SessionContext,
options: &mut HashMap<String, String>,
scheme: &str,
url: &Url,
) -> Result<Arc<dyn ObjectStore>, DataFusionError> {
let store = match scheme {
"s3" => {
let builder = get_s3_object_store_builder(url, options).await?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"oss" => {
let builder = get_oss_object_store_builder(url, options)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"gs" | "gcs" => {
let builder = get_gcs_object_store_builder(url, options)?;
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
}
"http" | "https" => Arc::new(
HttpBuilder::new()
.with_url(url.origin().ascii_serialization())
.build()?,
) as Arc<dyn ObjectStore>,
_ => {
// for other types, try to get from the object_store_registry
ctx.runtime_env()
.object_store_registry
.get_store(url)
.map_err(|_| {
exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
})?
}
};
Ok(store)
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
Loading

0 comments on commit f481e67

Please sign in to comment.