Skip to content

Commit

Permalink
Merge pull request #185 from tomwalder/feature/retry-backoff
Browse files Browse the repository at this point in the history
Support for exponential back-off.
  • Loading branch information
tomwalder authored Dec 1, 2023
2 parents 37349a6 + a4e528c commit b865372
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 21 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This library is intended to make it easier for you to get started with and to us

## Quick Start ##
```bash
composer require "tomwalder/php-gds:^5.1"
composer require "tomwalder/php-gds:^6.1"
```
```php
// Build a new entity
Expand All @@ -28,6 +28,18 @@ foreach($obj_store->fetchAll() as $obj_book) {
}
```

## New in Version 6.1 ##

Support for automated exponential backoff for some types of errors. See documentation here:
https://cloud.google.com/datastore/docs/concepts/errors

To enable:
```php
\GDS\Gateway::exponentialBackoff(true);
```

Version `6.0.0` introduced better (but different) support for `NULL` values.

## New in Version 5.0 ##

**As of version 5 (May 2021), this library provides support for**
Expand Down Expand Up @@ -523,6 +535,10 @@ A full suite of unit tests is in the works. Assuming you've installed `php-gds`
```bash
vendor/bin/phpunit
```
Or, if you need to run containerised tests, you can use the `runphp` image (or any you choose)
```bash
docker run --rm -it -v`pwd`:/app -w /app fluentthinking/runphp:7.4.33-v0.9.0 php /app/vendor/bin/phpunit
```

[Click here for more details](tests/).

