Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (airbyte-protocol) add new record message type #102

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ definitions:
- SOURCE_RETRIEVAL_ERROR
# Errors casting to appropriate type
- DESTINATION_TYPECAST_ERROR
AirbyteFileTransferMessage:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this used? I don't see any reference to this definition

type: object
additionalProperties: true
required:
- stream
- file
- emitted_at
properties:
namespace:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It feels weird to me to see fields like namespace and stream (which I assume is stream_name) as it feels like the StreamDescriptor concept we already have. Why don't we re-use it?

Copy link
Author

@aldogonzalez8 aldogonzalez8 Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels weird to me to see fields like namespace.

Revisiting namespace is not needed; I will remove it; thanks for making me give a second check on this.

stream (which I assume is stream_name) as it feels like the StreamDescriptor concept we already have. Why don't we re-use it?

Well, we could, but I think it would require us to make changes on the platform (and destination?) side, as the current structure is similar to a record, where stream is a string rather than an object.

File record sample:

{
  "type": "RECORD",
  "record": {
    "stream": "random_data_10",
    "file": {
      "file_url": "/tmp/airbyte-file-transfer/5GB_Files/random_data_10_1.csv",
      "bytes": 996913046,
      "file_relative_path": "5GB_Files//random_data_10_1.csv",
      "modified": 1729720256000,
      "source_file_url": "5GB_Files//random_data_10_1.csv"
    },
    "emitted_at": 1731446522971,
    "data": {}
  }
}

Regular record sample

{
  "type": "RECORD",
  "record": {
    "stream": "random_data_10",
    "data": {
      "name": "Daniel Johnson",
      "age": 36,
      "state": "California",
      "nationality": "Albania",
      "race": "HoneyDew",
      "_ab_source_file_last_modified": "2024-10-23T15:50:56.000000Z",
      "_ab_source_file_url": "5GB_Files//random_data_10_1.csv"
    },
    "emitted_at": 1731512134588
  }
}

Stream descriptor from state message:

{"stream_descriptor":{"name":"random_data_10"}

Currently StreamDescriptor is used for AirbyteStreamState, AirbyteErrorTraceMessage, AirbyteStreamStatusTraceMessage. It would depend on platform expectations if we can simply change this or coordinate changes.

cc @benmoriceau @tryangul

description: "namespace the data is associated with"
type: string
stream:
description: "stream the data is associated with"
type: string
file:
"$ref": "#/definitions/AirbyteFileType"
emitted_at:
description: "when the data was emitted from the source. epoch in millisecond."
type: integer
AirbyteStateMessage:
type: object
additionalProperties: true
Expand Down Expand Up @@ -176,6 +195,30 @@ definitions:
type: array
items:
"$ref": "#/definitions/AirbyteStreamState"
AirbyteFileType:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What is Type referring to in this name? When I hear "type", I think about an enum like failure_type here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fair, I think i can reuse the idea of a descriptor which makes more sense, then we can have AirbyteFileDescriptor

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be a breaking change considering the current implementation?

Copy link
Author

@aldogonzalez8 aldogonzalez8 Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be a breaking change considering the current implementation?

Do you mean this ref for this comment? No

Introducing the new message to protocol? It depends, it would be replacing this message in cdk.

Renaming keys or changing expected structure could lead to changes on the cdk (not a big problem) and platform/destination (more problematic) , I think it wouldn't be a change to impact on connectors but may we depend on coordinate to introduce such changes.

type: object
additionalProperties: true
required:
- file_url
- file_relative_path
- modified
- bytes
properties:
file_url:
description: "local path to file"
type: string
file_relative_path:
description: "relative path to file"
type: string
source_file_url:
description: "source file path"
type: string
modified:
description: "file modified date. epoch in millisecond"
type: string
bytes:
description: "file size in bytes"
type: integer
StreamDescriptor:
type: object
additionalProperties: true
Expand Down
Loading