scratch: Try coupling vals and logs #44

Draft
wants to merge 3 commits into master
81 changes: 41 additions & 40 deletions README.md
@@ -965,37 +965,38 @@ logger = logging.getLogger(__name__)


def main() -> None:
"""The entry point for the Airflow Staging task.
"""The entry point for the Airflow Staging task.

Returns:
Void function.
"""
# LOAD DATA
logger.info("Loading data from live sources...")
Returns:
Void function.
"""
# LOAD DATA
logger.info("Loading data from live sources...")

bar_df = InputIO(source_config=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read()
foo_df = InputIO(source_config=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read()
bar_df = InputIO(resource_definition=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read()
foo_df = InputIO(resource_definition=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read()

logger.info("Data successfully loaded from live sources...")
logger.info("Data successfully loaded from live sources...")

# TRANSFORM DATA
logger.info("Apply transformations...")
# TRANSFORM DATA
logger.info("Apply transformations...")

# TODO: Apply your transformations
# TODO: Apply your transformations

logger.info("Transformations applied successfully...")
logger.info("Transformations applied successfully...")

# SINK DATA
logger.info("Begin sinking data to staging area:")
StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).write(bar_df)
logger.info("Data staging is complete...")
# SINK DATA
logger.info("Begin sinking data to staging area:")
StagedFoo(resource_definition=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
StagedBar(resource_definition=raw_config.get(source_key="STAGED_BAR")).write(bar_df)
logger.info("Data staging is complete...")

```
### Utilising `asyncio`
`Dynamic(i/o)` supports `asyncio` to speed up I/O-bound operations by leveraging multithreading.

An example can be found in the second of the two demo tasks, namely, the `transform.py` task.

```python
"""Add module docstring...."""
import asyncio
@@ -1009,35 +1010,35 @@ logger = logging.getLogger(__name__)


async def main() -> None:
"""The entry point for the Airflow Staging task.
"""The entry point for the Airflow Staging task.

Returns:
Void function.
"""
# LOAD DATA
logger.info("Loading data from live sources...")
Returns:
Void function.
"""
# LOAD DATA
logger.info("Loading data from live sources...")

[bar_df, foo_df] = await asyncio.gather(
StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).async_read(),
StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO")).async_read()
)
[bar_df, foo_df] = await asyncio.gather(
StagedBar(resource_definition=raw_config.get(source_key="STAGED_BAR")).async_read(),
StagedFoo(resource_definition=raw_config.get(source_key="STAGED_FOO")).async_read()
)

logger.info("Data successfully loaded from live sources...")
logger.info("Data successfully loaded from live sources...")

# TRANSFORM DATA
logger.info("Apply transformations...")
# TRANSFORM DATA
logger.info("Apply transformations...")

# TODO: Apply your transformations
# TODO: Apply your transformations

logger.info("Transformations applied successfully...")
logger.info("Transformations applied successfully...")

# SINK DATA
logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw")
await asyncio.gather(
InputIO(source_config=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df),
InputIO(source_config=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df),
)
logger.info("Data staging is complete...")
# SINK DATA
logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw")
await asyncio.gather(
InputIO(resource_definition=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df),
InputIO(resource_definition=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df),
)
logger.info("Data staging is complete...")

```
In short, you simply need to utilise the `async_read()` or `async_write()` methods instead, and `await`/`asyncio.gather` your calls.
26 changes: 26 additions & 0 deletions demo/resources/definitions/transform_input.yaml
@@ -0,0 +1,26 @@
---
STAGED_FOO:
  sample:
    type: "local"
    local:
      file_path: "[[ TEST_RESOURCES ]]/data/raw/staged_foo.parquet"
      file_type: "parquet"
  actual:
    type: "s3"
    s3:
      bucket: "[[ S3_YOUR_OUTPUT_BUCKET ]]"
      file_path: "live/data/raw/staged_foo.parquet"
      file_type: "parquet"

STAGED_BAR:
  sample:
    type: "local"
    local:
      file_path: "[[ TEST_RESOURCES ]]/data/raw/staged_bar.parquet"
      file_type: "parquet"
  actual:
    type: "s3"
    s3:
      bucket: "[[ S3_YOUR_OUTPUT_BUCKET ]]"
      file_path: "live/data/raw/staged_bar.parquet"
      file_type: "parquet"
19 changes: 12 additions & 7 deletions demo/src/__init__.py
@@ -1,5 +1,5 @@
"""Set config IOs."""
__all__ = ["input_config", "raw_config", "processed_config"]
__all__ = ["staging_input_config", "staging_output_config", "transform_input_config", "transform_output_config"]

import logging
import os
@@ -12,18 +12,23 @@
logging.getLogger("kafka").setLevel(logging.WARNING)


-input_config = IOConfig(
-    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/input.yaml")),
+staging_input_config = IOConfig(
+    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/staging_input.yaml")),
    env_identifier=ENVIRONMENT,
    dynamic_vars=environment,
)
-raw_config = IOConfig(
-    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/raw.yaml")),
+staging_output_config = IOConfig(
+    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/staging_output.yaml")),
    env_identifier=ENVIRONMENT,
    dynamic_vars=environment,
)
-processed_config = IOConfig(
-    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/processed.yaml")),
+transform_input_config = IOConfig(
+    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/transform_input.yaml")),
    env_identifier=ENVIRONMENT,
    dynamic_vars=environment,
)
+transform_output_config = IOConfig(
+    path_to_source_yaml=(os.path.join(RESOURCES, "definitions/transform_output.yaml")),
+    env_identifier=ENVIRONMENT,
+    dynamic_vars=environment,
+)
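For context, an illustrative sketch that is not part of this diff: `env_identifier` appears to select the `sample`/`actual` block of a resource definition such as `STAGED_FOO` above, while `dynamic_vars` fills in the `[[ ... ]]` placeholders, so a runner can fetch a concrete, environment-resolved definition like so:

```python
# Illustrative only: fetch the environment-resolved STAGED_FOO resource definition.
from demo.src import transform_input_config

staged_foo_resource = transform_input_config.get(source_key="STAGED_FOO")
```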
38 changes: 0 additions & 38 deletions demo/src/io.py

This file was deleted.

18 changes: 18 additions & 0 deletions demo/src/io/__init__.py
@@ -0,0 +1,18 @@
"""Responsible for configuring io operations for input data."""
# pylint: disable=too-few-public-methods
__all__ = ["InputIO", "StagedFooIO", "StagedBarIO"]


from dynamicio import UnifiedIO, WithS3File, WithLocal, DynamicDataIO, WithPostgres


class InputIO(UnifiedIO):
    """UnifiedIO subclass for input data."""


class StagedFooIO(WithS3File, WithLocal, DynamicDataIO):
    """DynamicDataIO subclass for staged foo data (S3 or local)."""


class StagedBarIO(WithLocal, WithPostgres, DynamicDataIO):
    """DynamicDataIO subclass for staged bar data (local or Postgres)."""
41 changes: 41 additions & 0 deletions demo/src/io/schemas.py
@@ -0,0 +1,41 @@
__all__ = ["Foo", "Bar", "StagedFoo", "StagedBar", "FinalFoo", "FinalBar"]

from pandera import SchemaModel, String, Float, Field
from pandera.typing import Series


class Foo(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})

class Bar(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})

class StagedFoo(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})

class StagedBar(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})

class FinalFoo(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})

class FinalBar(SchemaModel):
column_a: Series[String] = Field(unique=True, report_duplicates="all", logging={"metrics": ["Counts"], "dataset_name": "Foo", "column": "column_a"})
column_b: Series[String] = Field(nullable=False, logging={"metrics": ["CountsPerLabel"], "dataset_name": "Foo", "column": "column_a"})
column_c: Series[Float] = Field(gt=1000)
column_d: Series[Float] = Field(lt=1000, logging={"metrics": ["Min", "Max", "Mean", "Std", "Variance"], "dataset_name": "Foo", "column": "column_a"})
27 changes: 21 additions & 6 deletions demo/src/runners/staging.py
@@ -1,8 +1,9 @@
"""Add module docstring...."""
import logging

-from demo.src import constants, input_config, raw_config
-from demo.src.io import InputIO, StagedBar, StagedFoo
+from dynamicio import UnifiedIO
+from demo.src import constants, staging_input_config, staging_output_config
+from demo.src.io import InputIO, StagedFooIO, StagedBarIO
+from demo.src.io.schemas import Bar, Foo

logger = logging.getLogger(__name__)

@@ -16,8 +17,13 @@ def main() -> None:
    # LOAD DATA
    logger.info("Loading data from live sources...")

-    bar_df = InputIO(source_config=input_config.get(source_key="BAR"), apply_schema_validations=True, log_schema_metrics=True).read()
-    foo_df = InputIO(source_config=input_config.get(source_key="FOO"), apply_schema_validations=True, log_schema_metrics=True).read()
+    bar_df = UnifiedIO.read(
+        resource_definition=staging_input_config.get(source_key="BAR"),
+        schema=Bar,
+        apply_schema_validations=True,
+        log_schema_metrics=True
+    )
+    foo_df = InputIO(resource_definition=staging_input_config.get(source_key="FOO"), schema=Foo, apply_schema_validations=True, log_schema_metrics=True).read()

    logger.info("Data successfully loaded from live sources...")

@@ -30,6 +36,15 @@ def main() -> None:

    # SINK DATA
    logger.info("Begin sinking data to staging area:")
-    StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
-    StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).write(bar_df)
+    UnifiedIO(resource_definition=staging_output_config.get(source_key="STAGED_FOO"), **constants.TO_PARQUET_KWARGS).write(foo_df)
+    UnifiedIO(resource_definition=staging_output_config.get(source_key="STAGED_BAR")).write(bar_df)
    logger.info("Data staging is complete...")



# 1. Use UnifiedIO as the IO class for loading/writing data
# 2. Introduce a pandera schema as an extra field in the UnifiedIO parameters
# 3. Write a script to generate a pandera schema from a dataframe (and propose logging and validations too); see the sketch below
# 4. Write a script to migrate from the YAML schemas to pandera schemas
# 5. Write validations that we use but that are not available through pandera's standard validations, once we identify what is missing
# 6. Confirm everything works
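A possible starting point for items 3 and 4, sketched under the assumption that pandera's built-in schema inference is acceptable here (none of the names or paths below are part of this diff): infer a draft schema from a sample dataframe and emit it as a Python script to be hand-tuned, for example with the `logging` metadata proposed above.

```python
# Illustrative sketch only: infer a draft pandera schema from a sample dataframe
# and write it out as a script that can be hand-edited afterwards.
import pandas as pd
import pandera as pa

sample_df = pd.read_parquet("demo/tests/data/input/foo.parquet")  # hypothetical sample file

draft_schema = pa.infer_schema(sample_df)  # returns a DataFrameSchema inferred from the data

with open("demo/src/io/schemas_draft.py", "w") as handle:
    handle.write(draft_schema.to_script())  # serialise the inferred schema as Python code
```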
12 changes: 7 additions & 5 deletions demo/src/runners/transform.py
@@ -3,8 +3,9 @@
import logging

import demo.src.environment
-from demo.src import processed_config, raw_config
-from demo.src.io import InputIO, StagedBar, StagedFoo
+from demo.src import transform_input_config, transform_output_config
+from demo.src.io import InputIO, StagedFooIO, StagedBarIO
+from demo.src.io.schemas import FinalFoo, FinalBar, StagedBar, StagedFoo

logger = logging.getLogger(__name__)

@@ -19,7 +20,8 @@ async def main() -> None:
logger.info("Loading data from live sources...")

[bar_df, foo_df] = await asyncio.gather(
StagedBar(source_config=raw_config.get(source_key="STAGED_BAR")).async_read(), StagedFoo(source_config=raw_config.get(source_key="STAGED_FOO")).async_read()
StagedBarIO(resource_definition=transform_input_config.get(source_key="STAGED_BAR"), schema=StagedBar).async_read(),
StagedFooIO(resource_definition=transform_input_config.get(source_key="STAGED_FOO"), schema=StagedFoo).async_read()
)

logger.info("Data successfully loaded from live sources...")
@@ -34,7 +36,7 @@ async def main() -> None:
    # SINK DATA
    logger.info(f"Begin sinking data to staging area: S3:{demo.src.environment.S3_YOUR_OUTPUT_BUCKET}:live/data/raw")
    await asyncio.gather(
-        InputIO(source_config=processed_config.get(source_key="FINAL_FOO"), apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df),
-        InputIO(source_config=processed_config.get(source_key="FINAL_BAR"), apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df),
+        InputIO(resource_definition=transform_output_config.get(source_key="FINAL_FOO"), schema=FinalFoo, apply_schema_validations=True, log_schema_metrics=True).async_write(foo_df),
+        InputIO(resource_definition=transform_output_config.get(source_key="FINAL_BAR"), schema=FinalBar, apply_schema_validations=True, log_schema_metrics=True).async_write(bar_df),
    )
    logger.info("Data staging is complete...")
14 changes: 14 additions & 0 deletions demo/tests/test_pandera_validations.py
@@ -0,0 +1,14 @@
"""Smoke-test the pandera-based Foo schema against the demo fixture."""
import pandas as pd

from demo.src.io.schemas import Foo


def test_pandera_validations():

    # Given
    df = pd.read_csv("demo/tests/data/input/foo.csv")

    # When
    Foo.validate(df)

    # Then
    # Foo.validate raising a SchemaError would fail the test; reaching this point means the fixture conforms.