Skip to content

Commit

Permalink
Create & Delete directory support akka#3253
Browse files Browse the repository at this point in the history
1. Request builder for create & delete directories
2. Implementation of create & delete directories and Scala and Java API
3. Tests for create & delete directories
4. Documentation
  • Loading branch information
sfali committed Sep 11, 2024
1 parent 689cd53 commit d6d9ed0
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import akka.stream.alpakka.azure.storage.impl.auth.Signer
import akka.stream.alpakka.azure.storage.requests.{
ClearFileRange,
CreateContainer,
CreateDirectory,
CreateFile,
DeleteContainer,
DeleteDirectory,
GetProperties,
RequestBuilder,
UpdateFileRange
Expand Down Expand Up @@ -140,6 +142,20 @@ object AzureStorageStream {
objectPath = objectPath,
requestBuilder = requestBuilder)

private[storage] def createDirectory(directoryPath: String,
requestBuilder: CreateDirectory): Source[Option[ObjectMetadata], NotUsed] =
handleRequest(successCode = Created,
storageType = FileType,
objectPath = directoryPath,
requestBuilder = requestBuilder)

private[storage] def deleteDirectory(directoryPath: String,
requestBuilder: DeleteDirectory): Source[Option[ObjectMetadata], NotUsed] =
handleRequest(successCode = Accepted,
storageType = FileType,
objectPath = directoryPath,
requestBuilder = requestBuilder)

/**
* Common function to handle all requests where we don't expect response body.
*
Expand Down Expand Up @@ -168,7 +184,13 @@ object AzureStorageStream {
case HttpResponse(sc, h, entity, _) if sc == successCode =>
Source.future(entity.withoutSizeLimit().discardBytes().future().map(_ => Some(computeMetaData(h, entity))))
case HttpResponse(NotFound, _, entity, _) =>
Source.future(entity.withoutSizeLimit().discardBytes().future().map(_ => None)(ExecutionContexts.parasitic))
Source.future(
entity
.withoutSizeLimit()
.discardBytes()
.future()
.map(_ => None)(ExecutionContexts.parasitic)
)
case response: HttpResponse => Source.future(unmarshalError(response.status, response.entity))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import akka.http.scaladsl.model.HttpEntity
import akka.stream.alpakka.azure.storage.impl.AzureStorageStream
import akka.stream.alpakka.azure.storage.requests.{
ClearFileRange,
CreateDirectory,
CreateFile,
DeleteDirectory,
DeleteFile,
GetFile,
GetProperties,
Expand Down Expand Up @@ -125,4 +127,29 @@ object FileService {
.clearRange(objectPath, requestBuilder)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

/**
* Create directory.
*
* @param directoryPath path of the directory to be created, e.g., `myshare/myparentdirectorypath/mydirectory`
* @param requestBuilder builder to build createDirectory request
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def createDirectory(directoryPath: String,
requestBuilder: CreateDirectory): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream.createDirectory(directoryPath, requestBuilder).map(opt => Optional.ofNullable(opt.orNull)).asJava

/**
* Delete directory.
*
* @param directoryPath path of the directory to be deleted, e.g., `myshare/myparentdirectorypath/mydirectory`
* @param requestBuilder builder to build deleteDirectory request
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def deleteDirectory(directoryPath: String,
requestBuilder: DeleteDirectory): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream.deleteDirectory(directoryPath, requestBuilder).map(opt => Optional.ofNullable(opt.orNull)).asJava

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package requests

import akka.http.scaladsl.model.{HttpHeader, HttpMethod, HttpMethods}
import akka.stream.alpakka.azure.storage.headers.ServerSideEncryption
import akka.stream.alpakka.azure.storage.impl.StorageHeaders

final class CreateDirectory(override val sse: Option[ServerSideEncryption] = None,
override val additionalHeaders: Seq[HttpHeader] = Seq.empty)
extends RequestBuilder {

override protected val method: HttpMethod = HttpMethods.PUT

override protected val queryParams: Map[String, String] = super.queryParams ++ Map("restype" -> "directory")

override def withServerSideEncryption(sse: ServerSideEncryption): CreateDirectory = copy(sse = Option(sse))

override def addHeader(httpHeader: HttpHeader): CreateDirectory =
copy(additionalHeaders = additionalHeaders :+ httpHeader)

private def copy(sse: Option[ServerSideEncryption] = sse, additionalHeaders: Seq[HttpHeader] = additionalHeaders) =
new CreateDirectory(sse = sse, additionalHeaders = additionalHeaders)

override protected def getHeaders: Seq[HttpHeader] =
StorageHeaders()
.witServerSideEncryption(sse)
.withAdditionalHeaders(additionalHeaders)
.headers
}

object CreateDirectory {

/*
* Scala API
*/
def apply(): CreateDirectory = new CreateDirectory()

/*
* Java API
*/
def create(): CreateDirectory = CreateDirectory()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka
package azure
package storage
package requests

import akka.http.scaladsl.model.{HttpHeader, HttpMethod, HttpMethods}
import akka.stream.alpakka.azure.storage.headers.ServerSideEncryption
import akka.stream.alpakka.azure.storage.impl.StorageHeaders

final class DeleteDirectory(override val sse: Option[ServerSideEncryption] = None,
override val additionalHeaders: Seq[HttpHeader] = Seq.empty)
extends RequestBuilder {

override protected val method: HttpMethod = HttpMethods.DELETE

override protected val queryParams: Map[String, String] = super.queryParams ++ Map("restype" -> "directory")

override def withServerSideEncryption(sse: ServerSideEncryption): DeleteDirectory = copy(sse = Option(sse))

override def addHeader(httpHeader: HttpHeader): DeleteDirectory =
copy(additionalHeaders = additionalHeaders :+ httpHeader)

private def copy(sse: Option[ServerSideEncryption] = sse, additionalHeaders: Seq[HttpHeader] = additionalHeaders) =
new DeleteDirectory(sse = sse, additionalHeaders = additionalHeaders)

override protected def getHeaders: Seq[HttpHeader] =
StorageHeaders()
.witServerSideEncryption(sse)
.withAdditionalHeaders(additionalHeaders)
.headers
}

object DeleteDirectory {

/*
* Scala API
*/
def apply(): DeleteDirectory = new DeleteDirectory()

/*
* Java API
*/
def create(): DeleteDirectory = DeleteDirectory()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import akka.http.scaladsl.model.HttpEntity
import akka.stream.alpakka.azure.storage.impl.AzureStorageStream
import akka.stream.alpakka.azure.storage.requests.{
ClearFileRange,
CreateDirectory,
CreateFile,
DeleteDirectory,
DeleteFile,
GetFile,
GetProperties,
Expand Down Expand Up @@ -101,4 +103,26 @@ object FileService {
*/
def clearRange(objectPath: String, requestBuilder: ClearFileRange): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.clearRange(objectPath, requestBuilder)

/**
* Create directory.
*
* @param directoryPath path of the directory to be created, e.g., `myshare/myparentdirectorypath/mydirectory`
* @param requestBuilder builder to build createDirectory request
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def createDirectory(directoryPath: String, requestBuilder: CreateDirectory): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.createDirectory(directoryPath, requestBuilder)

/**
* Delete directory.
*
* @param directoryPath path of the directory to be deleted, e.g., `myshare/myparentdirectorypath/mydirectory`
* @param requestBuilder builder to build deleteDirectory request
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def deleteDirectory(directoryPath: String, requestBuilder: DeleteDirectory): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.deleteDirectory(directoryPath, requestBuilder)
}
37 changes: 35 additions & 2 deletions azure-storage/src/test/java/docs/javadsl/StorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import akka.stream.alpakka.azure.storage.javadsl.FileService;
import akka.stream.alpakka.azure.storage.requests.ClearFileRange;
import akka.stream.alpakka.azure.storage.requests.CreateContainer;
import akka.stream.alpakka.azure.storage.requests.CreateDirectory;
import akka.stream.alpakka.azure.storage.requests.CreateFile;
import akka.stream.alpakka.azure.storage.requests.DeleteContainer;
import akka.stream.alpakka.azure.storage.requests.DeleteDirectory;
import akka.stream.alpakka.azure.storage.requests.DeleteFile;
import akka.stream.alpakka.azure.storage.requests.GetBlob;
import akka.stream.alpakka.azure.storage.requests.GetFile;
Expand Down Expand Up @@ -65,7 +67,6 @@ public static void afterAll() {
.thenRun(() -> TestKit.shutdownActorSystem(system));
}


@Test
public void createContainer() throws Exception {
mockCreateContainer();
Expand Down Expand Up @@ -99,7 +100,6 @@ public void deleteContainer() throws Exception {
Assert.assertEquals(objectMetadata.getContentLength(), 0L);
}


// TODO: There are couple of issues, firstly there are two `Content-Length` headers being added, one by `putBlob`
// function and secondly by, most likely, by WireMock. Need to to figure out how to tell WireMock not to add `Content-Length`
// header, secondly once that resolve then we get `akka.http.scaladsl.model.EntityStreamException`.
Expand Down Expand Up @@ -224,6 +224,39 @@ public void deleteBlob() throws Exception {
Assert.assertEquals(0L, objectMetadata.getContentLength());
}

@Test
public void createDirectory() throws Exception {
mockCreateDirectory();

//#create-directory
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.createDirectory(containerName(), CreateDirectory.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
//#create-directory

final var optionalObjectMetadata = optionalCompletionStage.toCompletableFuture().get();
Assert.assertTrue(optionalObjectMetadata.isPresent());
final var objectMetadata = optionalObjectMetadata.get();
Assert.assertEquals(objectMetadata.getContentLength(), 0L);
Assert.assertEquals(objectMetadata.getETag().get(), ETagRawValue());
}

@Test
public void deleteDirectory() throws Exception {
mockDeleteDirectory();

//#delete-directory
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.deleteDirectory(containerName(), DeleteDirectory.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
//#delete-directory

final var optionalObjectMetadata = optionalCompletionStage.toCompletableFuture().get();
Assert.assertTrue(optionalObjectMetadata.isPresent());
final var objectMetadata = optionalObjectMetadata.get();
Assert.assertEquals(objectMetadata.getContentLength(), 0L);
}

@Test
public void createFile() throws Exception {
mockCreateFile();
Expand Down
Loading

0 comments on commit d6d9ed0

Please sign in to comment.