Expand Down
75 changes: 75 additions & 0 deletions src/GDS/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*/
abstract class Gateway
{
// 8 = about 5 seconds total, with last gap ~2.5 seconds
const RETRY_MAX_ATTEMPTS = 8;

/**
* The dataset ID
Expand Down Expand Up @@ -72,6 +74,19 @@ abstract class Gateway
*/
protected $arr_kind_mappers = [];

protected static $bol_retry = false;

/**
* Configure gateway retries (for 503, 500 responses)
*
* @param bool $bol_retry
* @return void
*/
public static function exponentialBackoff(bool $bol_retry = true)
{
self::$bol_retry = $bol_retry;
}

/**
* Set the Schema to be used next (once?)
*
Expand Down Expand Up @@ -335,6 +350,66 @@ protected function configureValueParamForQuery($obj_val, $mix_value)
return $obj_val;
}

/**
* Delay execution, based on the attempt number
*
* @param int $int_attempt
* @return void
*/
protected function backoff(int $int_attempt)
{
$int_backoff = (int) pow(2, $int_attempt);
$int_jitter = rand(0, 10) * 1000;
$int_delay = ($int_backoff * 10000) + $int_jitter;
usleep($int_delay);
}

/**
* Execute the callback with exponential backoff
*
* @param callable $fnc_main
* @param string|null $str_exception
* @param callable|null $fnc_resolve_exception
* @return mixed
* @throws \Throwable
*/
protected function executeWithExponentialBackoff(
callable $fnc_main,
string $str_exception = null,
callable $fnc_resolve_exception = null
) {
$int_attempt = 0;
$bol_retry_once = false;
do {
try {
$int_attempt++;
if ($int_attempt > 1) {
$this->backoff($int_attempt);
}
return $fnc_main();
} catch (\Throwable $obj_thrown) {
// Rethrow if we're not interested in this Exception type
if (null !== $str_exception && !$obj_thrown instanceof $str_exception) {
throw $obj_thrown;
}
// Rethrow if retry is disabled, non-retryable errors, or if we have hit a retry limit
if (false === self::$bol_retry ||
true === $bol_retry_once ||
!in_array((int) $obj_thrown->getCode(), static::RETRY_ERROR_CODES)
) {
throw null === $fnc_resolve_exception ? $obj_thrown : $fnc_resolve_exception($obj_thrown);
}
// Just one retry for some errors
if (in_array((int) $obj_thrown->getCode(), static::RETRY_ONCE_CODES)) {
$bol_retry_once = true;
}
}
} while ($int_attempt < self::RETRY_MAX_ATTEMPTS);

// We could not make this work after max retries
throw null === $fnc_resolve_exception ? $obj_thrown : $fnc_resolve_exception($obj_thrown);
}

/**
* Configure a Value parameter, based on the supplied object-type value
*
Expand Down
62 changes: 46 additions & 16 deletions src/GDS/Gateway/GRPCv1.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
use Google\Cloud\Datastore\V1\GqlQuery;
use Google\Cloud\Datastore\V1\GqlQueryParameter;
use Google\Cloud\Datastore\V1\Value;
use Google\Rpc\Code;

/**
* gRPC Datastore Gateway (v1)
Expand All @@ -46,6 +47,20 @@
*/
class GRPCv1 extends \GDS\Gateway
{
// https://cloud.google.com/datastore/docs/concepts/errors
// https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
const RETRY_ERROR_CODES = [
Code::UNKNOWN,
Code::ABORTED,
Code::DEADLINE_EXCEEDED,
Code::RESOURCE_EXHAUSTED,
Code::UNAVAILABLE,
Code::INTERNAL,
];

const RETRY_ONCE_CODES = [
Code::INTERNAL,
];

/**
* Cloud Datastore (gRPC & REST) Client
Expand Down Expand Up @@ -98,29 +113,44 @@ private function createPartitionId()
/**
* Execute a method against the Datastore client.
*
* Prepend projectId as first parameter automatically.
*
* @param string $str_method
* @param mixed[] $args
* @return mixed
* @throws \Exception
*/
private function execute($str_method, $args)
private function execute(string $str_method, array $args) {
array_unshift($args, $this->str_dataset_id);
return $this->executeWithExponentialBackoff(
function () use ($str_method, $args) {
$this->obj_last_response = call_user_func_array([self::$obj_datastore_client, $str_method], $args);
return $this->obj_last_response;
},
ApiException::class,
[$this, 'resolveExecuteException']
);
}

/**
* Wrap the somewhat murky ApiException into something more useful
*
* https://cloud.google.com/datastore/docs/concepts/errors
*
* @param ApiException $obj_exception
* @return \Exception
*/
protected function resolveExecuteException(ApiException $obj_exception): \Exception
{
try {
// Call gRPC client,
// prepend projectId as first parameter automatically.
array_unshift($args, $this->str_dataset_id);
$this->obj_last_response = call_user_func_array([self::$obj_datastore_client, $str_method], $args);
} catch (ApiException $obj_exception) {
$this->obj_last_response = null;
if (FALSE !== strpos($obj_exception->getMessage(), 'too much contention') || FALSE !== strpos($obj_exception->getMessage(), 'Concurrency')) {
// LIVE: "too much contention on these datastore entities. please try again." LOCAL : "Concurrency exception."
throw new Contention('Datastore contention', 409, $obj_exception);
} else {
throw $obj_exception;
}
$this->obj_last_response = null;
if (Code::ABORTED === $obj_exception->getCode() ||
false !== strpos($obj_exception->getMessage(), 'too much contention') ||
false !== strpos($obj_exception->getMessage(), 'Concurrency')) {
// LIVE: "too much contention on these datastore entities. please try again."
// LOCAL : "Concurrency exception."
return new Contention('Datastore contention', 409, $obj_exception);
}

return $this->obj_last_response;
return $obj_exception;
}

/**
Expand Down
19 changes: 16 additions & 3 deletions src/GDS/Gateway/RESTv1.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use GuzzleHttp\Client;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\HandlerStack;
use Psr\Http\Message\ResponseInterface;

/**
* Gateway, implementing the Datastore API v1 over REST
Expand All @@ -23,6 +24,12 @@ class RESTv1 extends \GDS\Gateway
const MODE_NON_TRANSACTIONAL = 'NON_TRANSACTIONAL';
const MODE_UNSPECIFIED = 'UNSPECIFIED';

// https://cloud.google.com/datastore/docs/concepts/errors
// https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
const RETRY_ERROR_CODES = [409, 429, 500, 503, 504];

const RETRY_ONCE_CODES = [500];

/**
* Client config keys.
*/
Expand Down Expand Up @@ -165,8 +172,14 @@ private function executePostRequest($str_action, $obj_request_body = null)
if(null !== $obj_request_body) {
$arr_options['json'] = $obj_request_body;
}
$obj_response = $this->httpClient()->post($this->actionUrl($str_action), $arr_options);
$this->obj_last_response = json_decode((string)$obj_response->getBody());
$str_url = $this->actionUrl($str_action);
$obj_response = $this->executeWithExponentialBackoff(
function () use ($str_url, $arr_options) {
return $this->httpClient()->post($str_url, $arr_options);
},
\GuzzleHttp\Exception\RequestException::class
);
$this->obj_last_response = \json_decode((string)$obj_response->getBody());
}

/**
Expand Down Expand Up @@ -492,7 +505,7 @@ public function beginTransaction($bol_cross_group = FALSE)
* @return string
*/
protected function getBaseUrl() {
$str_base_url = $this->obj_http_client->getConfig(self::CONFIG_CLIENT_BASE_URL);
$str_base_url = $this->httpClient()->getConfig(self::CONFIG_CLIENT_BASE_URL);
if (!empty($str_base_url)) {
return $str_base_url;
}
Expand Down
Loading

0 comments on commit b865372

Please sign in to comment.