Skip to content

Commit

Permalink
Source SalesForce: Add Stream Slice Step option to specification (#35421
Browse files Browse the repository at this point in the history
)

Signed-off-by: Artem Inzhyyants <[email protected]>
  • Loading branch information
artem1205 authored and xiaohansong committed Feb 27, 2024
1 parent 637dd11 commit 0899722
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ acceptance_tests:
future_state:
future_state_path: "integration_tests/future_state.json"
timeout_seconds: 7200
# Skip the comprehensive incremental tests: the incremental filter uses a greater-than-or-equal (>=) condition, so the last record of any stream state is read again (duplicated) on the next sync.
skip_comprehensive_incremental_tests: true
full_refresh:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph4SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv2QAE", "ProfileId": "00e4W000002LjMoQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320258}
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph5SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv3QAE", "ProfileId": "00e4W000002LjMqQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320258}
{"stream": "ActiveProfileMetric", "data": {"Id": "5H04W00000U3Ph6SAF", "MetricsDate": "2023-10-22", "UserLicenseId": "1004W000001gXv4QAE", "ProfileId": "00e4W000002LjMrQAK", "SystemModstamp": "2023-10-22T05:59:12.000Z", "AssignedUserCount": 0, "ActiveUserCount": 0}, "emitted_at": 1698150320259}
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIZQAY", "Label": "Sales", "MasterLabel": "salesforce", "NamespacePrefix": "standard", "DeveloperName": "Sales", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "The world's most popular sales force automation (SFA) solution", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": false, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785550}
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIdQAI", "Label": "Service", "MasterLabel": "supportforce", "NamespacePrefix": "standard", "DeveloperName": "Service", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "Manage customer service with accounts, contacts, cases, and more", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": true, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785551}
{"stream": "AppDefinition", "data": {"Id": "000000000000000AAA", "DurableId": "06m4W000001ldIeQAI", "Label": "Marketing", "MasterLabel": "Marketing", "NamespacePrefix": "standard", "DeveloperName": "Marketing", "LogoUrl": "/img/salesforce-noname-logo-v2.svg", "Description": "Best-in-class on-demand marketing automation", "UiType": "Aloha", "NavType": "Standard", "UtilityBar": null, "HeaderColor": "#0070D2", "IsOverrideOrgTheme": false, "IsSmallFormFactorSupported": false, "IsMediumFormFactorSupported": false, "IsLargeFormFactorSupported": true, "IsNavPersonalizationDisabled": false, "IsNavAutoTempTabsDisabled": false, "IsNavTabPersistenceDisabled": false}, "emitted_at": 1697452785552}
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIZQAY","Label":"Sales","MasterLabel":"salesforce","NamespacePrefix":"standard","DeveloperName":"Sales","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"The world's most popular sales force automation (SFA) solution","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":false,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402368}
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIdQAI","Label":"Service","MasterLabel":"supportforce","NamespacePrefix":"standard","DeveloperName":"Service","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"Manage customer service with accounts, contacts, cases, and more","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":true,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402369}
{"stream":"AppDefinition","data":{"Id":"000000000000000AAA","DurableId":"06m4W000001ldIeQAI","Label":"Marketing CRM Classic","MasterLabel":"Marketing","NamespacePrefix":"standard","DeveloperName":"Marketing","LogoUrl":"/img/salesforce-noname-logo-v2.svg","Description":"Track sales and marketing efforts with CRM objects.","UiType":"Aloha","NavType":"Standard","UtilityBar":null,"HeaderColor":"#0070D2","IsOverrideOrgTheme":false,"IsSmallFormFactorSupported":false,"IsMediumFormFactorSupported":false,"IsLargeFormFactorSupported":true,"IsNavPersonalizationDisabled":false,"IsNavAutoTempTabsDisabled":false,"IsNavTabPersistenceDisabled":false},"emitted_at":1708425402369}
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJspQAF"}, "Id": "02i4W00000EkJspQAF", "ContactId": null, "AccountId": "0014W00002DkoWNQAZ", "ParentId": null, "RootAssetId": "02i4W00000EkJspQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Radish - Black, Winter, Organic", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787097}
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJsqQAF"}, "Id": "02i4W00000EkJsqQAF", "ContactId": null, "AccountId": "0014W00002DkoW0QAJ", "ParentId": null, "RootAssetId": "02i4W00000EkJsqQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Cheese - Valancey", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787099}
{"stream": "Asset", "data": {"attributes": {"type": "Asset", "url": "/services/data/v57.0/sobjects/Asset/02i4W00000EkJsrQAF"}, "Id": "02i4W00000EkJsrQAF", "ContactId": null, "AccountId": "0014W00002DkoW5QAJ", "ParentId": null, "RootAssetId": "02i4W00000EkJsrQAF", "Product2Id": null, "ProductCode": null, "IsCompetitorProduct": false, "CreatedDate": "2021-01-18T21:44:57.000+0000", "CreatedById": "0054W00000BZkk0QAD", "LastModifiedDate": "2021-01-18T21:44:57.000+0000", "LastModifiedById": "0054W00000BZkk0QAD", "SystemModstamp": "2021-01-18T21:44:57.000+0000", "IsDeleted": false, "Name": "Truffle Cups Green", "SerialNumber": null, "InstallDate": null, "PurchaseDate": null, "UsageEndDate": null, "LifecycleStartDate": null, "LifecycleEndDate": null, "Status": null, "Price": null, "Quantity": null, "Description": null, "OwnerId": "0054W00000BZkk0QAD", "AssetProvidedById": null, "AssetServicedById": null, "IsInternal": false, "AssetLevel": 1, "StockKeepingUnit": null, "HasLifecycleManagement": false, "CurrentMrr": null, "CurrentLifecycleEndDate": null, "CurrentQuantity": null, "CurrentAmount": null, "TotalLifecycleAmount": null, "Street": null, "City": null, "State": null, "PostalCode": null, "Country": null, "Latitude": null, "Longitude": null, "GeocodeAccuracy": null, "Address": null, "LastViewedDate": null, "LastReferencedDate": null}, "emitted_at": 1697452787100}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ data:
hosts:
- "*.salesforce.com"
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.3.1
dockerImageTag: 2.3.2
dockerRepository: airbyte/source-salesforce
documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce
githubIssueLabel: source-salesforce
Expand Down
10 changes: 5 additions & 5 deletions airbyte-integrations/connectors/source-salesforce/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.3.1"
version = "2.3.2"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -18,7 +18,7 @@ include = "source_salesforce"
[tool.poetry.dependencies]
python = "^3.9,<3.12"
pandas = "==2.2.0"
airbyte-cdk = "==0.59.2"
airbyte-cdk = "^0.63.2"

