Skip to content

Commit

Permalink
[destination-azure-blob-storage] File Extensions added for the output…
Browse files Browse the repository at this point in the history
… files #27701 (#38061)

Co-authored-by: richa-rochna <[email protected]>
Co-authored-by: Richa Rochna <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
4 people authored Jun 14, 2024
1 parent ff31275 commit 3c3a80a
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: b4c5d105-31fd-4817-96b6-cb923bfc04cb
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/destination-azure-blob-storage
githubIssueLabel: destination-azure-blob-storage
icon: azureblobstorage.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,18 @@ protected void startTracked() throws Exception {

for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {

final String blobName = configuredStream.getStream().getName() + "/" +
getOutputFilename(new Timestamp(System.currentTimeMillis()));
StringBuilder blobNameSb = new StringBuilder()
.append(configuredStream.getStream().getName())
.append("/")
.append(getOutputFilename(new Timestamp(System.currentTimeMillis())));

if (azureBlobStorageDestinationConfig.getFormatConfig().isFileExtensionRequired()) {
blobNameSb
.append(".")
.append(azureBlobStorageDestinationConfig.getFormatConfig().getFormat().getFileExtension());
}
String blobName = blobNameSb.toString();

final AppendBlobClient appendBlobClient = specializedBlobClientBuilder
.blobName(blobName)
.buildAppendBlobClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

public interface AzureBlobStorageFormatConfig {

boolean isFileExtensionRequired();

AzureBlobStorageFormat getFormat();

static String withDefault(final JsonNode config, final String property, final String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static AzureBlobStorageFormatConfig getAzureBlobStorageFormatConfig(final
return new AzureBlobStorageCsvFormatConfig(formatConfig);
}
case JSONL -> {
return new AzureBlobStorageJsonlFormatConfig();
return new AzureBlobStorageJsonlFormatConfig(formatConfig);
}
default -> throw new RuntimeException("Unexpected output format: " + Jsons.serialize(config));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ public String getValue() {
}

private final Flattening flattening;
private final boolean fileExtensionRequired;

public AzureBlobStorageCsvFormatConfig(final JsonNode formatConfig) {
this.flattening = Flattening.fromValue(formatConfig.get("flattening").asText());
this.fileExtensionRequired = formatConfig.has("file_extension") ? formatConfig.get("file_extension").asBoolean() : false;
}

public boolean isFileExtensionRequired() {
return fileExtensionRequired;
}

@Override
Expand All @@ -58,6 +64,7 @@ public Flattening getFlattening() {
public String toString() {
return "AzureBlobStorageCsvFormatConfig{" +
"flattening=" + flattening +
", fileExtensionRequired=" + fileExtensionRequired +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,33 @@

package io.airbyte.integrations.destination.azure_blob_storage.jsonl;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormat;
import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageFormatConfig;

public class AzureBlobStorageJsonlFormatConfig implements AzureBlobStorageFormatConfig {

private final boolean fileExtensionRequired;

public AzureBlobStorageJsonlFormatConfig(final JsonNode formatConfig) {
this.fileExtensionRequired = formatConfig.has("file_extension") ? formatConfig.get("file_extension").asBoolean() : false;
}

@Override
public boolean isFileExtensionRequired() {
return fileExtensionRequired;
}

@Override
public AzureBlobStorageFormat getFormat() {
return AzureBlobStorageFormat.JSONL;
}

@Override
public String toString() {
return "AzureBlobStorageJsonlFormatConfig{" +
"fileExtensionRequired=" + fileExtensionRequired +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@
"description": "Whether the input json data should be normalized (flattened) in the output CSV. Please refer to docs for details.",
"default": "No flattening",
"enum": ["No flattening", "Root level flattening"]
},
"file_extension": {
"title": "File Extension",
"type": "boolean",
"default": false,
"description": "Add file extensions to the output file."
}
}
},
Expand All @@ -88,6 +94,12 @@
"format_type": {
"type": "string",
"const": "JSONL"
},
"file_extension": {
"title": "File Extension",
"type": "boolean",
"default": false,
"description": "Add file extensions to the output file."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.azure_blob_storage.jsonl.AzureBlobStorageJsonlFormatConfig;
import io.airbyte.integrations.destination.azure_blob_storage.writer.ProductionWriterFactory;
Expand Down Expand Up @@ -43,6 +46,8 @@ public class AzureBlobStorageSpillTest {

private BlobContainerClient blobContainerClient;

private static final ObjectMapper mapper = MoreMappers.initMapper();

@BeforeEach
void setup() {
azureBlobStorageContainer = new AzureBlobStorageContainer().withExposedPorts(10000);
Expand Down Expand Up @@ -97,14 +102,19 @@ void testSpillBlobWithExceedingSize() throws Exception {
}

private static AzureBlobStorageDestinationConfig createConfig(String host, Integer mappedPort) {
final ObjectNode stubFormatConfig = mapper.createObjectNode();
stubFormatConfig.put("file_extension", Boolean.TRUE);
final ObjectNode stubConfig = mapper.createObjectNode();
stubConfig.set("format", stubFormatConfig);

return new AzureBlobStorageDestinationConfig(
"http://" + host + ":" + mappedPort + "/devstoreaccount1",
"devstoreaccount1",
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==",
"container-name",
1,
1,
new AzureBlobStorageJsonlFormatConfig());
new AzureBlobStorageJsonlFormatConfig(stubConfig));
}

private static AirbyteMessage createAirbyteMessage(JsonNode data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ void testConfigBlobStorageSpillSize() {

private JsonNode getFormatConfig() {
return Jsons.deserialize("{\n"
+ " \"format_type\": \"JSONL\"\n"
+ " \"format_type\": \"JSONL\",\n"
+ " \"file_extension\": \"true\"\n"
+ "}");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public void testGetCsvS3FormatConfig() {
final ObjectNode stubFormatConfig = mapper.createObjectNode();
stubFormatConfig.put("format_type", AzureBlobStorageFormat.CSV.toString());
stubFormatConfig.put("flattening", Flattening.ROOT_LEVEL.getValue());
stubFormatConfig.put("file_extension", Boolean.TRUE);

final ObjectNode stubConfig = mapper.createObjectNode();
stubConfig.set("format", stubFormatConfig);
Expand Down
37 changes: 22 additions & 15 deletions docs/integrations/destinations/azure-blob-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ With root level normalization, the output CSV is:
| :------------------------------------- | :-------------------- | :-------- | :----------------------------------- |
| `26d73cde-7eb1-4e1e-b7db-a4c03b4cf206` | 1622135805000 | 123 | `{ "first": "John", "last": "Doe" }` |

With the field `File Extension`, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with `csv` extension.

### JSON Lines \(JSONL\)

[Json Lines](https://jsonlines.org/) is a text format with one JSON per line. Each line has a structure as follows:
Expand Down Expand Up @@ -115,6 +117,8 @@ They will be like this in the output file:
{ "_airbyte_ab_id": "0a61de1b-9cdd-4455-a739-93572c9a5f20", "_airbyte_emitted_at": "1631948170000", "_airbyte_data": { "user_id": 456, "name": { "first": "Jane", "last": "Roe" } } }
```

With the field `File Extension`, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with `jsonl` extension.

## Getting started

### Requirements
Expand All @@ -124,28 +128,31 @@ They will be like this in the output file:

### Setup guide

- Fill up AzureBlobStorage info
- **Endpoint Domain Name**
- Leave default value \(or leave it empty if run container from command line\) to use Microsoft native one or use your own.
- **Azure blob storage container**
- If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp..
- **Azure Blob Storage account name**
- See [this](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) on how to create an account.
- **The Azure blob storage account key**
- Corresponding key to the above user.
- **Format**
- Data format that will be use for a migrated data representation in blob.
- Make sure your user has access to Azure from the machine running Airbyte.
- This depends on your networking setup.
- The easiest way to verify if Airbyte is able to connect to your Azure blob storage container is via the check connection tool in the UI.
* Fill up AzureBlobStorage info
* **Endpoint Domain Name**
* Leave default value \(or leave it empty if run container from command line\) to use Microsoft native one or use your own.
* **Azure blob storage container**
* If not exists - will be created automatically. If leave empty, then will be created automatically airbytecontainer+timestamp..
* **Azure Blob Storage account name**
* See [this](https://docs.microsoft.com/en-us/azure/storage/common/storage-account-create?tabs=azure-portal) on how to create an account.
* **The Azure blob storage account key**
* Corresponding key to the above user.
* **Format**
* Data format that will be use for a migrated data representation in blob.
* With the field **File Extension**, it is possible to save the output files with extension. It is an optional field with default value as `false`. Enable this to store the files with extension.
* Make sure your user has access to Azure from the machine running Airbyte.
* This depends on your networking setup.
* The easiest way to verify if Airbyte is able to connect to your Azure blob storage container is via the check connection tool in the UI.


## Changelog

<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.2 | 2024-06-12 | [\#38061](https://github.com/airbytehq/airbyte/pull/38061) | File Extensions added for the output files |
| 0.2.1 | 2023-09-13 | [\#30412](https://github.com/airbytehq/airbyte/pull/30412) | Switch noisy logging to debug |
| 0.2.0 | 2023-01-18 | [\#21467](https://github.com/airbytehq/airbyte/pull/21467) | Support spilling of objects exceeding configured size threshold |
| 0.1.6 | 2022-08-08 | [\#15318](https://github.com/airbytehq/airbyte/pull/15318) | Support per-stream state |
Expand Down

0 comments on commit 3c3a80a

Please sign in to comment.