Skip to content

Commit

Permalink
cloudvision/cvlib: Add timeouts to rest/api calls made in helpers
Browse files Browse the repository at this point in the history
Add a general 1 min timeout (TIMEOUT_REQUEST = 60 seconds) to all helper
methods to prevent calls from getting stuck in clusters with ongoing infra issues

Change-Id: I75aadc5f548eeff4244c7aaaf0b0c5a8cc8635a5
  • Loading branch information
cianmcgrath committed Nov 27, 2024
1 parent 0975315 commit cb2e2c4
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 177 deletions.
3 changes: 3 additions & 0 deletions cloudvision/cvlib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
STUDIO_ID_ARG = "StudioID"
STUDIO_IDS_ARG = "StudioIDs"
WORKSPACE_ID_ARG = "WorkspaceID"

# General timeout, in seconds, passed as the `timeout` argument to the
# REST (requests) and gRPC stub calls made by the cvlib helpers, so that a
# call cannot hang indefinitely.
TIMEOUT_REQUEST = 60 # 1 min in seconds
10 changes: 8 additions & 2 deletions cloudvision/cvlib/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from .action import Action
from .changecontrol import ChangeControl
from .connections import AuthAndEndpoints, addHeaderInterceptor
from .constants import BUILD_ID_ARG, STUDIO_ID_ARG, STUDIO_IDS_ARG, WORKSPACE_ID_ARG
from .constants import (
BUILD_ID_ARG,
STUDIO_ID_ARG,
STUDIO_IDS_ARG,
WORKSPACE_ID_ARG,
TIMEOUT_REQUEST,
)
from .device import Device, Interface
from .exceptions import (
ConnectionFailed,
Expand Down Expand Up @@ -300,7 +306,7 @@ def runDeviceCmds(self, commandsList: List[str], device: Optional[Device] = None
runCmdURL = f"https://{self.connections.serviceAddr}/{self.connections.commandEndpoint}"
self.debug(f"Executing the following command(s) on device {device.id}: {commandsList}")
response = requests.post(runCmdURL, data=data, headers=HEADERS,
cookies=cookies, verify=False)
cookies=cookies, timeout=TIMEOUT_REQUEST, verify=False)
except requests.ConnectionError as e:
self.error(f"Got exception while establishing connection to DI : {e}")
raise
Expand Down
22 changes: 14 additions & 8 deletions cloudvision/cvlib/studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
from cloudvision.Connector.grpc_client import create_notification
from fmp import wrappers_pb2 as fmp_wrappers

from .constants import INPUT_PATH_ARG, MAINLINE_WS_ID, STUDIO_ID_ARG, WORKSPACE_ID_ARG
from .constants import (
INPUT_PATH_ARG,
MAINLINE_WS_ID,
STUDIO_ID_ARG,
WORKSPACE_ID_ARG,
TIMEOUT_REQUEST,
)
from .exceptions import (
InputException,
InputNotFoundException,
Expand Down Expand Up @@ -272,7 +278,7 @@ def __getStudioInputs(clientGetter, studioId: str, workspaceId: str, start=None,

inputs = None
# We need to issue the get requests as part of a GetAll to allow for truncated inputs
for res in client.GetAll(req):
for res in client.GetAll(req, timeout=TIMEOUT_REQUEST):
inpResp = res.value
if not inpResp.inputs:
continue
Expand All @@ -292,7 +298,7 @@ def __getStudioInputConfig(clientGetter, studioId: str, workspaceId: str, path:
req = services.InputsConfigRequest(key=key)

try:
configResp = client.GetOne(req)
configResp = client.GetOne(req, timeout=TIMEOUT_REQUEST)
except RpcError as confExc:
# If the config does not exist for the workspace, return the mainline state
if confExc.code() == StatusCode.NOT_FOUND:
Expand Down Expand Up @@ -330,7 +336,7 @@ def setStudioInput(clientGetter, studioId: str, workspaceId: str, inputPath: Lis
value=models.InputsConfig(key=key, inputs=pb.StringValue(value=serialized))
)
try:
client.Set(request=req)
client.Set(request=req, timeout=TIMEOUT_REQUEST)
except RpcError as exc:
raise InputUpdateException(inputPath, f"Value {value} was not set: {exc}") from None

Expand Down Expand Up @@ -385,7 +391,7 @@ def setStudioInputs(clientGetter, studioId: str, workspaceId: str,
values=inputsConfigs
)
try:
for res in client.SetSome(request=req):
for res in client.SetSome(request=req, timeout=TIMEOUT_REQUEST):
pass
except RpcError as exc:
raise InputUpdateException(err=f"Inputs {inputs} was not set: {exc}") from None
Expand Down Expand Up @@ -533,7 +539,7 @@ def GetOneWithWS(apiClientGetter, stateStub, stateGetReq, configStub, confGetReq
stateClient = apiClientGetter(stateStub)
# Issue a get to the state endpoint for the workspace
try:
result = stateClient.GetOne(stateGetReq)
result = stateClient.GetOne(stateGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as exc:
# If the state does not exist for the workspace, reraise the original
# exception as something went wrong
Expand All @@ -552,7 +558,7 @@ def GetOneWithWS(apiClientGetter, stateStub, stateGetReq, configStub, confGetReq
stateGetReq.key.workspace_id.value = MAINLINE_WS_ID
stateGetReq.time = wsTs
try:
result = stateClient.GetOne(stateGetReq)
result = stateClient.GetOne(stateGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as mainlineExc:
# Handle the mainline error as its own exception, such that stack
# traces don't contain nested exceptions such as "when handling the
Expand All @@ -563,7 +569,7 @@ def GetOneWithWS(apiClientGetter, stateStub, stateGetReq, configStub, confGetReq
# the mainline value has not been deleted
configClient = apiClientGetter(configStub)
try:
configResp = configClient.GetOne(confGetReq)
configResp = configClient.GetOne(confGetReq, timeout=TIMEOUT_REQUEST)
except RpcError as confExc:
# If the config does not exist for the workspace, return the mainline state
if confExc.code() == StatusCode.NOT_FOUND:
Expand Down
15 changes: 8 additions & 7 deletions cloudvision/cvlib/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CREATOR_TYPE_USER,
)
from arista.time.time_pb2 import TimeBounds
from .constants import TIMEOUT_REQUEST
from .exceptions import (
TagOperationException
)
Expand Down Expand Up @@ -103,7 +104,7 @@ def _getTagUpdatesFromWorkspace(self, etype=ELEMENT_TYPE_DEVICE):
tagFilter.key.element_type = etype
tagFilter.key.workspace_id.value = self.ctx.getWorkspaceId()
tagRequest.partial_eq_filter.append(tagFilter)
for resp in tagClient.GetAll(tagRequest):
for resp in tagClient.GetAll(tagRequest, timeout=TIMEOUT_REQUEST):
workspaceTagUpdates.append((resp.value.key.device_id.value,
resp.value.key.interface_id.value,
resp.value.key.label.value,
Expand Down Expand Up @@ -134,7 +135,7 @@ def _createTag(self, etype: int, label: str, value: str):
setRequest.value.key.label.value = label
setRequest.value.key.value.value = value
tagClient = self.ctx.getApiClient(TagConfigServiceStub)
tagClient.Set(setRequest)
tagClient.Set(setRequest, timeout=TIMEOUT_REQUEST)

def _createTags(self, etype: int, tags: List[Tuple], configsPerReq=1000):
'''
Expand Down Expand Up @@ -173,7 +174,7 @@ def _createTags(self, etype: int, tags: List[Tuple], configsPerReq=1000):
)
tagClient = self.ctx.getApiClient(TagConfigServiceStub)
try:
for res in tagClient.SetSome(setSomeRequest):
for res in tagClient.SetSome(setSomeRequest, timeout=TIMEOUT_REQUEST):
pass
except RpcError:
raise TagOperationException('', '', 'create')
Expand All @@ -199,7 +200,7 @@ def _assignTagSet(self, etype: int, tagAssign: Tuple, remove: bool = False):
setRequest.value.key.interface_id.value = interfaceId
setRequest.value.remove.value = remove
tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub)
tagClient.Set(setRequest)
tagClient.Set(setRequest, timeout=TIMEOUT_REQUEST)

def _assignTagsSets(self, etype: int, tagAssigns: List[Tuple],
remove: bool = False, configsPerReq: int = 1000):
Expand Down Expand Up @@ -232,7 +233,7 @@ def _assignTagsSets(self, etype: int, tagAssigns: List[Tuple],
)
tagClient = self.ctx.getApiClient(TagAssignmentConfigServiceStub)
try:
for res in tagClient.SetSome(setSomeRequest):
for res in tagClient.SetSome(setSomeRequest, timeout=TIMEOUT_REQUEST):
pass
except RpcError:
raise TagOperationException('', '', 'assign')
Expand Down Expand Up @@ -299,7 +300,7 @@ def _getAllDeviceTagsFromMainline(self):
tagFilter.key.element_type = ELEMENT_TYPE_DEVICE
tagFilter.key.workspace_id.value = MAINLINE_ID
tagRequest.partial_eq_filter.append(tagFilter)
for resp in tagClient.GetAll(tagRequest):
for resp in tagClient.GetAll(tagRequest, timeout=TIMEOUT_REQUEST):
label = resp.value.key.label.value
value = resp.value.key.value.value
deviceId = resp.value.key.device_id.value
Expand Down Expand Up @@ -546,7 +547,7 @@ def _getAllInterfaceTagsFromMainline(self):
tagFilter.key.element_type = ELEMENT_TYPE_INTERFACE
tagFilter.key.workspace_id.value = MAINLINE_ID
tagRequest.partial_eq_filter.append(tagFilter)
for resp in tagClient.GetAll(tagRequest):
for resp in tagClient.GetAll(tagRequest, timeout=TIMEOUT_REQUEST):
label = resp.value.key.label.value
value = resp.value.key.value.value
deviceId = resp.value.key.device_id.value
Expand Down
4 changes: 2 additions & 2 deletions cloudvision/cvlib/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from arista.workspace.v1 import models, services

from .constants import MAINLINE_WS_ID
from .constants import MAINLINE_WS_ID, TIMEOUT_REQUEST
from .exceptions import CVException


Expand Down Expand Up @@ -53,7 +53,7 @@ def getWorkspaceLastSynced(clientGetter, workspaceId: str):
key = models.WorkspaceKey(workspace_id=wid)
wsReq = services.WorkspaceRequest(key=key)
try:
wsResp = wsClient.GetOne(wsReq)
wsResp = wsClient.GetOne(wsReq, timeout=TIMEOUT_REQUEST)
except RpcError as wsExec:
if wsExec.code() == StatusCode.NOT_FOUND:
raise CVException("Workspace does not exist")
Expand Down
34 changes: 17 additions & 17 deletions test/cvlib/tags/test_deviceTags.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(self):
self.numGetAlls = 0
self.numSets = 0

def GetAll(self, request):
def GetAll(self, request, timeout):
labelFilters = []
for afilter in request.partial_eq_filter:
if afilter.key.label.value:
Expand All @@ -84,7 +84,7 @@ def GetAll(self, request):
response.remove(item)
return response

def GetOne(self, _):
def GetOne(self, _, timeout):
return WorkspaceResponse()

def SetGetAllResponse(self, response):
Expand All @@ -93,7 +93,7 @@ def SetGetAllResponse(self, response):
def SetGetAllConfigResponse(self, response):
self.tagConfigResponse = response

def Set(self, request):
def Set(self, request, timeout):
self.numSets += 1
return

Expand Down Expand Up @@ -233,8 +233,8 @@ def getApiClient(self, stub):
[
"get tag that is assigned correctly to device with preloaded cache",
{
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC-Pod', 'POD1'),
Expand All @@ -254,8 +254,8 @@ def getApiClient(self, stub):
[
"get tag that is assigned to device with too many values with preloaded cache",
{
'J1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC', 'DC2'),
Expand Down Expand Up @@ -384,8 +384,8 @@ def test_getSingleTag(name, cacheTags, getAllResp, topoDevices, deviceId, label,
[
"get all device tags for device that has tags with preloaded cache",
{
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC-Pod', 'POD1'),
Expand Down Expand Up @@ -608,8 +608,8 @@ def test_getTags(name, cacheTags, getAllResp, topoDevices, deviceId, label,
[
"assign new tag to device with preloaded cache",
{
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC-Pod', 'POD1'),
Expand All @@ -635,8 +635,8 @@ def test_getTags(name, cacheTags, getAllResp, topoDevices, deviceId, label,
[
"replace value with new value of existing label for device with preloaded cache",
{
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC-Pod', 'POD1'),
Expand Down Expand Up @@ -858,8 +858,8 @@ def test_assignTags(name, cacheTags, getAllResp, topoDevices, deviceId,
[
"unassign tag from device with preloaded cache",
{
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC-Pod', 'POD1'),
Expand All @@ -882,8 +882,8 @@ def test_assignTags(name, cacheTags, getAllResp, topoDevices, deviceId,
[
"unassign all values for a label from device with preloaded cache",
{
'J1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId':['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId':['2']},
'J1': {'DC': ['DC1', 'DC2'], 'DC-Pod': ['POD1'], 'NodeId': ['1']},
'J2': {'DC': ['DC1'], 'DC-Pod': ['POD1'], 'NodeId': ['2']},
},
convertListToStream([('J1', 'DC', 'DC1'),
('J1', 'DC', 'DC2'),
Expand Down
Loading

0 comments on commit cb2e2c4

Please sign in to comment.