diff --git a/src/S3/Transfer.php b/src/S3/Transfer.php index 600f441008..a1937ecbcb 100644 --- a/src/S3/Transfer.php +++ b/src/S3/Transfer.php @@ -26,6 +26,7 @@ class Transfer implements PromisorInterface private $concurrency; private $mupThreshold; private $before; + private $after; private $s3Args = []; private $addContentMD5; @@ -52,6 +53,11 @@ class Transfer implements PromisorInterface * callback accepts a single argument: Aws\CommandInterface $command. * The provided command will be either a GetObject, PutObject, * InitiateMultipartUpload, or UploadPart command. + * - after: (callable) A callback to invoke after each transfer promise is fulfilled. + * The function is invoked with three arguments: the fulfillment value, the index + * position from the iterable list of the promise, and the aggregate + * promise that manages all the promises. The aggregate promise may + * be resolved from within the callback to short-circuit the promise. * - mup_threshold: (int) Size in bytes in which a multipart upload should * be used instead of PutObject. Defaults to 20971520 (20 MB). * - concurrency: (int, default=5) Number of files to upload concurrently. @@ -126,6 +132,14 @@ public function __construct( } } + // Handle "after" callback option. + if (isset($options['after'])) { + $this->after = $options['after']; + if (!is_callable($this->after)) { + throw new \InvalidArgumentException('after must be a callable.'); + } + } + // Handle "debug" option. if (isset($options['debug'])) { if ($options['debug'] === true) { @@ -285,6 +299,7 @@ private function createDownloadPromise() return (new Aws\CommandPool($this->client, $commands, [ 'concurrency' => $this->concurrency, 'before' => $this->before, + 'fulfill' => $this->after, 'rejected' => function ($reason, $idx, Promise\PromiseInterface $p) { $p->reject($reason); } @@ -302,7 +317,7 @@ private function createUploadPromise() // Create an EachPromise, that will concurrently handle the upload // operations' yielded promises from the iterator. - return Promise\Each::ofLimitAll($files, $this->concurrency); + return Promise\Each::ofLimitAll($files, $this->concurrency, $this->after); } /** @return Iterator */ diff --git a/tests/Build/Changelog/resources/.changes/nextrelease/feat-transfer-afterCallback.json b/tests/Build/Changelog/resources/.changes/nextrelease/feat-transfer-afterCallback.json new file mode 100644 index 0000000000..d409df013a --- /dev/null +++ b/tests/Build/Changelog/resources/.changes/nextrelease/feat-transfer-afterCallback.json @@ -0,0 +1,7 @@ +[ + { + "type": "enhancement", + "category": "S3", + "description": "Added possibility to execute a callback function after a transfer is fulfilled when using the Transfer class. Implemented similarly to the way the 'before' callback works." + } +] \ No newline at end of file diff --git a/tests/S3/TransferTest.php b/tests/S3/TransferTest.php index 170d20c4d2..e8079b50f8 100644 --- a/tests/S3/TransferTest.php +++ b/tests/S3/TransferTest.php @@ -112,6 +112,69 @@ public function testCanSetBeforeOptionForUploadsAndUsedWithDebug() } } + public function testEnsuresAfterIsCallable() + { + $this->expectExceptionMessage("after must be a callable"); + $this->expectException(\InvalidArgumentException::class); + $s3 = $this->getTestClient('s3'); + new Transfer($s3, __DIR__, 's3://foo/bar', ['after' => 'cheese']); + } + + public function testCanSetAfterOptionForUploads() + { + $s3 = $this->getTestClient('s3'); + $s3->getHandlerList()->appendInit( + $this->mockResult(function() { + return new Result(['ObjectURL' => 'file_url']); + }), + 's3.test' + ); + + $path = __DIR__ . '/Crypto'; + $filesCount = iterator_count(\Aws\recursive_dir_iterator($path)); + + $results = []; + $indices = []; + $aggregatePromises = []; + + $i = \Aws\recursive_dir_iterator($path); + $t = new Transfer($s3, $i, 's3://foo/bar', [ + 'after' => function ($result, $index, $aggregatePromise) use (&$results, &$indices, &$aggregatePromises) { + $results[] = $result; + $indices[] = $index; + $aggregatePromises[] = $aggregatePromise; + }, + 'debug' => true, + 'base_dir' => __DIR__, + ]); + + ob_start(); + $p = $t->promise(); + $p2 = $t->promise(); + $this->assertSame($p, $p2); + $p->wait(); + ob_get_clean(); + $this->assertNotEmpty($results); + $this->assertNotEmpty($indices); + $this->assertNotEmpty($aggregatePromises); + + $this->assertCount($filesCount, $results); + $this->assertCount($filesCount, $indices); + $this->assertCount($filesCount, $aggregatePromises); + + /** @var Result $result */ + foreach ($results as $result) { + $this->assertIsIterable($result); + $this->assertArrayHasKey("ObjectURL", iterator_to_array($result)); + $this->assertSame("file_url", $result["ObjectURL"]); + } + $this->assertSame(range(0, $filesCount-1), $indices); + /** @var Promise\Promise $aggregatePromise */ + foreach ($aggregatePromises as $aggregatePromise) { + $this->assertSame('fulfilled', $aggregatePromise->getState()); + } + } + public function testDoesMultipartForLargeFiles() { $s3 = $this->getTestClient('s3');