forked from googleapis/google-cloud-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry.py
207 lines (158 loc) · 6.75 KB
/
retry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from functools import wraps
import six
MAX_TRIES = 4
DELAY = 1
BACKOFF = 2
def _retry_all(_):
"""Retry all caught exceptions."""
return True
class BackoffFailed(Exception):
"""Retry w/ backoffs did not complete successfully."""
class RetryBase(object):
"""Base for retrying calling a decorated function w/ exponential backoff.
:type max_tries: int
:param max_tries: Number of times to try (not retry) before giving up.
:type delay: int
:param delay: Initial delay between retries in seconds.
:type backoff: int
:param backoff: Backoff multiplier e.g. value of 2 will double the
delay each retry.
:type logger: logging.Logger instance
:param logger: Logger to use. If None, print.
"""
def __init__(self, max_tries=MAX_TRIES, delay=DELAY, backoff=BACKOFF,
logger=None):
self.max_tries = max_tries
self.delay = delay
self.backoff = backoff
self.logger = logger.warning if logger else six.print_
class RetryErrors(RetryBase):
"""Decorator for retrying given exceptions in testing.
:type exception: Exception or tuple of Exceptions
:param exception: The exception to check or may be a tuple of
exceptions to check.
:type error_predicate: function, takes caught exception, returns bool
:param error_predicate: Predicate evaluating whether to retry after a
caught exception.
:type max_tries: int
:param max_tries: Number of times to try (not retry) before giving up.
:type delay: int
:param delay: Initial delay between retries in seconds.
:type backoff: int
:param backoff: Backoff multiplier e.g. value of 2 will double the
delay each retry.
:type logger: logging.Logger instance
:param logger: Logger to use. If None, print.
"""
def __init__(self, exception, error_predicate=_retry_all,
max_tries=MAX_TRIES, delay=DELAY, backoff=BACKOFF,
logger=None):
super(RetryErrors, self).__init__(max_tries, delay, backoff, logger)
self.exception = exception
self.error_predicate = error_predicate
def __call__(self, to_wrap):
@wraps(to_wrap)
def wrapped_function(*args, **kwargs):
tries = 0
while tries < self.max_tries:
try:
return to_wrap(*args, **kwargs)
except self.exception as caught_exception:
if not self.error_predicate(caught_exception):
raise
delay = self.delay * self.backoff**tries
msg = ("%s, Trying again in %d seconds..." %
(caught_exception, delay))
self.logger(msg)
time.sleep(delay)
tries += 1
return to_wrap(*args, **kwargs)
return wrapped_function
class RetryResult(RetryBase):
"""Decorator for retrying based on non-error result.
:type result_predicate: function, takes result, returns bool
:param result_predicate: Predicate evaluating whether to retry after a
result is returned.
:type max_tries: int
:param max_tries: Number of times to try (not retry) before giving up.
:type delay: int
:param delay: Initial delay between retries in seconds.
:type backoff: int
:param backoff: Backoff multiplier e.g. value of 2 will double the
delay each retry.
:type logger: logging.Logger instance
:param logger: Logger to use. If None, print.
"""
def __init__(self, result_predicate,
max_tries=MAX_TRIES, delay=DELAY, backoff=BACKOFF,
logger=None):
super(RetryResult, self).__init__(max_tries, delay, backoff, logger)
self.result_predicate = result_predicate
def __call__(self, to_wrap):
@wraps(to_wrap)
def wrapped_function(*args, **kwargs):
tries = 0
while tries < self.max_tries:
result = to_wrap(*args, **kwargs)
if self.result_predicate(result):
return result
delay = self.delay * self.backoff**tries
msg = "%s. Trying again in %d seconds..." % (
self.result_predicate.__name__, delay,)
self.logger(msg)
time.sleep(delay)
tries += 1
raise BackoffFailed()
return wrapped_function
class RetryInstanceState(RetryBase):
"""Decorator for retrying based on instance state.
:type instance_predicate: function, takes instance, returns bool
:param instance_predicate: Predicate evaluating whether to retry after an
API-invoking method is called.
:type max_tries: int
:param max_tries: Number of times to try (not retry) before giving up.
:type delay: int
:param delay: Initial delay between retries in seconds.
:type backoff: int
:param backoff: Backoff multiplier e.g. value of 2 will double the
delay each retry.
:type logger: logging.Logger instance
:param logger: Logger to use. If None, print.
"""
def __init__(self, instance_predicate,
max_tries=MAX_TRIES, delay=DELAY, backoff=BACKOFF,
logger=None):
super(RetryInstanceState, self).__init__(
max_tries, delay, backoff, logger)
self.instance_predicate = instance_predicate
def __call__(self, to_wrap):
instance = to_wrap.__self__ # only instance methods allowed
@wraps(to_wrap)
def wrapped_function(*args, **kwargs):
tries = 0
while tries < self.max_tries:
result = to_wrap(*args, **kwargs)
if self.instance_predicate(instance):
return result
delay = self.delay * self.backoff**tries
msg = "%s. Trying again in %d seconds..." % (
self.instance_predicate.__name__, delay,)
self.logger(msg)
time.sleep(delay)
tries += 1
raise BackoffFailed()
return wrapped_function