-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[source-amplitude] - Migrate to manifest-only #51601
Changes from 26 commits
629a6fa
a9814e7
c1c6df4
ef971e5
5d65756
5b7ed37
370bc60
ce974f9
e692e84
9eab2a2
9e1b952
cdd165e
093ab6e
5a4b301
363e66f
43081c0
19be117
78910cd
99b223c
9b5e5c6
0e31378
c294da4
f6db2b6
63c6074
eccb6f7
06a33bf
45100a9
0489051
37cb3e6
b097d59
f591a66
14d3bcb
afb747a
619a179
87273ac
48e2235
20bb3b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# | ||
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import logging | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, List, Mapping, MutableMapping, Optional | ||
|
||
import pendulum | ||
import requests | ||
|
||
from airbyte_cdk.models import FailureType | ||
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor | ||
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration | ||
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader | ||
from airbyte_cdk.sources.declarative.transformations import RecordTransformation | ||
from airbyte_cdk.sources.declarative.types import Config, Record | ||
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState | ||
from airbyte_cdk.utils import AirbyteTracedException | ||
|
||
|
||
logger = logging.getLogger("airbyte") | ||
|
||
|
||
class AverageSessionLengthRecordExtractor(RecordExtractor):
    """
    Flatten the Amplitude "average session length" response into one record per date.

    The response body is expected to hold a "data" object with two parallel
    arrays: "xValues" (the dates) and "series". According to the Amplitude
    documentation, "series" is an array with a single element which is itself
    the array of average session lengths, one per date. A declarative field
    path cannot pair those two arrays up, hence this custom extractor.

    Docs: https://developers.amplitude.com/docs/dashboard-rest-api#returns-2
    Issue: https://github.com/airbytehq/airbyte/issues/23145
    """

    def extract_records(self, response: requests.Response) -> List[Record]:
        """
        Build ``{"date": ..., "length": ...}`` records from the response.

        Returns an empty list when the response has no (or empty) "data".
        """
        payload = response.json().get("data", [])
        if not payload:
            return []

        lengths = payload.get("series", [])
        if len(lengths) > 0:
            # Unwrap the single nested list that carries the per-day values.
            lengths = lengths[0]

        records = []
        for day, session_length in zip(payload["xValues"], lengths):
            records.append({"date": day, "length": session_length})
        return records
|
||
|
||
class ActiveUsersRecordExtractor(RecordExtractor):
    """
    Flatten the Amplitude "active users" response into one record per date.

    The "data" object of the response is read as three parallel structures:
    "xValues" (the dates), "seriesLabels" and "series". "series" is transposed
    so that each date is paired with one value per label, and those values are
    keyed by their label under "statistics". A declarative field path cannot
    express that transposition, hence this custom extractor.

    Issue: https://github.com/airbytehq/airbyte/issues/23145
    """

    def extract_records(self, response: requests.Response) -> List[Record]:
        """
        Build ``{"date": ..., "statistics": {label: value, ...}}`` records.

        Returns an empty list when "data" is missing/empty or when the
        transposed "series" is empty.
        """
        payload = response.json().get("data", [])
        if not payload:
            return []

        # Transpose: one tuple per date, holding one value per series label.
        per_date_values = list(zip(*payload["series"]))
        if not per_date_values:
            return []

        records = []
        for day, values in zip(payload["xValues"], per_date_values):
            statistics = dict(zip(payload["seriesLabels"], values))
            records.append({"date": day, "statistics": statistics})
        return records
|
||
|
||
class TransformDatetimesToRFC3339(RecordTransformation): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit, potential improvement for a follow-up PR: I think this is already possible to do in the Builder, no? Do we not support date transforms in Jinja? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Per our conversation, I'll make this a follow-up task so we can remove the custom component. |
||
def __init__(self):
    # Record keys whose values `transform` rewrites as RFC3339 strings.
    self.date_time_fields = [
        "event_time",
        "server_upload_time",
        "processed_time",
        "server_received_time",
        "user_creation_time",
        "client_upload_time",
        "client_event_time",
    ]
    # NOTE(review): presumably the stream this transformation targets — confirm.
    self.name = "events"
|
||
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> Dict[str, Any]:
    """
    Transform 'date-time' items to RFC3339 format.

    Every key listed in ``self.date_time_fields`` that is present in
    ``record`` with a truthy value is parsed with ``pendulum.parse`` and
    replaced by its RFC3339 string. Missing or empty values are left alone.

    :param record: the record to update; it is modified in place.
    :param config: unused here.
    :param stream_state: unused here.
    :param stream_slice: unused here.
    :return: the same ``record`` object, after the in-place update.
    :raises AirbyteTracedException: when a value cannot be parsed or
        formatted; the original exception is chained as the cause.
    """
    # Walk the short list of configured fields instead of every record key.
    for field in self.date_time_fields:
        value = record.get(field)
        if not value:
            continue
        try:
            record[field] = pendulum.parse(value).to_rfc3339_string()
        except Exception as e:
            logger.error(f"Error converting {field} to RFC3339 format: {e}")
            raise AirbyteTracedException(
                message=f"Error converting {field} to RFC3339 format. See logs for more information",
                internal_message=f"Error converting {field} to RFC3339 format: {e}",
                failure_type=FailureType.system_error,
            ) from e
    return record
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import json | ||
|
@@ -11,6 +11,7 @@ | |
|
||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.declarative.types import StreamSlice | ||
from airbyte_cdk.test.catalog_builder import CatalogBuilder | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
|
@@ -21,7 +22,13 @@ def config(): | |
|
||
@pytest.fixture(scope="module") | ||
def streams(config): | ||
return SourceAmplitude().streams(config=config) | ||
catalog = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Out of curiosity, how does that work? Why is this needed? How are they usually configured? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Updated -- I accidentally pushed before completing. |
||
CatalogBuilder() | ||
.with_stream("annotations_stream", sync_mode=SyncMode.full_refresh) | ||
.with_stream("cohorts_stream", sync_mode=SyncMode.full_refresh) | ||
.build() | ||
) | ||
return SourceAmplitude(catalog=catalog, config=config, state={}).streams(config=config) | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the docs link to the same issue, I'd love to have longer, more descriptive docstrings that explain what the custom component does and why we have to use it.
I know this might not be related to the migration itself as you're just moving them over.