feat: (airbyte-protocol) add new record message type #102

Open
wants to merge 2 commits into
base: main

@aldogonzalez8 aldogonzalez8 commented Nov 12, 2024

Context

To be competitive with very large data volumes, we have no other option than to add file-based bulk data transfers.

How

Through the CDK, we will leverage the file-based connector's ability to write a file locally instead of sending records, and then emit a new record message carrying metadata about that file.

{
  "type": "RECORD",
  "record": {
    "stream": "simpsons_locations",
    "file": {
      "file_url": "/tmp/airbyte-file-transfer/sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
      "bytes": 182776,
      "file_relative_path": "sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
      "modified": 1728330963000,
      "source_file_url": "//sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv"
    },
    "emitted_at": 1730226374632,
    "data": {}
  }
}
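
For illustration, here is a minimal Python sketch of how a source could assemble a message of this shape using only the standard library. The helper name `build_file_record` and its parameters are assumptions made for this example; they are not part of the CDK or the protocol.

```python
import json
import time
from typing import Any, Dict, Optional


def build_file_record(
    stream: str,
    file_url: str,
    file_relative_path: str,
    source_file_url: str,
    size_bytes: int,
    modified_ms: int,
    emitted_at_ms: Optional[int] = None,
) -> Dict[str, Any]:
    """Build a RECORD message that carries file metadata instead of row data.

    Hypothetical helper: the key names mirror the sample message above.
    """
    if emitted_at_ms is None:
        # Default to the current time in epoch milliseconds.
        emitted_at_ms = int(time.time() * 1000)
    return {
        "type": "RECORD",
        "record": {
            "stream": stream,
            "file": {
                "file_url": file_url,
                "bytes": size_bytes,
                "file_relative_path": file_relative_path,
                "modified": modified_ms,
                "source_file_url": source_file_url,
            },
            "emitted_at": emitted_at_ms,
            # Kept empty here to match the sample message.
            "data": {},
        },
    }


message = build_file_record(
    stream="simpsons_locations",
    file_url="/tmp/airbyte-file-transfer/sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
    file_relative_path="sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
    source_file_url="//sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
    size_bytes=182776,
    modified_ms=1728330963000,
    emitted_at_ms=1730226374632,
)
print(json.dumps(message))
```

The file contents never pass through the message itself; only the local path and metadata do.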

Now we want to formally move this message from the CDK into the Airbyte protocol.

Here, you can find a ref to the documentation of the project.

How would this be used in Python (CDK)?

Initially, this should replace this file, so that instead of importing this we get it from the protocol library in this other import. The final implementation could require more work if we decide to change the key names and structure.

Also, we could change this dataclass to have a new file key rather than treat it as another record class.
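
As a rough sketch of that second option, a single record dataclass could carry an optional file key. The class names `FileMetadata` and `RecordWithOptionalFile` are hypothetical and chosen only for this example; the field names follow the sample message above.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class FileMetadata:
    """Hypothetical container for the keys under "file" in the sample message."""

    file_url: str
    bytes: int
    file_relative_path: str
    modified: int
    source_file_url: str


@dataclass
class RecordWithOptionalFile:
    """Hypothetical record class: one type for both regular and file records."""

    stream: str
    emitted_at: int
    data: Dict[str, Any] = field(default_factory=dict)
    file: Optional[FileMetadata] = None

    @property
    def is_file_transfer(self) -> bool:
        # A record is a file transfer only when the file key is populated.
        return self.file is not None


regular = RecordWithOptionalFile(
    stream="simpsons_locations",
    emitted_at=1730226374632,
    data={"name": "example"},
)
file_record = RecordWithOptionalFile(
    stream="simpsons_locations",
    emitted_at=1730226374632,
    file=FileMetadata(
        file_url="/tmp/airbyte-file-transfer/sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
        bytes=182776,
        file_relative_path="sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
        modified=1728330963000,
        source_file_url="//sftp-testing-for-file-transfer/sftp-folder/simpsons_locations.csv",
    ),
)
```

With this shape, existing code that only reads `stream`, `data`, and `emitted_at` would keep working, and file-aware code could branch on `is_file_transfer`.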

Note:

We agreed to wait a bit here, but it's nice to have a starting point for discussing the future change, so there's no rush on this.

@aldogonzalez8 aldogonzalez8 self-assigned this Nov 12, 2024
@aldogonzalez8 aldogonzalez8 changed the title Airbyte-protocol: add new record message type Feat(airbyte-protocol): add new record message type Nov 12, 2024
@aldogonzalez8 aldogonzalez8 changed the title Feat(airbyte-protocol): add new record message type Feat:(airbyte-protocol): add new record message type Nov 12, 2024
@aldogonzalez8 aldogonzalez8 changed the title Feat:(airbyte-protocol): add new record message type Feat: (airbyte-protocol) add new record message type Nov 12, 2024
@aldogonzalez8 aldogonzalez8 changed the title Feat: (airbyte-protocol) add new record message type feat: (airbyte-protocol) add new record message type Nov 12, 2024
description: "transfer file data"
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
data:
Contributor

The data is not needed

Author

If I recall, we agreed to have an empty data key, but if not really needed, I'm happy to 🪓

Author

Removed.

@maxi297 maxi297 left a comment

Can we document how this will be used in Python so that I understand the full context?

@@ -176,6 +195,30 @@ definitions:
type: array
items:
"$ref": "#/definitions/AirbyteStreamState"
AirbyteFileType:

nit: What is Type referring to in this name? When I hear "type", I think about an enum like failure_type here.

Author

That's fair. I think I can reuse the idea of a descriptor, which makes more sense; then we would have AirbyteFileDescriptor.

Would this be a breaking change considering the current implementation?

Author

@aldogonzalez8 aldogonzalez8 Nov 13, 2024

Would this be a breaking change considering the current implementation?

Do you mean this ref for this comment? No

Introducing the new message to the protocol? It depends; it would replace this message in the CDK.

Renaming keys or changing the expected structure could require changes in the CDK (not a big problem) and in the platform/destination (more problematic). I don't think it would impact connectors, but we may need to coordinate to introduce such changes.

- file
- emitted_at
properties:
namespace:

nit: It feels weird to me to see fields like namespace and stream (which I assume is stream_name) as it feels like the StreamDescriptor concept we already have. Why don't we re-use it?

Author

@aldogonzalez8 aldogonzalez8 Nov 13, 2024

It feels weird to me to see fields like namespace.

On revisiting this, namespace is not needed, so I will remove it. Thanks for making me take a second look at this.

stream (which I assume is stream_name) as it feels like the StreamDescriptor concept we already have. Why don't we re-use it?

Well, we could, but I think it would require us to make changes on the platform (and destination?) side, as the current structure is similar to a record, where stream is a string rather than an object.

File record sample:

{
  "type": "RECORD",
  "record": {
    "stream": "random_data_10",
    "file": {
      "file_url": "/tmp/airbyte-file-transfer/5GB_Files/random_data_10_1.csv",
      "bytes": 996913046,
      "file_relative_path": "5GB_Files//random_data_10_1.csv",
      "modified": 1729720256000,
      "source_file_url": "5GB_Files//random_data_10_1.csv"
    },
    "emitted_at": 1731446522971,
    "data": {}
  }
}

Regular record sample:

{
  "type": "RECORD",
  "record": {
    "stream": "random_data_10",
    "data": {
      "name": "Daniel Johnson",
      "age": 36,
      "state": "California",
      "nationality": "Albania",
      "race": "HoneyDew",
      "_ab_source_file_last_modified": "2024-10-23T15:50:56.000000Z",
      "_ab_source_file_url": "5GB_Files//random_data_10_1.csv"
    },
    "emitted_at": 1731512134588
  }
}

Stream descriptor from state message:

{"stream_descriptor":{"name":"random_data_10"}}

Currently, StreamDescriptor is used in AirbyteStreamState, AirbyteErrorTraceMessage, and AirbyteStreamStatusTraceMessage. Whether we can simply change this or need to coordinate changes depends on platform expectations.
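
To make the difference concrete, here is a small Python sketch of what a consumer would have to handle if both shapes coexisted. Note that a `stream_descriptor` key inside a record is hypothetical here (today records carry `stream` as a string), and the helper name `stream_name` is an assumption for this example only.

```python
from typing import Any, Dict, Optional


def stream_name(record: Dict[str, Any]) -> Optional[str]:
    """Return the stream name from either record shape.

    Handles the current shape ("stream" as a plain string) and a
    hypothetical shape that nests the name under "stream_descriptor".
    """
    descriptor = record.get("stream_descriptor")
    if isinstance(descriptor, dict):
        return descriptor.get("name")
    # Current record shape: the stream is a plain string.
    return record.get("stream")


current_shape = {"stream": "random_data_10", "data": {}, "emitted_at": 1731446522971}
descriptor_shape = {
    "stream_descriptor": {"name": "random_data_10"},
    "data": {},
    "emitted_at": 1731446522971,
}
```

Any platform or destination code that reads `record["stream"]` directly would need a change like this, which is why the switch would need coordination.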

cc @benmoriceau @tryangul

@@ -122,6 +122,25 @@ definitions:
- SOURCE_RETRIEVAL_ERROR
# Errors casting to appropriate type
- DESTINATION_TYPECAST_ERROR
AirbyteFileTransferMessage:

Where is this used? I don't see any reference to this definition

@aldogonzalez8
Author

How would this be used in Python (CDK)?

I added a section on how this would be used in Python (CDK) to the PR description.

3 participants