Feature/padw 115 incremental build (#32)
* feat (PADW-115): Add DW Source Table

* feat (PADW-115): Use DW_SOURCE_COLUMNS in Function Source Column

* chore (PADW-115): Remove Column Lookup

* feat (PADW-115): Add Incremental Build
analyzer1 authored Dec 17, 2024
1 parent cb5c775 commit 03fb3c0
Showing 6 changed files with 62 additions and 45 deletions.
26 changes: 26 additions & 0 deletions extension/src/controller/dv_builder.rs
@@ -315,10 +315,36 @@ pub fn build_dv(build_id: Uuid, dv_objects_query: &str, load_data: bool) {
}
};

insert_dw_source_columns(&dv_schema);

if load_data {dv_data_loader(&dv_schema);}

}

fn insert_dw_source_columns(dv_schema: &DVSchema) {

let insert_dw_source_column: &str = r#"
INSERT INTO auto_dw.dw_source_objects (table_oid, column_ordinal_position)
VALUES ($1, $2)
"#;

for column in dv_schema.get_columns() {
let table_oid = column.0;
let column_ordinal_position = column.1;

log!("DV'd Table: {table_oid}, Col: {column_ordinal_position}");

Spi::connect( |mut client| {
_ = client.update(insert_dw_source_column, None,
Some(vec![
(PgOid::from(pg_sys::OIDOID), table_oid.into_datum()),
(PgOid::from(pg_sys::INT2OID), column_ordinal_position.into_datum()),
]));
}
);
}
}

fn dv_schema_push_to_repo(build_id: &String, dv_schema: &mut DVSchema) {

let now_gmt = Utc::now().naive_utc();
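
The new insert_dw_source_columns registers every (table_oid, column_ordinal_position) pair the build produced in auto_dw.dw_source_objects; this registry is what later lets the status queries treat those columns as already built. A minimal verification sketch, assuming the pgrx imports already in scope in dv_builder.rs (dw_source_column_count is a hypothetical helper, not part of this commit):

// Hypothetical helper: count registry rows recorded for one source table.
fn dw_source_column_count(table_oid: u32) -> i64 {
    Spi::get_one_with_args::<i64>(
        "SELECT COUNT(*) FROM auto_dw.dw_source_objects WHERE table_oid = $1",
        vec![(PgOid::from(pg_sys::OIDOID), table_oid.into_datum())],
    )
    .unwrap_or(None) // treat an SPI error as "no rows" in this sketch
    .unwrap_or(0)
}
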
9 changes: 0 additions & 9 deletions extension/src/controller/dv_loader.rs
@@ -36,15 +36,6 @@ pub fn get_dv_schemas() -> Vec<DVSchema> {
dv_schemas
}

pub fn column_in_dv_schemas(table_oid: u32, column_ordinal_position: i16) -> bool {

for dv_schema in get_dv_schemas() {
if dv_schema.contains_column(table_oid, column_ordinal_position) {return true;}
}

false
}

// Load All DV Schemas

pub fn dv_load_schemas_all() -> bool {
24 changes: 1 addition & 23 deletions extension/src/lib.rs
@@ -141,33 +141,11 @@ fn source_column() -> Result<
Ok(client
.select(query, None, None)?
.map(|row| {

let table_oid: Result<Option<u32>, pgrx::spi::Error> = row["table_oid"].value();
let column_ordinal_position: Result<Option<i16>, pgrx::spi::Error> = row["column_ordinal_position"].value();

let mut is_built = false;

if let (
Ok(Some(table_oid)),
Ok(Some(column_ordinal_position)),
) = (table_oid, column_ordinal_position) {
log!("Table OID: {table_oid}, Column Ordinal Position: {column_ordinal_position}");
is_built = dv_loader::column_in_dv_schemas(table_oid, column_ordinal_position);
} else {
log!("One or both values are missing or there was an error.");
}

(
row["schema"].value(),
row["table"].value(),
row["column"].value(),
{
if is_built {
Ok(Some("Built".to_string()))
} else {
row["status"].value()
}
},
row["status"].value(),
row["category"].value(),
row["is_sensitive"].value(),
row["confidence_level"].value(),
29 changes: 16 additions & 13 deletions extension/src/model/dv_schema.rs
@@ -19,24 +19,25 @@ pub struct DVSchema {
}

impl DVSchema {
pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {


pub fn get_columns(&self) -> Vec<(u32, i16)> {
let mut columns: Vec<(u32, i16)> = Vec::new();
for link_key in &self.link_keys {
for business_key in &link_key.business_keys {
if business_key.contains_column(table_oid, column_ordinal_position) {return true;}
columns.append(&mut business_key.get_columns());
}

for descriptor in &link_key.descriptors {
if let Some(source_column) = &descriptor.descriptor_link.source_column {
if source_column.contains_column(table_oid, column_ordinal_position) {return true;}
columns.push(source_column.get_column());
}
}
}

for business_key in &self.business_keys {
if business_key.contains_column(table_oid, column_ordinal_position) {return true;}
columns.append(&mut business_key.get_columns());
}
false
columns
}
}

@@ -78,20 +79,23 @@ pub struct BusinessKeyPartLink {
}

impl BusinessKey {
pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {

pub fn get_columns(&self) -> Vec<(u32, i16)> {

let mut columns: Vec<(u32, i16)> = Vec::new();
// BK Part Search
for bkp_link in &self.business_key_part_links {
for source_column in &bkp_link.source_columns {
if source_column.contains_column(table_oid, column_ordinal_position) {return true;}
columns.push(source_column.get_column());
}
}
// Descriptor Search
for descriptor in &self.descriptors {
if let Some(source_column) = &descriptor.descriptor_link.source_column {
if source_column.contains_column(table_oid, column_ordinal_position) {return true;}
columns.push(source_column.get_column());
}
}
false
columns
}
}

@@ -140,8 +144,7 @@ pub struct ColumnData {
}

impl ColumnData {
pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {
if self.table_oid == table_oid && self.column_ordinal_position == column_ordinal_position {true}
else {false}
pub fn get_column(&self) -> (u32, i16) {
(self.table_oid, self.column_ordinal_position)
}
}
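
DVSchema::get_columns now flattens link keys, business keys, and descriptors into one list of (table_oid, column_ordinal_position) pairs, replacing the per-column contains_column checks removed above. A usage sketch (not part of this commit) showing how the old membership test can be rebuilt on top of the new accessor:

use std::collections::HashSet;

// Collect the built columns once, then test membership per column.
fn built_columns(schema: &DVSchema) -> HashSet<(u32, i16)> {
    schema.get_columns().into_iter().collect()
}

// Example: built_columns(&dv_schema).contains(&(table_oid, column_ordinal_position))
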
10 changes: 10 additions & 0 deletions extension/src/model/queries.rs
@@ -346,6 +346,9 @@ pub fn insert_into_build_call(
t.reason,
t.category,
t.model_name,
CASE
WHEN dws.column_ordinal_position IS NOT NULL THEN true ELSE false
END AS is_dw,
MAX(
CASE
WHEN t.category = 'Business Key Part' AND t.confidence_score < cl.value THEN 1
@@ -360,12 +363,14 @@
) OVER (PARTITION BY s.schema_name, s.table_name) AS bkp_cnt
FROM auto_dw.source_objects AS s
JOIN confidence_level AS cl ON true
LEFT JOIN auto_dw.dw_source_objects AS dws ON s.table_oid = dws.table_oid AND s.column_ordinal_position = dws.column_ordinal_position
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
),
source_object AS (
SELECT *,
CASE
WHEN is_dw THEN 'Built'
WHEN confidence_score IS NULL THEN 'Queued for Processing'
-- Links
WHEN category = 'Business Key Part' AND confidence_score >= cl.value AND bkp_cnt > 1 THEN 'Ready to Deploy'
@@ -462,6 +467,9 @@ pub fn source_column(accepted_transformer_confidence_level: &str) -> String {
t.reason,
t.category,
t.model_name,
CASE
WHEN dws.column_ordinal_position IS NOT NULL THEN true ELSE false
END AS is_dw,
MAX(
CASE
WHEN t.category = 'Business Key Part' AND t.confidence_score < cl.value THEN 1
Expand All @@ -476,12 +484,14 @@ pub fn source_column(accepted_transformer_confidence_level: &str) -> String {
) OVER (PARTITION BY s.schema_name, s.table_name) AS bkp_cnt
FROM auto_dw.source_objects AS s
JOIN confidence_level AS cl ON true
LEFT JOIN auto_dw.dw_source_objects AS dws ON s.table_oid = dws.table_oid AND s.column_ordinal_position = dws.column_ordinal_position
LEFT JOIN source_object_transformation_latest AS t ON s.pk_source_objects = t.fk_source_objects
WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
),
source_object AS (
SELECT *,
CASE
WHEN is_dw THEN 'Built'
WHEN confidence_score IS NULL THEN 'Queued for Processing'
-- Links
WHEN category = 'Business Key Part' AND confidence_score >= cl.value AND bkp_cnt > 1 THEN 'Ready'
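
In both queries the Rust-side lookup is replaced by a LEFT JOIN against auto_dw.dw_source_objects: a match on (table_oid, column_ordinal_position) sets is_dw, and the status CASE short-circuits to 'Built' before any confidence-score logic runs. An illustrative standalone form of that pattern (not part of this commit), written as a Rust string constant in the same style as queries.rs and using only columns that appear in the queries above:

// Illustrative query: list source columns the registry will mark as built.
const BUILT_STATUS_CHECK: &str = r#"
    SELECT s.schema_name,
           s.table_name,
           s.column_ordinal_position,
           dws.column_ordinal_position IS NOT NULL AS is_dw
    FROM auto_dw.source_objects AS s
    LEFT JOIN auto_dw.dw_source_objects AS dws
        ON s.table_oid = dws.table_oid
        AND s.column_ordinal_position = dws.column_ordinal_position
    WHERE s.current_flag = 'Y' AND s.deleted_flag = 'N'
"#;
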
9 changes: 9 additions & 0 deletions extension/src/utility/sql/info_tables.sql
@@ -25,6 +25,15 @@ CREATE TABLE IF NOT EXISTS source_objects
deleted_flag CHAR(1) DEFAULT 'N'
);

DROP TABLE IF EXISTS dw_source_objects;

CREATE TABLE IF NOT EXISTS dw_source_objects
(
pk_dw_source_objects BIGSERIAL PRIMARY KEY,
table_oid OID,
column_ordinal_position SMALLINT
);

DROP TABLE IF EXISTS auto_dw.transformer_responses;

CREATE TABLE IF NOT EXISTS transformer_responses
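
dw_source_objects stores only OIDs and ordinal positions, so its rows are not human-readable on their own. A hypothetical inspection query (not part of this commit) that resolves them to table and column names through the PostgreSQL catalogs, kept as a Rust string constant in the style of queries.rs:

// Hypothetical inspection query: map stored OIDs/ordinals back to names.
const DW_SOURCE_OBJECTS_READABLE: &str = r#"
    SELECT c.relname AS table_name,
           a.attname AS column_name,
           dws.column_ordinal_position
    FROM auto_dw.dw_source_objects AS dws
    JOIN pg_catalog.pg_class AS c ON c.oid = dws.table_oid
    JOIN pg_catalog.pg_attribute AS a
        ON a.attrelid = dws.table_oid
        AND a.attnum = dws.column_ordinal_position
"#;
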