[tool.poetry.scripts]
source-salesforce = "source_salesforce.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from airbyte_protocol.models import FailureType
from dateutil.relativedelta import relativedelta
from pendulum.parsing.exceptions import ParserError
from requests import codes, exceptions # type: ignore[import]

from .api import PARENT_SALESFORCE_OBJECTS, UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS, UNSUPPORTED_FILTERING_STREAMS, Salesforce
Expand All @@ -49,7 +51,7 @@ class SourceSalesforce(ConcurrentSourceAdapter):
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
START_DATE_OFFSET_IN_YEARS = 2
MAX_WORKERS = 5

stop_sync_on_stream_failure = True
message_repository = InMemoryMessageRepository(Level(AirbyteLogFormatter.level_mapping[logger.level]))

def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[TState], **kwargs):
Expand All @@ -71,7 +73,24 @@ def _get_sf_object(config: Mapping[str, Any]) -> Salesforce:
sf.login()
return sf

@staticmethod
def _validate_stream_slice_step(stream_slice_step: Optional[str]) -> None:
    """Validate the optional ``stream_slice_step`` config value.

    A falsy value (``None`` or an empty string) is treated as "not configured"
    and accepted without further checks. Any other value must be parsed by
    ``pendulum.parse`` into a ``pendulum.Duration`` of at least one second.

    :param stream_slice_step: raw value of the ``stream_slice_step`` config option.
    :raises AirbyteTracedException: with ``FailureType.config_error`` when the value
        cannot be parsed, does not parse to a duration, or is shorter than one second.
    """
    if not stream_slice_step:
        return

    internal_message = "Incorrect stream slice step"

    # Only the parse call can raise ParserError, so keep the try body minimal.
    try:
        duration = pendulum.parse(stream_slice_step)
    except ParserError as e:
        # Chain the parser failure explicitly so the root cause stays in the traceback.
        raise AirbyteTracedException(
            failure_type=FailureType.config_error, internal_message=internal_message, message=e.args[0]
        ) from e

    if not isinstance(duration, pendulum.Duration):
        # The value parsed successfully, but into something other than a duration.
        message = "Stream slice step Interval should be provided in ISO 8601 format."
    elif duration < pendulum.Duration(seconds=1):
        message = "Stream slice step Interval is too small. It should be no less than 1 second. Please set higher value and try again."
    else:
        return

    raise AirbyteTracedException(failure_type=FailureType.config_error, internal_message=internal_message, message=message)

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[str]]:
self._validate_stream_slice_step(config.get("stream_slice_step"))
try:
salesforce = self._get_sf_object(config)
salesforce.describe()
Expand Down Expand Up @@ -147,6 +166,7 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec
if replication_key and stream_name not in UNSUPPORTED_FILTERING_STREAMS:
stream_class = incremental
stream_kwargs["replication_key"] = replication_key
stream_kwargs["stream_slice_step"] = config.get("stream_slice_step", "P30D")
else:
stream_class = full_refresh

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,21 @@ connectionSpecification:
description: Toggle to use Bulk API (this might cause empty fields for some streams)
default: false
order: 6
stream_slice_step:
title: Stream Slice Step for Incremental sync
type: string
description: The size of the time window (ISO8601 duration) to slice requests.
default: P30D
order: 7
examples:
- PT12H
- P7D
- P30D
- P1M
- P1Y
streams_criteria:
type: array
order: 7
order: 8
items:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import uuid
from abc import ABC
from contextlib import closing
from datetime import datetime, timedelta
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union

