S3 to wasb (apache#40511)

---------

Co-authored-by: Jake Roach <[email protected]>
3 people authored Jul 8, 2024
1 parent dc08893 commit 03b1840
Showing 6 changed files with 719 additions and 0 deletions.
4 changes: 4 additions & 0 deletions airflow/providers/microsoft/azure/provider.yaml
@@ -294,6 +294,10 @@ transfers:
target-integration-name: Microsoft Azure Blob Storage
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst
python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
- source-integration-name: Amazon Simple Storage Service (S3)
target-integration-name: Microsoft Azure Blob Storage
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst
python-module: airflow.providers.microsoft.azure.transfers.s3_to_wasb
- source-integration-name: Microsoft Azure Blob Storage
target-integration-name: Google Cloud Storage (GCS)
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/azure_blob_to_gcs.rst
266 changes: 266 additions & 0 deletions airflow/providers/microsoft/azure/transfers/s3_to_wasb.py
@@ -0,0 +1,266 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import tempfile
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

if TYPE_CHECKING:
from airflow.utils.context import Context


# Three custom exceptions raised when the operator is configured incorrectly
class TooManyFilesToMoveException(Exception):
"""Custom exception thrown when attempting to move multiple files from S3 to a single Azure Blob."""

def __init__(self, number_of_files: int):
# Call the parent constructor with a simple message
message: str = f"{number_of_files} files cannot be moved to a single Azure Blob."
super().__init__(message)


class InvalidAzureBlobParameters(Exception):
"""Custom exception raised when neither a blob_prefix or blob_name are passed to the operator."""

def __init__(self):
message: str = "One of blob_name or blob_prefix must be provided."
super().__init__(message)


class InvalidKeyComponents(Exception):
"""Custom exception raised when neither a full_path or file_name + prefix are provided to _create_key."""

def __init__(self):
message = "Either full_path or both prefix and file_name must not be None"
super().__init__(message)


class S3ToAzureBlobStorageOperator(BaseOperator):
"""
Operator to move data from and AWS S3 Bucket to Microsoft Azure Blob Storage.
A similar class exists to move data from Microsoft Azure Blob Storage to an AWS S3 Bucket, and lives in
the airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py file
Either an explicit S3 key can be provided, or a prefix containing the files that are to be transferred to
Azure blob storage. The same holds for a Blob name; an explicit name can be passed, or a Blob prefix can
be provided for the file to be stored to
.. seealso:
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator::SFTPToWasbOperator`
:param aws_conn_id: ID for the AWS S3 connection to use.
:param wasb_conn_id: ID for the Azure Blob Storage connection to use.
:param s3_bucket: The name of the AWS S3 bucket that an object (or objects) would be transferred from.
(templated)
:param container_name: The name of the Azure Storage Blob container an object (or objects) would be
transferred to. (templated)
:param s3_prefix: Prefix string that filters any S3 objects that begin with this prefix. (templated)
:param s3_key: An explicit S3 key (object) to be transferred. (templated)
:param blob_prefix: Prefix string that would provide a path in the Azure Storage Blob container for an
object (or objects) to be moved to. (templated)
:param blob_name: An explicit blob name that an object would be transferred to. This can only be used
if a single file is being moved. If there are multiple files in an S3 bucket that are to be moved
to a single Azure blob, an exception will be raised. (templated)
:param create_container: True if the container should be created if it does not already exist, False
otherwise.
:param replace: If a blob already exists in the container and replace is True, it will be
overwritten. If replace is False and the blob already exists, the file will NOT be
overwritten.
:param s3_verify: Whether or not to verify SSL certificates for S3 connection.
By default, SSL certificates are verified.
You can provide the following values:
- ``False``: do not validate SSL certificates. SSL will still be used
(unless use_ssl is False), but SSL certificates will not be
verified.
- ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
You can specify this argument if you want to use a different
CA cert bundle than the one used by botocore.
:param s3_extra_args: kwargs to pass to S3Hook.
:param wasb_extra_args: kwargs to pass to WasbHook.
"""

template_fields: Sequence[str] = (
"s3_bucket",
"container_name",
"s3_prefix",
"s3_key",
"blob_prefix",
"blob_name",
)

def __init__(
self,
*,
aws_conn_id: str = "aws_default",
wasb_conn_id: str = "wasb_default",
s3_bucket: str,
container_name: str,
s3_prefix: str | None = None, # Only use this to pull an entire directory of files
s3_key: str | None = None, # Only use this to pull a single file
blob_prefix: str | None = None,
blob_name: str | None = None,
create_container: bool = False,
replace: bool = False,
s3_verify: bool = False,
s3_extra_args: dict | None = None,
wasb_extra_args: dict | None = None,
**kwargs,
):
# Call to constructor of the inherited BaseOperator class
super().__init__(**kwargs)

self.aws_conn_id = aws_conn_id
self.wasb_conn_id = wasb_conn_id
self.s3_bucket = s3_bucket
self.container_name = container_name
self.s3_prefix = s3_prefix
self.s3_key = s3_key
self.blob_prefix = blob_prefix
self.blob_name = blob_name
self.create_container = create_container
self.replace = replace
self.s3_verify = s3_verify
self.s3_extra_args = s3_extra_args or {}
self.wasb_extra_args = wasb_extra_args or {}

# These cached properties create each hook lazily on first access and reuse the same instance
# across methods, rather than instantiating the hooks in the constructor
@cached_property
def s3_hook(self) -> S3Hook:
"""Create and return an S3Hook."""
return S3Hook(aws_conn_id=self.aws_conn_id, verify=self.s3_verify, **self.s3_extra_args)

@cached_property
def wasb_hook(self) -> WasbHook:
"""Create and return a WasbHook."""
return WasbHook(wasb_conn_id=self.wasb_conn_id, **self.wasb_extra_args)

def execute(self, context: Context) -> list[str]:
"""Execute logic below when operator is executed as a task."""
self.log.info(
"Getting %s from %s" if self.s3_key else "Getting all files start with %s from %s",
self.s3_key if self.s3_key else self.s3_prefix,
self.s3_bucket,
)

# Pull a list of files to move from S3 to Azure Blob storage
files_to_move: list[str] = self.get_files_to_move()

# Check to see if there are indeed files to move. If so, move each of these files. Otherwise, output
# a logging message that denotes there are no files to move
if files_to_move:
for file_name in files_to_move:
self.move_file(file_name)

# Log a summary once the last file has been moved from S3 to Azure Blob
self.log.info("All done, uploaded %s files to Azure Blob.", len(files_to_move))

else:
# If there are no files to move, a message will be logged. May want to consider alternative
# functionality (should an exception instead be raised?)
self.log.info("There are no files to move!")

# Return a list of the files that were moved
return files_to_move

def get_files_to_move(self) -> list[str]:
"""Determine the list of files that need to be moved, and return the name."""
if self.s3_key:
# Only pull the file name from the s3_key, drop the rest of the key
files_to_move: list[str] = [self.s3_key.split("/")[-1]]
else:
# Pull the keys from the s3_bucket using the provided prefix. Remove the prefix from the file
# name, and add to the list of files to move
s3_keys: list[str] = self.s3_hook.list_keys(bucket_name=self.s3_bucket, prefix=self.s3_prefix)
files_to_move = [s3_key.replace(f"{self.s3_prefix}/", "", 1) for s3_key in s3_keys]

# Now, make sure that there are not too many files to move to a single Azure blob
if self.blob_name and len(files_to_move) > 1:
raise TooManyFilesToMoveException(len(files_to_move))

if not self.replace:
# Only grab the files from S3 that are not in Azure Blob already. This will prevent any files that
# exist in both S3 and Azure Blob from being overwritten. If a blob_name is provided, check to
# see if that blob exists
azure_blob_files: list[str] = []

if self.blob_name:
# If the singular blob (stored at self.blob_name) exists, add it to azure_blob_files so it
# can be removed from the list of files to move
if self.wasb_hook.check_for_blob(self.container_name, self.blob_name):
azure_blob_files.append(self.blob_name.split("/")[-1])

elif self.blob_prefix:
azure_blob_files += self.wasb_hook.get_blobs_list_recursive(
container_name=self.container_name, prefix=self.blob_prefix
)
else:
raise InvalidAzureBlobParameters

# Trim files_to_move so that files already present in the Azure Blob container are not
# moved again (and therefore not overwritten)
files_to_move = list(set(files_to_move) - set(azure_blob_files))
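# Illustrative example (hypothetical names): with s3_prefix="raw/2024" and S3 keys
# ["raw/2024/a.csv", "raw/2024/b.csv"], files_to_move starts as ["a.csv", "b.csv"];
# any names also present in azure_blob_files are dropped by the set difference above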

return files_to_move

def move_file(self, file_name: str) -> None:
"""Move file from S3 to Azure Blob storage."""
with tempfile.NamedTemporaryFile("w") as temp_file:
# If using an s3_key, the only file in the files_to_move list will be the name pulled
# from the s3_key. It's not verbose, but it keeps the implementation consistent across
# the operator
source_s3_key: str = self._create_key(self.s3_key, self.s3_prefix, file_name)

# Retrieve the S3 client itself, rather than going through the hook's helpers, and
# download the file to temp_file.name
s3_client = self.s3_hook.get_conn()
s3_client.download_file(self.s3_bucket, source_s3_key, temp_file.name)

# Load the file to Azure Blob using either the blob_name that was passed in, or the name
# from the list of files present under the s3_prefix, combined with the blob_prefix. A
# caller may pass in only an S3 key, in which case the blob name is derived from that key
destination_azure_blob_name: str = self._create_key(self.blob_name, self.blob_prefix, file_name)
self.wasb_hook.load_file(
file_path=temp_file.name,
container_name=self.container_name,
blob_name=destination_azure_blob_name,
create_container=self.create_container,
**self.wasb_extra_args,
)

@staticmethod
def _create_key(full_path: str | None, prefix: str | None, file_name: str | None):
"""Return a file key using its components."""
if full_path:
return full_path
elif prefix and file_name:
return f"{prefix}/{file_name}"
else:
raise InvalidKeyComponents
62 changes: 62 additions & 0 deletions docs/apache-airflow-providers-microsoft-azure/transfer/s3_to_wasb.rst
@@ -0,0 +1,62 @@

.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
=================================================
Amazon S3 to Azure Blob Storage Transfer Operator
=================================================

The Blob service stores text and binary data as objects in the cloud.
The Blob service offers the following three resources: the storage account, containers, and blobs.
Within your storage account, containers provide a way to organize sets of blobs.
For more information about the service, visit the `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.
This page shows how to transfer data from Amazon Simple Storage Service (S3) to Azure Blob Storage.

Use the ``S3ToAzureBlobStorageOperator`` transfer to copy data from Amazon Simple Storage Service (S3) to Azure Blob Storage.

Prerequisite Tasks
------------------

.. include:: ../operators/_partials/prerequisite_tasks.rst

Operators
---------

.. _howto/operator:S3ToWasbOperator:


Transfer Data from Amazon S3 to Blob Storage
============================================

To copy data from an Amazon S3 bucket to an Azure Blob Storage container, the following operator can be used:
:class:`~airflow.providers.microsoft.azure.transfers.s3_to_wasb.S3ToAzureBlobStorageOperator`

Example usage:

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_s3_to_wasb.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_s3_to_wasb]
:end-before: [END howto_transfer_s3_to_wasb]
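
A minimal standalone sketch of the same usage follows; the connection IDs, bucket, container, and
prefix names below are hypothetical placeholders:

.. code-block:: python

    from airflow.providers.microsoft.azure.transfers.s3_to_wasb import (
        S3ToAzureBlobStorageOperator,
    )

    transfer_s3_to_wasb = S3ToAzureBlobStorageOperator(
        task_id="transfer_s3_to_wasb",
        aws_conn_id="aws_default",  # assumed AWS connection ID
        wasb_conn_id="wasb_default",  # assumed Azure Blob Storage connection ID
        s3_bucket="example-s3-bucket",  # hypothetical bucket
        container_name="example-container",  # hypothetical container
        s3_prefix="raw/2024",  # copy every object under this prefix
        blob_prefix="landing/2024",  # blobs are written under this prefix
        create_container=True,
        replace=False,
    )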

Reference
---------

For further information, please refer to the following links:

* `AWS boto3 library documentation for Amazon S3 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html>`__
* `Azure Blob Storage client library <https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python>`__
1 change: 1 addition & 0 deletions generated/provider_dependencies.json
@@ -795,6 +795,7 @@
],
"plugins": [],
"cross-providers-deps": [
"amazon",
"google",
"oracle",
"sftp"