Skip to content

Commit

Permalink
HADOOP-19226: [ABFS][FNSOverBlob] Implementing Azure Rest APIs on Blo…
Browse files Browse the repository at this point in the history
…b Endpoint for AbfsBlobClient (#6944)

Contributed by Anuj Modi
  • Loading branch information
anujmodi2021 authored Nov 26, 2024
1 parent 919bd18 commit 65a5bf3
Show file tree
Hide file tree
Showing 22 changed files with 1,609 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public boolean isDfsToBlobFallbackEnabled() {
*/
public void validateConfiguredServiceType(boolean isHNSEnabled)
throws InvalidConfigurationValueException {
// Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready.
// TODO: [FnsOverBlob][HADOOP-19179] Remove this check when FNS over Blob is ready.
if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Support not yet available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ private boolean fileSystemExists() throws IOException {
try {
checkException(null, ex);
// Because HEAD request won't contain message body,
// there is not way to get the storage error code
// there is no way to get the storage error code
// workaround here is to check its status code.
} catch (FileNotFoundException e) {
statIncrement(ERROR_IGNORED);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,44 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";

//Abfs Http Client Constants for Blob Endpoint APIs.

/**
* HTTP Header Value to denote resource type as container.
* {@value}.
*/
public static final String CONTAINER = "container";

/**
* HTTP Header Value to denote component as metadata.
* {@value}.
*/
public static final String METADATA = "metadata";

/**
* HTTP Header Value to denote component as block.
* {@value}.
*/
public static final String BLOCK = "block";

/**
* HTTP Header Value to denote component as blocklist.
* {@value}.
*/
public static final String BLOCKLIST = "blocklist";

/**
* HTTP Header Value to denote component as lease.
* {@value}.
*/
public static final String LEASE = "lease";

/**
* HTTP Header Value to denote bock list type as committed.
* {@value}.
*/
public static final String BLOCK_TYPE_COMMITTED = "committed";

public static final String JAVA_VENDOR = "java.vendor";
public static final String JAVA_VERSION = "java.version";
Expand All @@ -60,6 +97,10 @@ public final class AbfsHttpConstants {

public static final String APN_VERSION = "APN/1.0";
public static final String CLIENT_VERSION = "Azure Blob FS/" + VersionInfo.getVersion();
/**
* {@value}.
*/
public static final String TOKEN_VERSION = "2";

// Abfs Http Verb
public static final String HTTP_METHOD_DELETE = "DELETE";
Expand Down Expand Up @@ -92,6 +133,7 @@ public final class AbfsHttpConstants {
public static final String HTTP_HEADER_PREFIX = "x-ms-";
public static final String HASH = "#";
public static final String TRUE = "true";
public static final String ZERO = "0";

public static final String PLUS_ENCODE = "%20";
public static final String FORWARD_SLASH_ENCODE = "%2F";
Expand All @@ -101,6 +143,7 @@ public final class AbfsHttpConstants {
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String APPLICATION_XML = "application/xml";
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
WRITE("WR"),
INIT("IN");
WRITE("WR");

private final String opCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_ACL = "x-ms-acl";
public static final String X_MS_PERMISSIONS = "x-ms-permissions";
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
Expand All @@ -70,10 +69,40 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";

/**
* Http Request Header for denoting the lease id of source in copy operation.
* {@value}
*/
public static final String X_MS_SOURCE_LEASE_ID = "x-ms-source-lease-id";
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
public static final String EXPECT = "Expect";
public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";

/**
* Http Response Header for denoting directory.
* {@value}
*/
public static final String X_MS_META_HDI_ISFOLDER = "x-ms-meta-hdi_isfolder";

/**
* Http Response Header prefix for user-defined properties.
* {@value}
*/
public static final String X_MS_METADATA_PREFIX = "x-ms-meta-";

/**
* Http Request Header for denoting the source of copy operation.
* {@value}
*/
public static final String X_MS_COPY_SOURCE = "x-ms-copy-source";

/**
* Http Request Header for denoting MD5 hash of the blob content.
* {@value}
*/
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,32 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
public static final String QUERY_PARAM_PAGINATED = "paginated";

// query parameters for Blob Endpoint Rest APIs

/**
* Http Query parameter for specifying resource type.
* {@value}
*/
public static final String QUERY_PARAM_RESTYPE = "restype";

/**
* Http Query parameter for specifying component.
* {@value}
*/
public static final String QUERY_PARAM_COMP = "comp";

/**
* Http Query parameter for specifying blockId.
* {@value}
*/
public static final String QUERY_PARAM_BLOCKID = "blockid";

/**
* Http Query parameter for specifying block list type.
* {@value}
*/
public static final String QUERY_PARAM_BLOCKLISTTYPE = "blocklisttype";

//query params for SAS
public static final String QUERY_PARAM_SAOID = "saoid";
public static final String QUERY_PARAM_SKOID = "skoid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,19 @@ public enum Mode {
private final String leaseId;
private boolean isExpectHeaderEnabled;
private boolean isRetryDueToExpect;
private final BlobAppendRequestParameters blobParams;


/**
* Constructor to be used for interacting with AbfsDfsClient.
* @param position position in remote blob at which append should happen
* @param offset position in the buffer to be appended
* @param length length of the data to be appended
* @param mode mode of the append operation
* @param isAppendBlob true if the blob is append-blob
* @param leaseId leaseId of the blob to be appended
* @param isExpectHeaderEnabled true if the expect header is enabled
*/
public AppendRequestParameters(final long position,
final int offset,
final int length,
Expand All @@ -52,6 +64,37 @@ public AppendRequestParameters(final long position,
this.leaseId = leaseId;
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
this.isRetryDueToExpect = false;
this.blobParams = null;
}

/**
* Constructor to be used for interacting with AbfsBlobClient.
* @param position position in remote blob at which append should happen
* @param offset position in the buffer to be appended
* @param length length of the data to be appended
* @param mode mode of the append operation
* @param isAppendBlob true if the blob is append-blob
* @param leaseId leaseId of the blob to be appended
* @param isExpectHeaderEnabled true if the expect header is enabled
* @param blobParams parameters specific to append operation on Blob Endpoint.
*/
public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
final boolean isAppendBlob,
final String leaseId,
final boolean isExpectHeaderEnabled,
final BlobAppendRequestParameters blobParams) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
this.leaseId = leaseId;
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
this.isRetryDueToExpect = false;
this.blobParams = blobParams;
}

public long getPosition() {
Expand Down Expand Up @@ -86,6 +129,22 @@ public boolean isRetryDueToExpect() {
return isRetryDueToExpect;
}

/**
* Returns BlockId of the block blob to be appended.
* @return blockId
*/
public String getBlockId() {
return blobParams.getBlockId();
}

/**
* Returns ETag of the block blob.
* @return eTag
*/
public String getETag() {
return blobParams.getETag();
}

public void setRetryDueToExpect(boolean retryDueToExpect) {
isRetryDueToExpect = retryDueToExpect;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
public enum AzureServiceErrorCode {
FILE_SYSTEM_ALREADY_EXISTS("FilesystemAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
PATH_ALREADY_EXISTS("PathAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
BLOB_ALREADY_EXISTS("BlobAlreadyExists", HttpURLConnection.HTTP_CONFLICT, null),
INTERNAL_OPERATION_ABORT("InternalOperationAbortError", HttpURLConnection.HTTP_CONFLICT, null),
PATH_CONFLICT("PathConflict", HttpURLConnection.HTTP_CONFLICT, null),
FILE_SYSTEM_NOT_FOUND("FilesystemNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
PATH_NOT_FOUND("PathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
BLOB_PATH_NOT_FOUND("BlobNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
PRE_CONDITION_FAILED("PreconditionFailed", HttpURLConnection.HTTP_PRECON_FAILED, null),
SOURCE_PATH_NOT_FOUND("SourcePathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.contracts.services;

/**
* Following parameters are used by AbfsBlobClient only.
* Blob Endpoint Append API requires blockId and eTag to be passed in the request.
*/
public class BlobAppendRequestParameters {
private String blockId;
private String eTag;

/**
* Constructor to be used for interacting with AbfsBlobClient.
* @param blockId blockId of the block to be appended
* @param eTag eTag of the blob being appended
*/
public BlobAppendRequestParameters(String blockId, String eTag) {
this.blockId = blockId;
this.eTag = eTag;
}

public String getBlockId() {
return blockId;
}

public String getETag() {
return eTag;
}
}
Loading

0 comments on commit 65a5bf3

Please sign in to comment.