import pandas as pd
Expand Down Expand Up @@ -112,7 +111,7 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
return f"After {self.max_retries} retries the connector has failed with a network error. It looks like Salesforce API experienced temporary instability, please try again later."
return super().get_error_display_message(exception)

def get_start_date_from_state(self, stream_state: Mapping[str, Any] = None) -> datetime:
def get_start_date_from_state(self, stream_state: Mapping[str, Any] = None) -> pendulum.DateTime:
if self.state_converter.is_state_message_compatible(stream_state):
# stream_state is in the concurrent format
if stream_state.get("slices", []):
Expand Down Expand Up @@ -689,28 +688,34 @@ def transform_empty_string_to_none(instance: Any, schema: Any):

class IncrementalRestSalesforceStream(RestSalesforceStream, ABC):
state_checkpoint_interval = 500
STREAM_SLICE_STEP = 30
_slice = None

def __init__(self, replication_key: str, **kwargs):
def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwargs):
    """Set up the incremental stream.

    :param replication_key: stored on the instance as ``replication_key``.
    :param stream_slice_step: slice step kept as a raw, unparsed string in
        ``_stream_slice_step`` (defaults to ``"P30D"``).
    :param kwargs: forwarded unchanged to the parent constructor.
    """
    # Parent initialisation runs first, exactly as before; the two attributes follow in the same order.
    super().__init__(**kwargs)
    self.replication_key, self._stream_slice_step = replication_key, stream_slice_step

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
start, end = (None, None)
now = pendulum.now(tz="UTC")
assert LOOKBACK_SECONDS is not None and LOOKBACK_SECONDS >= 0
initial_date = self.get_start_date_from_state(stream_state) - timedelta(seconds=LOOKBACK_SECONDS)

slice_number = 1
while not end == now:
start = initial_date.add(days=(slice_number - 1) * self.STREAM_SLICE_STEP)
end = min(now, initial_date.add(days=slice_number * self.STREAM_SLICE_STEP))
self._slice = {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
yield {"start_date": start.isoformat(timespec="milliseconds"), "end_date": end.isoformat(timespec="milliseconds")}
slice_number = slice_number + 1

initial_date = self.get_start_date_from_state(stream_state) - pendulum.Duration(seconds=LOOKBACK_SECONDS)
slice_start = initial_date
while slice_start < now:
slice_end = slice_start + self.stream_slice_step
self._slice = {
"start_date": slice_start.isoformat(timespec="milliseconds"),
"end_date": min(slice_end, now).isoformat(timespec="milliseconds"),
}
yield self._slice

slice_start += self.stream_slice_step

@property
def stream_slice_step(self) -> pendulum.Duration:
    """Return the stored slice step string parsed with ``pendulum.parse``.

    The raw string is parsed again on every access; nothing is cached.
    """
    raw_step = self._stream_slice_step
    return pendulum.parse(raw_step)

def request_params(
self,
Expand Down
Loading

0 comments on commit 0899722

Please sign in to comment.