Skip to content

Commit

Permalink
✨ Source Azure Blob Storage: add client_credentials auth (#50398)
Browse files Browse the repository at this point in the history
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
  • Loading branch information
4 people authored Jan 6, 2025
1 parent 8fbeb7b commit 01363dd
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
"default": ["**"],
"order": 1,
"type": "array",
"items": {
"type": "string"
}
"items": { "type": "string" }
},
"legacy_prefix": {
"title": "Legacy Prefix",
Expand Down Expand Up @@ -136,9 +134,7 @@
"description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
"default": [],
"type": "array",
"items": {
"type": "string"
},
"items": { "type": "string" },
"uniqueItems": true
},
"strings_can_be_null": {
Expand All @@ -162,9 +158,7 @@
"header_definition": {
"title": "CSV Header Definition",
"description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
"default": {
"header_definition_type": "From CSV"
},
"default": { "header_definition_type": "From CSV" },
"oneOf": [
{
"title": "From CSV",
Expand Down Expand Up @@ -206,9 +200,7 @@
"title": "Column Names",
"description": "The column names that will be used while emitting the CSV records",
"type": "array",
"items": {
"type": "string"
}
"items": { "type": "string" }
}
},
"required": ["column_names", "header_definition_type"]
Expand All @@ -221,19 +213,15 @@
"description": "A set of case-sensitive strings that should be interpreted as true values.",
"default": ["y", "yes", "t", "true", "on", "1"],
"type": "array",
"items": {
"type": "string"
},
"items": { "type": "string" },
"uniqueItems": true
},
"false_values": {
"title": "False Values",
"description": "A set of case-sensitive strings that should be interpreted as false values.",
"default": ["n", "no", "f", "false", "off", "0"],
"type": "array",
"items": {
"type": "string"
},
"items": { "type": "string" },
"uniqueItems": true
},
"inference_type": {
Expand Down Expand Up @@ -313,9 +301,7 @@
"processing": {
"title": "Processing",
"description": "Processing configuration",
"default": {
"mode": "local"
},
"default": { "mode": "local" },
"type": "object",
"oneOf": [
{
Expand Down Expand Up @@ -401,6 +387,43 @@
"auth_type"
]
},
{
"title": "Authenticate via Client Credentials",
"type": "object",
"properties": {
"auth_type": {
"title": "Auth Type",
"default": "client_credentials",
"const": "client_credentials",
"enum": ["client_credentials"],
"type": "string"
},
"app_tenant_id": {
"title": "Tenant ID",
"description": "Tenant ID of the Microsoft Azure Application",
"airbyte_secret": true,
"type": "string"
},
"app_client_id": {
"title": "Client ID",
"description": "Client ID of your Microsoft developer application",
"airbyte_secret": true,
"type": "string"
},
"app_client_secret": {
"title": "Client Secret",
"description": "Client Secret of your Microsoft developer application",
"airbyte_secret": true,
"type": "string"
}
},
"required": [
"app_tenant_id",
"app_client_id",
"app_client_secret",
"auth_type"
]
},
{
"title": "Authenticate via Storage Account Key",
"type": "object",
Expand Down Expand Up @@ -485,12 +508,8 @@
"type": "object",
"additionalProperties": false,
"properties": {
"client_id": {
"type": "string"
},
"client_secret": {
"type": "string"
}
"client_id": { "type": "string" },
"client_secret": { "type": "string" }
}
},
"complete_oauth_server_output_specification": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
dockerImageTag: 0.4.4
dockerImageTag: 0.5.0
dockerRepository: airbyte/source-azure-blob-storage
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
githubIssueLabel: source-azure-blob-storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.4.4"
version = "0.5.0"
name = "source-azure-blob-storage"
description = "Source implementation for Azure Blob Storage."
authors = [ "Airbyte <[email protected]>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ class Config(OneOfOptionConfig):
)


class ClientCredentials(BaseModel):
class Config(OneOfOptionConfig):
title = "Authenticate via Client Credentials"
discriminator = "auth_type"

auth_type: Literal["client_credentials"] = Field("client_credentials", const=True)
app_tenant_id: str = Field(title="Tenant ID", description="Tenant ID of the Microsoft Azure Application", airbyte_secret=True)
app_client_id: str = Field(
title="Client ID",
description="Client ID of your Microsoft developer application",
airbyte_secret=True,
)
app_client_secret: str = Field(
title="Client Secret",
description="Client Secret of your Microsoft developer application",
airbyte_secret=True,
)


class StorageAccountKey(BaseModel):
class Config(OneOfOptionConfig):
title = "Authenticate via Storage Account Key"
Expand All @@ -61,7 +80,7 @@ class SourceAzureBlobStorageSpec(AbstractFileBasedSpec):
def documentation_url(cls) -> AnyUrl:
return AnyUrl("https://docs.airbyte.com/integrations/sources/azure-blob-storage", scheme="https")

credentials: Union[Oauth2, StorageAccountKey] = Field(
credentials: Union[Oauth2, ClientCredentials, StorageAccountKey] = Field(
title="Authentication",
description="Credentials for connecting to the Azure Blob Storage",
discriminator="auth_type",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

import logging
from io import IOBase
from typing import Iterable, List, Optional, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pytz
from azure.core.credentials import AccessToken
from azure.core.credentials import AccessToken, TokenCredential
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient, ContainerClient
from smart_open import open
Expand All @@ -19,7 +19,46 @@
from .spec import SourceAzureBlobStorageSpec


class AzureOauth2Authenticator(Oauth2Authenticator):
class AzureClientCredentialsAuthenticator(Oauth2Authenticator, TokenCredential):
def __init__(self, tenant_id: str, client_id: str, client_secret: str, **kwargs):
super().__init__(
token_refresh_endpoint=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
client_id=client_id,
client_secret=client_secret,
grant_type="client_credentials",
scopes=["https://storage.azure.com/.default"],
refresh_token=None,
)

def build_refresh_request_body(self) -> Mapping[str, Any]:
"""
Returns the request body to set on the refresh request
Override to define additional parameters
"""
payload: MutableMapping[str, Any] = {
"grant_type": self.get_grant_type(),
"client_id": self.get_client_id(),
"client_secret": self.get_client_secret(),
}

if self.get_scopes():
payload["scope"] = " ".join(self.get_scopes())

if self.get_refresh_request_body():
for key, val in self.get_refresh_request_body().items():
# We defer to existing oauth constructs over custom configured fields
if key not in payload:
payload[key] = val

return payload

def get_token(self, *args, **kwargs) -> AccessToken:
"""Parent class handles Oauth Refresh token logic."""
return AccessToken(token=self.get_access_token(), expires_on=int(self.get_token_expiry_date().timestamp()))


class AzureOauth2Authenticator(Oauth2Authenticator, TokenCredential):
"""
Authenticator for Azure Blob Storage SDK to align with azure.core.credentials.TokenCredential protocol
"""
Expand Down Expand Up @@ -63,17 +102,24 @@ def azure_blob_service_client(self):
return BlobServiceClient(self.account_url, credential=self._credentials)

@property
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator]:
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator, AzureClientCredentialsAuthenticator]:
if not self._credentials:
if self.config.credentials.auth_type == "storage_account_key":
self._credentials = self.config.credentials.azure_blob_storage_account_key
else:
elif self.config.credentials.auth_type == "oauth2":
self._credentials = AzureOauth2Authenticator(
token_refresh_endpoint=f"https://login.microsoftonline.com/{self.config.credentials.tenant_id}/oauth2/v2.0/token",
client_id=self.config.credentials.client_id,
client_secret=self.config.credentials.client_secret,
refresh_token=self.config.credentials.refresh_token,
)
elif self.config.credentials.auth_type == "client_credentials":
self._credentials = AzureClientCredentialsAuthenticator(
tenant_id=self.config.credentials.app_tenant_id,
client_id=self.config.credentials.app_client_id,
client_secret=self.config.credentials.app_client_secret,
)

return self._credentials

def get_matching_files(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


from azure.core.credentials import AccessToken
from source_azure_blob_storage.stream_reader import AzureOauth2Authenticator
from source_azure_blob_storage.stream_reader import AzureClientCredentialsAuthenticator, AzureOauth2Authenticator


def test_custom_authenticator(requests_mock):
Expand All @@ -24,3 +24,23 @@ def test_custom_authenticator(requests_mock):
new_token = authenticator.get_token()
assert isinstance(new_token, AccessToken)
assert new_token.token == "access_token"


def test_client_authenticator(requests_mock):
authenticator = AzureClientCredentialsAuthenticator(
token_refresh_endpoint="https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token",
tenant_id="tenant_id",
client_id="client_id",
client_secret="client_secret",
)
token_response = {
"token_type": "Bearer",
"scope": "https://storage.azure.com/.default",
"expires_in": 3600,
"ext_expires_in": 3600,
"access_token": "access_token_123",
}
requests_mock.post("https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token", json=token_response)
new_token = authenticator.get_token()
assert isinstance(new_token, AccessToken)
assert new_token.token == "access_token_123"
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,35 @@

import json
import os
from pathlib import Path
from shutil import copytree
from tempfile import TemporaryDirectory
from typing import Any, Mapping

from pytest import fixture
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader
from source_azure_blob_storage.config_migrations import MigrateCredentials, MigrateLegacyConfig

from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor


@fixture
def temp_configs():
config_path = f"{os.path.dirname(__file__)}/test_configs/"
with TemporaryDirectory() as _tempdir:
configs_dir = Path(_tempdir) / "test_configs"
copytree(config_path, configs_dir)
yield configs_dir


# HELPERS
def load_config(config_path: str) -> Mapping[str, Any]:
with open(config_path, "r") as config:
return json.load(config)


def test_legacy_config_migration():
config_path = f"{os.path.dirname(__file__)}/test_configs/test_legacy_config.json"
def test_legacy_config_migration(temp_configs):
config_path = str((Path(temp_configs) / "test_legacy_config.json").resolve())
migration_instance = MigrateLegacyConfig
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
Expand Down Expand Up @@ -47,9 +60,11 @@ def test_legacy_config_migration():
assert test_migrated_config == expected_config


def test_credentials_config_migration():
config_path = f"{os.path.dirname(__file__)}/test_configs/test_config_without_credentials.json"
def test_credentials_config_migration(temp_configs):
config_path = str((Path(temp_configs) / "test_config_without_credentials.json").resolve())
initial_config = load_config(config_path)
expected = initial_config["azure_blob_storage_account_key"]

migration_instance = MigrateCredentials
source = SourceAzureBlobStorage(
SourceAzureBlobStorageStreamReader(),
Expand All @@ -61,4 +76,4 @@ def test_credentials_config_migration():
)
migration_instance.migrate(["check", "--config", config_path], source)
test_migrated_config = load_config(config_path)
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == initial_config["azure_blob_storage_account_key"]
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == expected
Loading

0 comments on commit 01363dd

Please sign in to comment.