Skip to content

Commit

Permalink
Refactor parsing BigQuery execution method. Add more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nozdrenkov committed Jan 31, 2024
1 parent 47aefeb commit 1356b01
Showing 1 changed file with 63 additions and 32 deletions.
95 changes: 63 additions & 32 deletions graphql/src/bigquery.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Error, Ok, Result};
use anyhow::{Error, Result};
use futures::stream::{self, StreamExt};
use gcp_bigquery_client::{
dataset::ListOptions as DatasetListOptions, model::query_request::QueryRequest,
Expand Down Expand Up @@ -115,7 +115,8 @@ pub async fn execute_bigquery_query(query: &str) -> Result<Vec<Value>> {
Ok(results)
}

/// Executes an arbitrary SQL query on Google BigQuery using Bearer token authentication and returns the results as JSON.
/// Executes an arbitrary SQL query on Google BigQuery using Bearer token authentication and returns
/// the results as JSON.
pub async fn execute_bigquery_query_bearer(
token: &str,
project_id: &str,
Expand Down Expand Up @@ -162,24 +163,28 @@ fn parse_bigquery_response(json_response: serde_json::Value) -> Result<Vec<Value
.map(|field| field["name"].as_str().unwrap_or_default().to_string())
.collect::<Vec<String>>();

let rows = json_response["rows"]
.as_array()
.ok_or_else(|| anyhow::anyhow!("Invalid rows format"))?;
let rows = json_response
.get("rows")
.and_then(Value::as_array)
.map_or_else(|| vec![], |r| r.to_vec());

let results = rows
.iter()
.map(|row| {
let mut row_map = Map::new();
// Create a named variable for the temporary Map object
let default_map = Map::new();
let values = row.as_object().unwrap_or(&default_map);

// Now iterate through each field and value
for (field_name, value) in fields.iter().zip(values.values()) {
row_map.insert(field_name.clone(), value.clone());
}

Value::Object(row_map)
let empty_row_values = Vec::new();
let row_values = row["f"].as_array().unwrap_or(&empty_row_values);
Value::Object(
fields
.iter()
.zip(row_values.iter())
.map(|(field_name, value)| {
(
field_name.clone(),
value.get("v").cloned().unwrap_or(Value::Null),
)
})
.collect::<Map<String, Value>>(),
)
})
.collect::<Vec<Value>>();

Expand All @@ -193,23 +198,49 @@ mod tests {
use tokio;

#[tokio::test]
async fn test_bigquery_run_query_with_bearer_auth_success() {
let _ = env_logger::try_init();
let token = std::env::var("GCP_ACCESS_TOKEN").expect("GCP_ACCESS_TOKEN not set");
let expected = vec![json!({"numbers": [{"v": "123"}]})];

let tested = execute_bigquery_query_bearer(
&token,
&std::env::var("WILDFLOW_BIGQUERY_TEST_PROJECT")
.unwrap()
.as_str(),
"
select 123 as numbers
",
)
.await
.expect("Query failed");
async fn test_run_query_auth_basic_select_ok() {
let expected = vec![json!({"numbers": "123", "letters": "abc"})];

let tested = run_query("select 123 as numbers, 'abc' as letters")
.await
.expect("Query failed");

assert_eq!(tested, expected);
}

#[tokio::test]
async fn test_run_query_auth_create_table_empty_output() {
let query = "
create or replace table raw.test_table as (
select 123 as numbers, 'abc' as letters
);
";
let expected: Vec<Value> = vec![];

let tested = run_query(query).await.expect("Query failed");

assert_eq!(tested, expected);
}

#[tokio::test]
async fn test_run_query_auth_incorrect_query_error() {
let query = "some incorrect query";

let result = run_query(query).await;

match result {
Ok(_) => panic!("Expected an error but query succeeded"),
Err(e) => assert!(
e.to_string()
.contains("Syntax error: Unexpected keyword SOME at [1:1]"),
"Error message did not match expected value"
),
}
}

async fn run_query(query: &str) -> Result<Vec<Value>> {
let token = std::env::var("GCP_ACCESS_TOKEN").expect("GCP_ACCESS_TOKEN not set");
let project = std::env::var("WILDFLOW_BIGQUERY_TEST_PROJECT").unwrap();
execute_bigquery_query_bearer(&token, &project, query).await
}
}

0 comments on commit 1356b01

Please sign in to comment.