
[FEATURE] I would like the capability to develop using Python classes instead of acons #14

Open
callmesora opened this issue Jun 27, 2024 · 1 comment
Labels: enhancement (New feature or request)

callmesora commented Jun 27, 2024

Is your feature request related to a problem? Please describe.
Acons are harder to debug and to validate on the user's end.
A class-based solution would let us validate configurations (for example with pydantic), give a better developer flow, and allow the use of a debugger.
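
For example, here is a minimal sketch of the kind of up-front validation pydantic would give us (the model and its fields are hypothetical, just mirroring the acon keys):

from pydantic import BaseModel, ValidationError

class InputSpec(BaseModel):
    # Hypothetical model mirroring the acon "input_specs" keys.
    spec_id: str
    read_type: str
    data_format: str
    location: str

try:
    # "location" is missing, so this fails fast with a clear error message,
    # instead of surfacing deep inside a Spark job.
    InputSpec(spec_id="orders_bronze", read_type="streaming", data_format="csv")
except ValidationError as err:
    print(err)
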
Describe the solution you'd like

This is a rough sketch (the classes used here don't make much sense) just to convey the idea, but something along these lines:

def main():
    options = Options(
        badRecordsPath="s3://my-data-product-bucket/badrecords/order_events_with_dq/",
        header=False,
        delimiter="\u005E",
        dateFormat="yyyyMMdd"
    )

    input_spec = InputSpec(
        spec_id="orders_bronze",
        read_type="streaming",
        data_format="csv",
        schema_path="s3://my-data-product-bucket/artefacts/metadata/bronze/schemas/orders.json",
        with_filepath=True,
        options=options,
        location="s3://my-data-product-bucket/bronze/orders/"
    )

    transformers = [
        Transformer(function="with_row_id"),
        Transformer(
            function="with_regex_value",
            args={
                "input_col": "lhe_extraction_filepath",
                "output_col": "extraction_date",
                "drop_input_col": True,
                "regex": ".*WE_SO_SCL_(\\d+).csv"
            }
        )
    ]

    transform_spec = TransformSpec(
        spec_id="orders_bronze_with_extraction_date",
        input_id="orders_bronze",
        transformers=transformers
    )

    dq_functions = [
        DQFunction(dq_function="expect_column_values_to_not_be_null", args={"column": "omnihub_locale_code"}),
        DQFunction(dq_function="expect_column_unique_value_count_to_be_between", args={"column": "product_division", "min_value": 10, "max_value": 100}),
        DQFunction(dq_function="expect_column_max_to_be_between", args={"column": "so_net_value", "min_value": 10, "max_value": 1000}),
        DQFunction(dq_function="expect_column_value_lengths_to_be_between", args={"column": "omnihub_locale_code", "min_value": 1, "max_value": 10}),
        DQFunction(dq_function="expect_column_mean_to_be_between", args={"column": "coupon_code", "min_value": 15, "max_value": 20})
    ]

    dq_spec = DQSpec(
        spec_id="check_orders_bronze_with_extraction_date",
        input_id="orders_bronze_with_extraction_date",
        dq_type="validator",
        result_sink_db_table="my_database.my_table_dq_checks",
        fail_on_error=False,
        dq_functions=dq_functions
    )

    merge_options = MergeOptions(
        merge_predicate="""
            new.sales_order_header = current.sales_order_header
            and new.sales_order_schedule = current.sales_order_schedule
            and new.sales_order_item=current.sales_order_item
            and new.epoch_status=current.epoch_status
            and new.changed_on=current.changed_on
            and new.extraction_date=current.extraction_date
            and new.lhe_batch_id=current.lhe_batch_id
            and new.lhe_row_id=current.lhe_row_id
        """,
        insert_only=True
    )

    output_spec = OutputSpec(
        spec_id="orders_silver",
        input_id="check_orders_bronze_with_extraction_date",
        data_format="delta",
        write_type="merge",
        partitions=["order_date_header"],
        merge_opts=merge_options,
        db_table="my_database.my_table_with_dq",
        location="s3://my-data-product-bucket/silver/order_events_with_dq/",
        with_batch_id=True,
        options={"checkpointLocation": "s3://my-data-product-bucket/checkpoints/order_events_with_dq/"}
    )

    terminate_spec = TerminateSpec(
        function="optimize_dataset",
        args={"db_table": "my_database.my_table_with_dq"}
    )

    exec_env = ExecEnv(spark_databricks_delta_schema_autoMerge_enabled=True)

    engine = LakehouseEngine(
        input_specs=[input_spec],
        transform_specs=[transform_spec],
        dq_specs=[dq_spec],
        output_specs=[output_spec],
        terminate_specs=[terminate_spec],
        exec_env=exec_env
    )

    engine.run()

if __name__ == "__main__":
    main()

As opposed to the fully acon-based version:

# Docs: https://adidas.github.io/lakehouse-engine-docs/lakehouse_engine/engine.html
from lakehouse_engine.engine import load_data

acon = {
  "input_specs": [
    {
      "spec_id": "orders_bronze",
      "read_type": "streaming",
      "data_format": "csv",
      "schema_path": "s3://my-data-product-bucket/artefacts/metadata/bronze/schemas/orders.json",
      "with_filepath": True,
      "options": {
        "badRecordsPath": "s3://my-data-product-bucket/badrecords/order_events_with_dq/",
        "header": False,
        "delimiter": "\u005E",
        "dateFormat": "yyyyMMdd"
      },
      "location": "s3://my-data-product-bucket/bronze/orders/"
    }
  ],
  "transform_specs": [
    {
      "spec_id": "orders_bronze_with_extraction_date",
      "input_id": "orders_bronze",
      "transformers": [
        {
          "function": "with_row_id"
        },
        {
          "function": "with_regex_value",
          "args": {
            "input_col": "lhe_extraction_filepath",
            "output_col": "extraction_date",
            "drop_input_col": True,
            "regex": ".*WE_SO_SCL_(\\d+).csv"
          }
        }
      ]
    }
  ],
  "dq_specs": [
    {
      "spec_id": "check_orders_bronze_with_extraction_date",
      "input_id": "orders_bronze_with_extraction_date",
      "dq_type": "validator",
      "result_sink_db_table": "my_database.my_table_dq_checks",
      "fail_on_error": False,
      "dq_functions": [
        {
          "dq_function": "expect_column_values_to_not_be_null",
          "args": {
            "column": "omnihub_locale_code"
          }
        },
        {
          "dq_function": "expect_column_unique_value_count_to_be_between",
          "args": {
            "column": "product_division",
            "min_value": 10,
            "max_value": 100
          }
        },
        {
          "dq_function": "expect_column_max_to_be_between",
          "args": {
            "column": "so_net_value",
            "min_value": 10,
            "max_value": 1000
          }
        },
        {
          "dq_function": "expect_column_value_lengths_to_be_between",
          "args": {
            "column": "omnihub_locale_code",
            "min_value": 1,
            "max_value": 10
          }
        },
        {
          "dq_function": "expect_column_mean_to_be_between",
          "args": {
            "column": "coupon_code",
            "min_value": 15,
            "max_value": 20
          }
        }
      ]
    }
  ],
  "output_specs": [
    {
      "spec_id": "orders_silver",
      "input_id": "check_orders_bronze_with_extraction_date",
      "data_format": "delta",
      "write_type": "merge",
      "partitions": [
        "order_date_header"
      ],
      "merge_opts": {
        "merge_predicate": """
            new.sales_order_header = current.sales_order_header
            and new.sales_order_schedule = current.sales_order_schedule
            and new.sales_order_item=current.sales_order_item
            and new.epoch_status=current.epoch_status
            and new.changed_on=current.changed_on
            and new.extraction_date=current.extraction_date
            and new.lhe_batch_id=current.lhe_batch_id
            and new.lhe_row_id=current.lhe_row_id
        """,
        "insert_only": True
      },
      "db_table": "my_database.my_table_with_dq",
      "location": "s3://my-data-product-bucket/silver/order_events_with_dq/",
      "with_batch_id": True,
      "options": {
        "checkpointLocation": "s3://my-data-product-bucket/checkpoints/order_events_with_dq/"
      }
    }
  ],
  "terminate_specs": [
    {
      "function": "optimize_dataset",
      "args": {
        "db_table": "my_database.my_table_with_dq"
      }
    }
  ],
  "exec_env": {
    "spark.databricks.delta.schema.autoMerge.enabled": True
  }
}

load_data(acon=acon)

callmesora added the enhancement (New feature or request) label on Jun 27, 2024
jmcorreia (Contributor) commented

Hi @callmesora,

thanks for opening this feature idea and for the sketch/suggestion 💯. This is definitely something that I feel would make a lot of sense, and it has been on my personal backlog for a while, but so far it has not collected enough interest or demand to pursue.
The idea would always be to keep the current approach, but also provide an additional interface along the suggested lines. We actually already have some of the ingredients in place, so the work needed is really just this "interface" layer.
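
As a rough sketch of what that interface layer could look like (everything below is hypothetical, not an existing engine API): typed spec classes that simply serialize back into the acon dict, so load_data(acon=...) keeps working unchanged underneath:

from dataclasses import asdict, dataclass, field
from typing import Any

@dataclass
class InputSpec:
    # Hypothetical class; fields mirror today's acon "input_specs" keys.
    spec_id: str
    read_type: str
    data_format: str
    location: str
    options: dict[str, Any] = field(default_factory=dict)

def to_acon(input_specs: list[InputSpec]) -> dict:
    # Serialize the typed specs into the plain dict the engine already accepts.
    return {"input_specs": [asdict(spec) for spec in input_specs]}

That way the classes are purely a typed front-end and the engine internals stay untouched.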

I will keep this open so that we can collect more interest and ideas until we have enough capacity to take it on. In the meantime, please also feel free to contribute; I can help you out if you face any problems or want to discuss it further.
