diff --git a/week_2/workspaces/resources.py b/week_2/workspaces/resources.py index 2c2cba81..38172a29 100644 --- a/week_2/workspaces/resources.py +++ b/week_2/workspaces/resources.py @@ -23,7 +23,7 @@ def uri(self): return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}" def execute_query(self, query: str): - self._engine.execute(query) + return self._engine.execute(query) class S3: diff --git a/week_3/workspaces/challenge/week_3_challenge.py b/week_3/workspaces/challenge/week_3_challenge.py index 3be63e5c..35ecd703 100644 --- a/week_3/workspaces/challenge/week_3_challenge.py +++ b/week_3/workspaces/challenge/week_3_challenge.py @@ -13,7 +13,7 @@ def load_input(self, context): pass -@io_manager(required_resource_keys={"postgres"}) +@io_manager(required_resource_keys={"database"}) def postgres_io_manager(init_context): return PostgresIOManager() diff --git a/week_3/workspaces/project/week_3.py b/week_3/workspaces/project/week_3.py index 68e1f11b..c88c3c6f 100644 --- a/week_3/workspaces/project/week_3.py +++ b/week_3/workspaces/project/week_3.py @@ -62,7 +62,7 @@ def week_3_pipeline(): "bucket": "dagster", "access_key": "test", "secret_key": "test", - "endpoint_url": "http://localstack:4566", + "endpoint_url": "http://localhost:4566", } }, "redis": { diff --git a/week_3/workspaces/resources.py b/week_3/workspaces/resources.py index 28b548f5..192428b9 100644 --- a/week_3/workspaces/resources.py +++ b/week_3/workspaces/resources.py @@ -24,7 +24,7 @@ def uri(self): return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}" def execute_query(self, query: str): - self._engine.execute(query) + return self._engine.execute(query) class S3: diff --git a/week_4/workspaces/challenge/repo.py b/week_4/workspaces/challenge/repo.py index d09ac407..88d67ceb 100644 --- a/week_4/workspaces/challenge/repo.py +++ b/week_4/workspaces/challenge/repo.py @@ -4,7 +4,7 @@ create_dbt_table, dbt_assets, end, - insert_dbt_data, + dbt_table, ) from workspaces.dbt_config import DBT_PROJECT_PATH from workspaces.resources import postgres_resource @@ -13,7 +13,7 @@ @repository def repo(): return with_resources( - dbt_assets + [create_dbt_table, insert_dbt_data, end], + dbt_assets + [create_dbt_table, dbt_table, end], resource_defs={ "dbt": dbt_cli_resource.configured( { diff --git a/week_4/workspaces/challenge/week_4_challenge.py b/week_4/workspaces/challenge/week_4_challenge.py index 8b499b48..326b829c 100644 --- a/week_4/workspaces/challenge/week_4_challenge.py +++ b/week_4/workspaces/challenge/week_4_challenge.py @@ -10,6 +10,7 @@ @asset( required_resource_keys={"database"}, op_tags={"kind": "postgres"}, + key_prefix=["postgresql"], ) def create_dbt_table(context): sql = "CREATE SCHEMA IF NOT EXISTS analytics;" @@ -21,8 +22,9 @@ def create_dbt_table(context): @asset( required_resource_keys={"database"}, op_tags={"kind": "postgres"}, + key_prefix=["postgresql"], ) -def insert_dbt_data(context, create_dbt_table): +def dbt_table(context, create_dbt_table): sql = f"INSERT INTO {SOURCE_TABLE} (column_1, column_2, column_3) VALUES ('A', 'B', 'C');" number_of_rows = randint(1, 10) @@ -31,3 +33,8 @@ def insert_dbt_data(context, create_dbt_table): context.log.info("Inserted a row") context.log.info("Batch inserted") + + +@asset +def end(context): + pass \ No newline at end of file diff --git a/week_4/workspaces/dbt_config.py b/week_4/workspaces/dbt_config.py index 77efd7f2..38f66810 100644 --- a/week_4/workspaces/dbt_config.py +++ b/week_4/workspaces/dbt_config.py @@ -1 +1 @@ -DBT_PROJECT_PATH = "/opt/dagster/dagster_home/project/dbt_test_project" +DBT_PROJECT_PATH = "/opt/dagster/dagster_home/dbt_test_project" diff --git a/week_4/workspaces/resources.py b/week_4/workspaces/resources.py index 54a578da..39696f9e 100644 --- a/week_4/workspaces/resources.py +++ b/week_4/workspaces/resources.py @@ -23,7 +23,7 @@ def uri(self): return f"postgresql://{self.user}:{self.password}@{self.host}/{self.database}" def execute_query(self, query: str): - self._engine.execute(query) + return self._engine.execute(query) class S3: