diff --git a/dozer-admin/src/services/connection_service.rs b/dozer-admin/src/services/connection_service.rs
index 1fc08c38be..9b4481a62f 100644
--- a/dozer-admin/src/services/connection_service.rs
+++ b/dozer-admin/src/services/connection_service.rs
@@ -33,7 +33,7 @@ impl ConnectionService {
         connection: Connection,
     ) -> Result, ErrorResponse> {
         let res = thread::spawn(|| {
-            let connector = get_connector(connection).map_err(|err| err.to_string())?;
+            let connector = get_connector(connection, None).map_err(|err| err.to_string())?;
             connector.get_tables().map_err(|err| err.to_string())
         })
         .join()
@@ -185,7 +185,7 @@ impl ConnectionService {
     ) -> Result {
         let c = input.connection.unwrap();
         let validate_result = thread::spawn(|| {
-            let connector = get_connector(c).map_err(|err| err.to_string())?;
+            let connector = get_connector(c, None).map_err(|err| err.to_string())?;
             connector.validate(None).map_err(|err| err.to_string())
         });
         validate_result
diff --git a/dozer-ingestion/benches/helper.rs b/dozer-ingestion/benches/helper.rs
index 63c1bd3a55..f9ae1bc3f5 100644
--- a/dozer-ingestion/benches/helper.rs
+++ b/dozer-ingestion/benches/helper.rs
@@ -40,7 +40,8 @@ pub fn get_progress() -> ProgressBar {
 pub async fn get_connection_iterator(config: TestConfig) -> IngestionIterator {
     let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
     std::thread::spawn(move || {
-        let grpc_connector = dozer_ingestion::connectors::get_connector(config.connection).unwrap();
+        let grpc_connector =
+            dozer_ingestion::connectors::get_connector(config.connection, None).unwrap();

         let mut tables = grpc_connector.get_tables().unwrap();
         if let Some(tables_filter) = config.tables_filter {
diff --git a/dozer-ingestion/src/connectors/delta_lake/connector.rs b/dozer-ingestion/src/connectors/delta_lake/connector.rs
index b8ca3bf905..e12a8cd409 100644
--- a/dozer-ingestion/src/connectors/delta_lake/connector.rs
+++ b/dozer-ingestion/src/connectors/delta_lake/connector.rs
@@ -42,7 +42,7 @@ impl Connector for DeltaLakeConnector {
     }

     fn validate_schemas(&self, tables: &[TableInfo]) -> ConnectorResult<ValidationResults> {
-        let schemas = self.get_schemas(Some(tables.to_vec()))?;
+        let schemas = self.get_schemas(Some(&tables.to_vec()))?;
         let mut validation_result = ValidationResults::new();
         let existing_schemas_names: Vec<String> = schemas.iter().map(|s| s.name.clone()).collect();
         for table in tables {
@@ -59,7 +59,7 @@ impl Connector for DeltaLakeConnector {

     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> ConnectorResult<Vec<SourceSchema>> {
         let schema_helper = SchemaHelper::new(self.config.clone());
         schema_helper.get_schemas(self.id, table_names)
diff --git a/dozer-ingestion/src/connectors/delta_lake/schema_helper.rs b/dozer-ingestion/src/connectors/delta_lake/schema_helper.rs
index 8fed1afb7c..6f2598bdf6 100644
--- a/dozer-ingestion/src/connectors/delta_lake/schema_helper.rs
+++ b/dozer-ingestion/src/connectors/delta_lake/schema_helper.rs
@@ -21,7 +21,7 @@ impl SchemaHelper {
     pub fn get_schemas(
         &self,
         id: u64,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&Vec<TableInfo>>,
     ) -> ConnectorResult<Vec<SourceSchema>> {
         if tables.is_none() {
             return Ok(vec![]);
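Note: with get_schemas taking Option<&Vec<TableInfo>>, callers keep ownership of the table list. A minimal sketch of the new call shape (hypothetical helper; the dozer_ingestion import paths and the public `name` field on SourceSchema are assumptions, not part of this patch):

use dozer_ingestion::connectors::{Connector, TableInfo};
use dozer_ingestion::errors::ConnectorError;

// Hypothetical helper, not part of this patch.
fn print_schema_names(connector: &dyn Connector, tables: &Vec<TableInfo>) -> Result<(), ConnectorError> {
    // get_schemas now borrows the table list, so `tables` stays usable after the call.
    let schemas = connector.get_schemas(Some(tables))?;
    for schema in &schemas {
        println!("{}", schema.name);
    }
    Ok(())
}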
diff --git a/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs b/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs
index d131a5d6ef..cc1826a0dd 100644
--- a/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs
+++ b/dozer-ingestion/src/connectors/delta_lake/test/deltalake_test.rs
@@ -25,7 +25,7 @@ fn get_schema_from_deltalake() {
         name: table_name.to_string(),
         columns: None,
     };
-    let field = connector.get_schemas(Some(vec![table_info])).unwrap()[0]
+    let field = connector.get_schemas(Some(&vec![table_info])).unwrap()[0]
         .schema
         .fields[0]
         .clone();
diff --git a/dozer-ingestion/src/connectors/ethereum/log/connector.rs b/dozer-ingestion/src/connectors/ethereum/log/connector.rs
index bb467f0ac3..03adaa3bfc 100644
--- a/dozer-ingestion/src/connectors/ethereum/log/connector.rs
+++ b/dozer-ingestion/src/connectors/ethereum/log/connector.rs
@@ -124,7 +124,7 @@ impl EthLogConnector {
 impl Connector for EthLogConnector {
     fn get_schemas(
         &self,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         let mut schemas = vec![SourceSchema::new(
             ETH_LOGS_TABLE.to_string(),
diff --git a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
index 4c15307695..dd468fb697 100644
--- a/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
+++ b/dozer-ingestion/src/connectors/ethereum/trace/connector.rs
@@ -36,7 +36,7 @@ impl EthTraceConnector {
 impl Connector for EthTraceConnector {
     fn get_schemas(
         &self,
-        _table_names: Option<Vec<TableInfo>>,
+        _table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         Ok(vec![SourceSchema::new(
             ETH_TRACE_TABLE.to_string(),
diff --git a/dozer-ingestion/src/connectors/grpc/connector.rs b/dozer-ingestion/src/connectors/grpc/connector.rs
index b40d252a0a..e31e0eff11 100644
--- a/dozer-ingestion/src/connectors/grpc/connector.rs
+++ b/dozer-ingestion/src/connectors/grpc/connector.rs
@@ -131,7 +131,7 @@ where
 {
     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         let schemas_str = Self::parse_config(&self.config)?;
         let adapter = GrpcIngestor::::new(schemas_str)?;
@@ -156,7 +156,7 @@ where
     }

     fn validate(&self, table_names: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
-        let schemas = self.get_schemas(table_names);
+        let schemas = self.get_schemas(table_names.as_ref());
         schemas.map(|_| ())
     }

diff --git a/dozer-ingestion/src/connectors/grpc/tests.rs b/dozer-ingestion/src/connectors/grpc/tests.rs
index c2bb7f3312..6ac60d410e 100644
--- a/dozer-ingestion/src/connectors/grpc/tests.rs
+++ b/dozer-ingestion/src/connectors/grpc/tests.rs
@@ -32,15 +32,18 @@ async fn ingest_grpc(
     let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

     std::thread::spawn(move || {
-        let grpc_connector = crate::connectors::get_connector(Connection {
-            config: Some(ConnectionConfig::Grpc(GrpcConfig {
-                schemas: Some(GrpcConfigSchemas::Inline(schemas.to_string())),
-                adapter,
-                port,
-                ..Default::default()
-            })),
-            name: "grpc".to_string(),
-        })
+        let grpc_connector = crate::connectors::get_connector(
+            Connection {
+                config: Some(ConnectionConfig::Grpc(GrpcConfig {
+                    schemas: Some(GrpcConfigSchemas::Inline(schemas.to_string())),
+                    adapter,
+                    port,
+                    ..Default::default()
+                })),
+                name: "grpc".to_string(),
+            },
+            None,
+        )
         .unwrap();

         let tables = grpc_connector.get_tables().unwrap();
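The grpc validate change above leans on Option::as_ref, which turns an owned Option<Vec<T>> into Option<&Vec<T>> without consuming it. A self-contained illustration using only std:

fn main() {
    let table_names: Option<Vec<String>> = Some(vec!["users".to_string()]);

    // as_ref() borrows the inner Vec, matching the new Option<&Vec<_>> parameters.
    let borrowed: Option<&Vec<String>> = table_names.as_ref();
    assert_eq!(borrowed.map(|v| v.len()), Some(1));

    // The original Option is still owned and usable here.
    assert!(table_names.is_some());
}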
diff --git a/dozer-ingestion/src/connectors/kafka/connector.rs b/dozer-ingestion/src/connectors/kafka/connector.rs
index 6176ac2ad0..ebf1c3f860 100644
--- a/dozer-ingestion/src/connectors/kafka/connector.rs
+++ b/dozer-ingestion/src/connectors/kafka/connector.rs
@@ -28,10 +28,10 @@ impl KafkaConnector {
 impl Connector for KafkaConnector {
     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         self.config.schema_registry_url.clone().map_or(
-            NoSchemaRegistry::get_schema(table_names.clone(), self.config.clone()),
+            NoSchemaRegistry::get_schema(table_names, self.config.clone()),
             |_| SchemaRegistry::get_schema(table_names, self.config.clone()),
         )
     }
diff --git a/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs b/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
index 6c7f1340a4..d1fd916591 100644
--- a/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
+++ b/dozer-ingestion/src/connectors/kafka/debezium/no_schema_registry.rs
@@ -14,7 +14,7 @@ pub struct NoSchemaRegistry {}

 impl NoSchemaRegistry {
     pub fn get_schema(
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
         config: KafkaConfig,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         table_names.map_or(Ok(vec![]), |tables| {
diff --git a/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs b/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
index 264e85a33a..7ad6336836 100644
--- a/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
+++ b/dozer-ingestion/src/connectors/kafka/debezium/schema_registry.rs
@@ -80,7 +80,7 @@ impl SchemaRegistry {
     }

     pub fn get_schema(
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
         config: KafkaConfig,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         let sr_settings = SrSettings::new(config.schema_registry_url.unwrap());
diff --git a/dozer-ingestion/src/connectors/mod.rs b/dozer-ingestion/src/connectors/mod.rs
index 04ee95208a..6bb70532ca 100644
--- a/dozer-ingestion/src/connectors/mod.rs
+++ b/dozer-ingestion/src/connectors/mod.rs
@@ -40,7 +40,7 @@ pub trait Connector: Send + Sync + Debug {

     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError>;

     fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError>;
@@ -95,7 +95,10 @@ impl ColumnInfo {
     }
 }

-pub fn get_connector(connection: Connection) -> Result<Box<dyn Connector>, ConnectorError> {
+pub fn get_connector(
+    connection: Connection,
+    tables: Option<Vec<TableInfo>>,
+) -> Result<Box<dyn Connector>, ConnectorError> {
     let config = connection
         .config
         .ok_or_else(|| ConnectorError::MissingConfiguration(connection.name.clone()))?;
@@ -104,7 +107,7 @@ pub fn get_connector(connection: Connection) -> Result<Box<dyn Connector>, ConnectorError> {
             let config = map_connection_config(&config)?;
             let postgres_config = PostgresConfig {
                 name: connection.name,
-                tables: None,
+                tables,
                 config,
             };

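With the extra `tables` argument on get_connector, existing call sites pass None to keep the previous behaviour, while Postgres sources can hand the table list straight through to PostgresConfig. A rough sketch of both call shapes (hypothetical function; import paths are assumptions, not part of this patch):

use dozer_ingestion::connectors::{get_connector, TableInfo};
use dozer_ingestion::errors::ConnectorError;
use dozer_types::models::connection::Connection;

// Hypothetical call sites illustrating the two shapes.
fn build_connectors(connection: Connection, tables: Vec<TableInfo>) -> Result<(), ConnectorError> {
    // Existing callers keep the old behaviour by passing None.
    let _plain = get_connector(connection.clone(), None)?;

    // Postgres-backed sources can forward the table list, which now lands in PostgresConfig.tables.
    let _with_tables = get_connector(connection, Some(tables))?;
    Ok(())
}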
diff --git a/dozer-ingestion/src/connectors/object_store/connector.rs b/dozer-ingestion/src/connectors/object_store/connector.rs
index 8469adfa31..9998980ab3 100644
--- a/dozer-ingestion/src/connectors/object_store/connector.rs
+++ b/dozer-ingestion/src/connectors/object_store/connector.rs
@@ -34,7 +34,7 @@ impl Connector for ObjectStoreConnector {

     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> ConnectorResult<Vec<SourceSchema>> {
         let mapper = SchemaMapper::new(self.config.clone());
         mapper.get_schema(table_names)
diff --git a/dozer-ingestion/src/connectors/object_store/schema_mapper.rs b/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
index 7ac8f0ce28..9d5ccd9248 100644
--- a/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
+++ b/dozer-ingestion/src/connectors/object_store/schema_mapper.rs
@@ -51,18 +51,18 @@ impl SchemaMapper {
 pub trait Mapper {
     fn get_schema(
         &self,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError>;
 }

 impl Mapper for SchemaMapper {
     fn get_schema(
         &self,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         let rt = Runtime::new().map_err(|_| ObjectStoreConnectorError::RuntimeCreationError)?;

-        let tables_list = tables.unwrap_or_else(|| {
+        let tables_list = tables.cloned().unwrap_or_else(|| {
             self.config
                 .tables()
                 .iter()
diff --git a/dozer-ingestion/src/connectors/postgres/connector.rs b/dozer-ingestion/src/connectors/postgres/connector.rs
index 83f6434a10..49b4e50888 100644
--- a/dozer-ingestion/src/connectors/postgres/connector.rs
+++ b/dozer-ingestion/src/connectors/postgres/connector.rs
@@ -82,10 +82,10 @@ impl PostgresConnector {
 impl Connector for PostgresConnector {
     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         self.schema_helper
-            .get_schemas(table_names.as_deref())
+            .get_schemas(table_names.map(|t| &t[..]))
             .map_err(PostgresConnectorError)
     }

diff --git a/dozer-ingestion/src/connectors/postgres/test_utils.rs b/dozer-ingestion/src/connectors/postgres/test_utils.rs
index 466c2ffa08..09882b3973 100644
--- a/dozer-ingestion/src/connectors/postgres/test_utils.rs
+++ b/dozer-ingestion/src/connectors/postgres/test_utils.rs
@@ -36,7 +36,7 @@ pub fn get_iterator(config: Connection, table_name: String) -> IngestionIterator
             columns: None,
         }];

-        let connector = get_connector(config).unwrap();
+        let connector = get_connector(config, Some(tables.clone())).unwrap();
         connector.start(None, &ingestor, tables).unwrap();
     });

diff --git a/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs b/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
index 6c04e62291..b4b3f54716 100644
--- a/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
+++ b/dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
@@ -2,7 +2,7 @@ use crate::connectors::postgres::helper;
 use crate::connectors::ColumnInfo;
 use crate::errors::{PostgresConnectorError, PostgresSchemaError};
 use dozer_types::node::OpIdentifier;
-use dozer_types::types::{Field, FieldDefinition, Operation, Record, Schema, SourceDefinition};
+use dozer_types::types::{Field, FieldDefinition, Operation, Record, SourceDefinition};
 use helper::postgres_type_to_dozer_type;
 use postgres_protocol::message::backend::LogicalReplicationMessage::{
     Begin, Commit, Delete, Insert, Relation, Update,
@@ -123,7 +123,7 @@ impl XlogMapper {
                     new: Record::new(
                         Some(dozer_types::types::SchemaIdentifier {
                             id: table.rel_id,
-                            version: table.rel_id as u16,
+                            version: 0,
                         }),
                         values,
                         None,
@@ -143,7 +143,7 @@ impl XlogMapper {
                     old: Record::new(
                         Some(dozer_types::types::SchemaIdentifier {
                             id: table.rel_id,
-                            version: table.rel_id as u16,
+                            version: 0,
                         }),
                         old_values,
                         None,
@@ -151,7 +151,7 @@ impl XlogMapper {
                     new: Record::new(
                         Some(dozer_types::types::SchemaIdentifier {
                             id: table.rel_id,
-                            version: table.rel_id as u16,
+                            version: 0,
                         }),
                         values,
                         None,
@@ -171,7 +171,7 @@ impl XlogMapper {
                     old: Record::new(
                         Some(dozer_types::types::SchemaIdentifier {
                             id: table.rel_id,
-                            version: table.rel_id as u16,
+                            version: 0,
                         }),
                         values,
                         None,
@@ -248,15 +248,6 @@ impl XlogMapper {
             });
         }

-        let _schema = Schema {
-            identifier: Some(dozer_types::types::SchemaIdentifier {
-                id: table.rel_id,
-                version: table.rel_id as u16,
-            }),
-            fields,
-            primary_index: vec![0],
-        };
-
         self.relations_map.insert(rel_id, table);

         Ok(())
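Two conversions used in the hunks above, shown on plain std types: Option<&Vec<T>>::cloned() recovers an owned list where one is still required, and map(|t| &t[..]) reborrows it as a slice:

fn main() {
    let owned: Vec<String> = vec!["trips".to_string(), "zones".to_string()];
    let tables: Option<&Vec<String>> = Some(&owned);

    // cloned(): Option<&Vec<T>> -> Option<Vec<T>>, used when an owned list is still needed.
    let owned_again: Vec<String> = tables.cloned().unwrap_or_else(Vec::new);
    assert_eq!(owned_again.len(), 2);

    // map(|t| &t[..]): Option<&Vec<T>> -> Option<&[T]>, used where a slice is expected.
    let as_slice: Option<&[String]> = tables.map(|t| &t[..]);
    assert_eq!(as_slice.map(|s| s.len()), Some(2));
}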
diff --git a/dozer-ingestion/src/connectors/snowflake/connection/client.rs b/dozer-ingestion/src/connectors/snowflake/connection/client.rs
index ded16ad3f2..41ef1fb7c7 100644
--- a/dozer-ingestion/src/connectors/snowflake/connection/client.rs
+++ b/dozer-ingestion/src/connectors/snowflake/connection/client.rs
@@ -387,7 +387,7 @@ impl Client {

     pub fn fetch_tables(
         &self,
-        tables: Option<Vec<TableInfo>>,
+        tables: Option<&Vec<TableInfo>>,
         tables_indexes: HashMap<String, usize>,
         keys: HashMap<String, Vec<String>>,
         conn: &Connection,
diff --git a/dozer-ingestion/src/connectors/snowflake/connector.rs b/dozer-ingestion/src/connectors/snowflake/connector.rs
index 58a56822ca..8de74c8a63 100644
--- a/dozer-ingestion/src/connectors/snowflake/connector.rs
+++ b/dozer-ingestion/src/connectors/snowflake/connector.rs
@@ -55,7 +55,7 @@ impl Connector for SnowflakeConnector {
     #[cfg(feature = "snowflake")]
     fn get_schemas(
         &self,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         SchemaHelper::get_schema(&self.config, table_names)
     }
@@ -63,7 +63,7 @@ impl Connector for SnowflakeConnector {
     #[cfg(not(feature = "snowflake"))]
     fn get_schemas(
         &self,
-        _table_names: Option<Vec<TableInfo>>,
+        _table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         todo!()
     }
diff --git a/dozer-ingestion/src/connectors/snowflake/schema_helper.rs b/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
index 204897fb10..2d6e476d7a 100644
--- a/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
+++ b/dozer-ingestion/src/connectors/snowflake/schema_helper.rs
@@ -14,7 +14,7 @@ pub struct SchemaHelper {}
 impl SchemaHelper {
     pub fn get_schema(
         config: &SnowflakeConfig,
-        table_names: Option<Vec<TableInfo>>,
+        table_names: Option<&Vec<TableInfo>>,
     ) -> Result<Vec<SourceSchema>, ConnectorError> {
         let client = Client::new(config);
         let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
@@ -26,7 +26,7 @@ impl SchemaHelper {
             .fetch_keys(&conn)
             .map_err(ConnectorError::SnowflakeError)?;

-        let tables_indexes = table_names.clone().map_or(HashMap::new(), |tables| {
+        let tables_indexes = table_names.map_or(HashMap::new(), |tables| {
             let mut result = HashMap::new();
             for (idx, table) in tables.iter().enumerate() {
                 result.insert(table.name.clone(), idx);
@@ -68,7 +68,7 @@ impl SchemaHelper {
         config: &SnowflakeConfig,
         tables: &[TableInfo],
     ) -> Result<ValidationResults, ConnectorError> {
-        let schemas = Self::get_schema(config, Some(tables.to_vec()))?;
+        let schemas = Self::get_schema(config, Some(&tables.to_vec()))?;
         let mut validation_result = ValidationResults::new();

         let existing_schemas_names: Vec<String> = schemas.iter().map(|s| s.name.clone()).collect();
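Because table_names is now a borrow, the snowflake index map can be built with map_or directly and the former .clone() disappears. A standalone version of that pattern on plain std types:

use std::collections::HashMap;

fn index_by_name(table_names: Option<&Vec<String>>) -> HashMap<String, usize> {
    // Same shape as the SchemaHelper change: iterate the borrowed list and record
    // each table's position; an absent list yields an empty map.
    table_names.map_or(HashMap::new(), |tables| {
        tables
            .iter()
            .enumerate()
            .map(|(idx, name)| (name.clone(), idx))
            .collect()
    })
}

fn main() {
    let tables = vec!["customers".to_string(), "orders".to_string()];
    let indexes = index_by_name(Some(&tables));
    assert_eq!(indexes.get("orders"), Some(&1));
}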
diff --git a/dozer-ingestion/src/connectors/snowflake/tests.rs b/dozer-ingestion/src/connectors/snowflake/tests.rs
index 048257970e..793f534d0a 100644
--- a/dozer-ingestion/src/connectors/snowflake/tests.rs
+++ b/dozer-ingestion/src/connectors/snowflake/tests.rs
@@ -60,7 +60,7 @@ fn test_disabled_connector_and_read_from_stream() {

     thread::spawn(move || {
         let tables: Vec<TableInfo> = vec![table];
-        let connector = get_connector(connection_config).unwrap();
+        let connector = get_connector(connection_config, None).unwrap();
         let _ = connector.start(None, &ingestor, tables);
     });

@@ -103,7 +103,7 @@ fn test_disabled_connector_and_read_from_stream() {
 fn test_disabled_connector_get_schemas_test() {
     run_connector_test("snowflake", |config| {
         let connection = config.connections.get(0).unwrap();
-        let connector = get_connector(connection.clone()).unwrap();
+        let connector = get_connector(connection.clone(), None).unwrap();
         let client = get_client(connection);

         let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
@@ -138,7 +138,7 @@ fn test_disabled_connector_get_schemas_test() {

         let schemas = connector
             .as_ref()
-            .get_schemas(Some(vec![TableInfo {
+            .get_schemas(Some(&vec![TableInfo {
                 name: table_name.clone(),
                 columns: None,
             }]))
@@ -176,7 +176,7 @@ fn test_disabled_connector_get_schemas_test() {
 fn test_disabled_connector_missing_table_validator() {
     run_connector_test("snowflake", |config| {
         let connection = config.connections.get(0).unwrap();
-        let connector = get_connector(connection.clone()).unwrap();
+        let connector = get_connector(connection.clone(), None).unwrap();

         let not_existing_table = "not_existing_table".to_string();
         let result = connector
diff --git a/dozer-orchestrator/src/pipeline/connector_source.rs b/dozer-orchestrator/src/pipeline/connector_source.rs
index 735ab4904f..dee3d41d75 100644
--- a/dozer-orchestrator/src/pipeline/connector_source.rs
+++ b/dozer-orchestrator/src/pipeline/connector_source.rs
@@ -75,14 +75,14 @@ impl ConnectorSourceFactory {
     ) -> Result<Self, ExecutionError> {
         let connection_name = connection.name.clone();
-        let connector = get_connector(connection).map_err(|e| InternalError(Box::new(e)))?;
+        let tables_list: Vec<TableInfo> = table_and_ports
+            .iter()
+            .map(|(table, _)| table.clone())
+            .collect();
+        let connector = get_connector(connection, Some(tables_list.clone()))
+            .map_err(|e| InternalError(Box::new(e)))?;

         let source_schemas = connector
-            .get_schemas(Some(
-                table_and_ports
-                    .iter()
-                    .map(|(table, _)| table.clone())
-                    .collect(),
-            ))
+            .get_schemas(Some(&tables_list))
             .map_err(|e| InternalError(Box::new(e)))?;

         let mut tables = vec![];
diff --git a/dozer-orchestrator/src/pipeline/validate.rs b/dozer-orchestrator/src/pipeline/validate.rs
index 81c9577192..48985cbcc8 100644
--- a/dozer-orchestrator/src/pipeline/validate.rs
+++ b/dozer-orchestrator/src/pipeline/validate.rs
@@ -112,12 +112,12 @@ pub fn validate_grouped_connections(
 }

 pub fn validate(input: Connection, tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
-    get_connector(input)?.validate(tables)
+    get_connector(input, tables.clone())?.validate(tables)
 }

 pub fn validate_schema(
     input: Connection,
     tables: &[TableInfo],
 ) -> Result<ValidationResults, ConnectorError> {
-    get_connector(input)?.validate_schemas(tables)
+    get_connector(input, Some(tables.to_vec()))?.validate_schemas(tables)
 }
diff --git a/dozer-orchestrator/src/simple/executor.rs b/dozer-orchestrator/src/simple/executor.rs
index 4b445e6fa3..ba385e40e8 100644
--- a/dozer-orchestrator/src/simple/executor.rs
+++ b/dozer-orchestrator/src/simple/executor.rs
@@ -106,7 +106,7 @@ impl<'a> Executor<'a> {
         for connection in connections {
             validate(connection.to_owned(), None)?;

-            let connector = get_connector(connection.to_owned())?;
+            let connector = get_connector(connection.to_owned(), None)?;
             let schema_tuples = connector.get_schemas(None)?;
             schema_map.insert(connection.name.to_owned(), schema_tuples);
         }
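Taken together, a caller in the orchestrator style now threads the same optional table list through connector creation, validation, and schema fetching. A rough end-to-end sketch (hypothetical wrapper; error handling and config loading omitted, import paths assumed):

use dozer_ingestion::connectors::{get_connector, TableInfo};
use dozer_ingestion::errors::ConnectorError;
use dozer_types::models::connection::Connection;

// Hypothetical wrapper mirroring the validate()/get_connector()/get_schemas() flow above.
fn load_schemas(connection: Connection, tables: Option<Vec<TableInfo>>) -> Result<usize, ConnectorError> {
    // Create the connector with the same optional table list used for validation.
    let connector = get_connector(connection, tables.clone())?;
    connector.validate(tables.clone())?;

    // get_schemas takes the borrowed Option<&Vec<TableInfo>> introduced by this patch.
    let schemas = connector.get_schemas(tables.as_ref())?;
    Ok(schemas.len())
}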