Skip to content

Commit

Permalink
Create and persist an authorized session object for MfgInspector.
Browse files Browse the repository at this point in the history
AuthorizedSession allows us to persist connections across multiple requests to the same host. This means the manufacturing inspector client will re-use existing connections from the pool which reduces overhead and latency. Requests are processed faster and sessions automatically handle keep-alive connections. Additionally, session related data (e.g. auth) is persisted which improves performance. Given our partial uploader implementation, this will be beneficial to both client and server.

PiperOrigin-RevId: 693033180
  • Loading branch information
OpenHTF Owners authored and copybara-github committed Nov 6, 2024
1 parent 710313e commit 55ae8f2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
37 changes: 25 additions & 12 deletions openhtf/output/callbacks/mfg_inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import time
import zlib
from typing import Optional

from google.auth import credentials as credentials_lib
from google.auth.transport import requests
Expand Down Expand Up @@ -45,19 +46,18 @@ class InvalidTestRunError(Exception):

def _send_mfg_inspector_request(
envelope_data: bytes,
credentials: credentials_lib.Credentials,
authorized_session: requests.AuthorizedSession,
destination_url: str,
) -> Dict[str, Any]:
"""Send upload http request. Intended to be run in retry loop."""
logging.info('Uploading result...')

with requests.AuthorizedSession(credentials) as authed_session:
response = authed_session.request(
'POST',
destination_url,
data=envelope_data,
timeout=_MFG_INSPECTOR_UPLOAD_TIMEOUT,
)
response = authorized_session.request(
'POST',
destination_url,
data=envelope_data,
timeout=_MFG_INSPECTOR_UPLOAD_TIMEOUT,
)

try:
result = response.json()
Expand Down Expand Up @@ -94,6 +94,7 @@ def send_mfg_inspector_data(
credentials: credentials_lib.Credentials,
destination_url: str,
payload_type: guzzle_pb2.PayloadType,
authorized_session: Optional[requests.AuthorizedSession] = None,
) -> Dict[str, Any]:
"""Upload MfgEvent to steam_engine."""
envelope = guzzle_pb2.TestRunEnvelope() # pytype: disable=module-attr # gen-stub-imports
Expand All @@ -105,10 +106,13 @@ def send_mfg_inspector_data(
envelope.payload_type = payload_type
envelope_data = envelope.SerializeToString()

if authorized_session is None:
authorized_session = requests.AuthorizedSession(credentials)

for _ in range(5):
try:
result = _send_mfg_inspector_request(
envelope_data, credentials, destination_url
envelope_data, authorized_session, destination_url
)
return result
except UploadFailedError:
Expand Down Expand Up @@ -181,8 +185,10 @@ def __init__(self,
'user_agent': 'OpenHTF Guzzle Upload Client',
},
scopes=[self.SCOPE_CODE_URI])
self.authorized_session = requests.AuthorizedSession(self.credentials)
else:
self.credentials = None
self.authorized_session = None

self.upload_result = None

Expand Down Expand Up @@ -260,11 +266,18 @@ def upload(self, payload_type=guzzle_pb2.COMPRESSED_TEST_RUN):
if not self.credentials:
raise RuntimeError('Must provide credentials to use upload callback.')

if self.authorized_session is None:
self.authorized_session = requests.AuthorizedSession(self.credentials)

def upload_callback(test_record_obj):
proto = self._convert(test_record_obj)
self.upload_result = send_mfg_inspector_data(proto, self.credentials,
self.destination_url,
payload_type)
self.upload_result = send_mfg_inspector_data(
proto,
self.credentials,
self.destination_url,
payload_type,
self.authorized_session,
)

return upload_callback

Expand Down
23 changes: 19 additions & 4 deletions test/output/callbacks/mfg_inspector_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ def setUp(self):
self.mock_send_mfg_inspector_data = mock.patch.object(
mfg_inspector, 'send_mfg_inspector_data').start()

self.mock_authorized_session = mock.patch.object(
mfg_inspector.requests,
'AuthorizedSession',
spec_set=True,
autospec=True,
).start()

def tearDown(self):
mock.patch.stopall()
super(TestMfgInspector, self).tearDown()
Expand Down Expand Up @@ -110,8 +117,12 @@ def test_upload_only(self):
callback.upload()(MOCK_TEST_RUN)

self.mock_send_mfg_inspector_data.assert_called_with(
MOCK_TEST_RUN_PROTO, self.mock_credentials, callback.destination_url,
guzzle_pb2.COMPRESSED_TEST_RUN)
MOCK_TEST_RUN_PROTO,
self.mock_credentials,
callback.destination_url,
guzzle_pb2.COMPRESSED_TEST_RUN,
self.mock_authorized_session(self.mock_credentials),
)

def test_save_and_upload(self):
testrun_output = io.BytesIO()
Expand All @@ -132,8 +143,12 @@ def test_save_and_upload(self):
self.assertEqual(MOCK_TEST_RUN_PROTO, testrun)

self.mock_send_mfg_inspector_data.assert_called_with(
MOCK_TEST_RUN_PROTO, self.mock_credentials, callback.destination_url,
guzzle_pb2.COMPRESSED_TEST_RUN)
MOCK_TEST_RUN_PROTO,
self.mock_credentials,
callback.destination_url,
guzzle_pb2.COMPRESSED_TEST_RUN,
self.mock_authorized_session(self.mock_credentials),
)

# Make sure mock converter only called once i.e. the test record was
# was converted to a proto only once. This important because some custom
Expand Down

0 comments on commit 55ae8f2

Please sign in to comment.