Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: New example sync to snowflake Cortex #350

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Sep 1, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a new script for integrating with Snowflake as a destination, demonstrating data sourcing and credential management.
    • Added a specific error class for handling broken pipe scenarios, enhancing error reporting.
  • Bug Fixes

    • Improved error handling in various methods to provide clearer context and diagnostics for failures.
  • Documentation

    • Updated documentation strings for error classes to better reflect their functionality and usage.
  • Chores

    • Enhanced error reporting capabilities by adding attributes to error classes for better context during exceptions.

Copy link

coderabbitai bot commented Sep 1, 2024

Walkthrough

Walkthrough

The changes involve enhancements to error handling across multiple files in the Airbyte framework. Modifications include the introduction of specific exception classes, adjustments to exception propagation, and refinements in how errors are logged and reported. A new example script for integrating with Snowflake as a destination has also been added, demonstrating the setup and configuration processes for data integration.

Changes

Files Change Summary
airbyte/_connector_base.py, airbyte/_executors/base.py Updated error handling logic to improve clarity and context in exceptions. Introduced handling for AirbyteConnectorFailedError, AirbyteSubprocessFailedError, and AirbyteConnectorBrokenPipeError. Consistent use of original_exception for better error reporting.
airbyte/destinations/base.py Modified exception handling in _write_airbyte_message_stream to decouple the original exception from the raised AirbyteConnectorWriteError, potentially affecting error logging and reporting.
airbyte/exceptions.py Enhanced PyAirbyteError and its subclasses with a new original_exception attribute and added AirbyteConnectorBrokenPipeError class. Updated guidance messages and documentation strings for clarity in error handling.
examples/run_snowflake_destination.py Introduced a new script for integrating with Snowflake, detailing the setup of sources, destinations, and handling of sensitive credentials, along with example data writing and reading operations.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant AirbyteConnector
    participant Subprocess
    participant ExceptionHandler

    User->>AirbyteConnector: Initiate check
    AirbyteConnector->>ExceptionHandler: Handle AirbyteConnectorFailedError
    ExceptionHandler->>AirbyteConnector: Log original_exception
    AirbyteConnector->>Subprocess: Execute subprocess
    Subprocess-->>AirbyteConnector: Return exit_code
    AirbyteConnector->>ExceptionHandler: Handle AirbyteSubprocessFailedError
    ExceptionHandler->>AirbyteConnector: Log exit_code and original_exception
Loading

Wdyt?


Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b048edc and ddd699e.

Files selected for processing (1)
  • examples/run_snowflake_destination.py (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • examples/run_snowflake_destination.py

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Outside diff range, codebase verification and nitpick comments (4)
examples/run_snowflake_destination.py (3)

8-34: Consider parameterizing the GCP project name?

The script looks great for demonstrating Snowflake integration! I noticed the GCP project name is hardcoded. For a demo script, this is totally fine, but what do you think about making it a parameter? This could make the script more flexible for different demo scenarios. Wdyt?

If you agree, here's a simple way to do it:

import os

AIRBYTE_INTERNAL_GCP_PROJECT = os.getenv("GCP_PROJECT", "dataline-integration-testing")

This way, you can easily override the project name when running the script, like:

GCP_PROJECT=my-demo-project poetry run python examples/run_snowflake_destination.py

74-87: Streamline the main flow?

I like how you're checking the destinations and source before writing. That's a good practice to show in a demo. However, I noticed there's some commented-out code for writing to the Snowflake destination. For a clear demo flow, what do you think about removing the commented sections and focusing on the main Cortex write operation? This could make the script's purpose more immediately clear to readers. Wdyt?

If you agree, we could simplify this section to something like:

snowflake_destination.check()
cortex_destination.check()
source.check()

cortex_write_result = cortex_destination.write(
    source,
    cache=False,
)

This keeps the focus on the main flow of the demo.


89-92: Clarify the purpose of the commented section?

I noticed there's a commented-out section at the end for reading from the cache and printing stream information. While this could be useful for debugging or extending the demo, it might be a bit confusing in the main script. What do you think about either removing this section or adding a clear comment explaining its purpose and how it could be used? This could help readers understand the full capabilities of the script without cluttering the main flow. Wdyt?

If you decide to keep it, maybe add a comment like:

# Uncomment the following lines to read from the cache and print stream information
# result = source.read(cache)
# for name in ["products"]:
#     print(f"Stream {name}: {len(list(result[name]))} records")

This way, it's clear how this section could be used without distracting from the main demo.

airbyte/_executors/base.py (1)

45-48: Nice addition of specific error handling for BrokenPipeError!

This change improves the robustness of the input pumping process. Would you consider adding a brief comment explaining why we're treating BrokenPipeError differently? It might help future maintainers understand the rationale. Something like:

# BrokenPipeError is expected when the subprocess closes its input stream
# before we finish writing. We handle this gracefully to allow normal termination.

What do you think? This could make the code even more self-documenting. WDYT?

Comment on lines 45 to 70
cache = SnowflakeCache(
account=secret_config["account"],
username=secret_config["username"],
password=secret_config["password"],
database=secret_config["database"],
warehouse=secret_config["warehouse"],
role=secret_config["role"],
)
snowflake_destination = ab.get_destination(
"destination-snowflake",
config={
**snowflake_destination_secret,
"default_schema": "pyairbyte_tests",
},
)
cortex_destination_secret["processing"]["text_fields"] = [
"make",
"model",
"name",
"gender",
]
cortex_destination_secret["indexing"]["default_schema"] = "pyairbyte_tests"
cortex_destination = ab.get_destination(
"destination-snowflake-cortex",
config=cortex_destination_secret,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider focusing on one destination type?

The setup for both Snowflake and Cortex destinations is comprehensive. For a demo script, though, we might want to keep things as simple as possible. What do you think about focusing on just one destination type? Maybe we could have separate example scripts for Snowflake and Cortex? This could make each example more focused and easier to follow. Wdyt?

If you agree, we could either:

  1. Keep just the Snowflake destination and remove the Cortex-specific code.
  2. Keep just the Cortex destination and remove the regular Snowflake destination code.
  3. Create two separate example scripts, one for each destination type.

Comment on lines 91 to 93
if exception_holder.exception and not isinstance(
exception_holder.exception, exc.AirbyteConnectorBrokenPipeError
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great refinement of the error handling logic!

I like how you've improved the control flow by checking for AirbyteConnectorBrokenPipeError. This change enhances the overall error management.

For the conditional inclusion of the original exception, would you consider extracting the condition into a separate variable for improved readability? Something like:

is_reportable_exception = (
    exception_holder.exception
    and not isinstance(exception_holder.exception, exc.AirbyteConnectorBrokenPipeError)
)

raise exc.AirbyteSubprocessFailedError(
    run_args=args,
    exit_code=exit_code,
    original_exception=exception_holder.exception if is_reportable_exception else None,
)

This could make the logic a bit clearer at a glance. What are your thoughts on this suggestion?

Also applies to: 142-148

airbyte/exceptions.py Outdated Show resolved Hide resolved
@aaronsteers
Copy link
Contributor Author

aaronsteers commented Sep 6, 2024

/fix-pr

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.
(This job requires that the PR author has "Allow edits from maintainers" enabled.)

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant