Skip to content

Commit

Permalink
Fix: Add response object to any append rows requests exception (#838)
Browse files Browse the repository at this point in the history
* Fix: Add response object to any append rows requests exception

Should an AppendRowsRequest fail, you need to inspect the response to
see what went wrong. Currently this lib only raises an exception with
the code and message, throwing the actual response away. This patch adds
the response to any exception raise. This is fine because the base grpc
error has a response kwarg that this lib wasn't using.

Now you can catch the error and call `e.response.row_errors` to see the
underlying row errors.

Fixes: #836

* fix system test

* lint

---------

Co-authored-by: Yiru Tang <[email protected]>
Co-authored-by: Anthonios Partheniou <[email protected]>
Co-authored-by: Lingqing Gan <[email protected]>
  • Loading branch information
4 people authored Dec 19, 2024
1 parent 1560654 commit 04867c6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
2 changes: 1 addition & 1 deletion google/cloud/bigquery_storage_v1/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def _on_response(self, response: gapic_types.AppendRowsResponse):
future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
exc = exceptions.from_grpc_status(
response.error.code, response.error.message
response.error.code, response.error.message, response=response
)
future.set_exception(exc)
else:
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigquery_storage_v1beta2/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def _on_response(self, response: gapic_types.AppendRowsResponse):
future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
exc = exceptions.from_grpc_status(
response.error.code, response.error.message
response.error.code, response.error.message, response=response
)
future.set_exception(exc)
else:
Expand Down
50 changes: 39 additions & 11 deletions tests/system/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def table(project_id, dataset, bq_client):
schema = [
bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]

unique_suffix = str(uuid.uuid4()).replace("-", "_")
Expand All @@ -52,15 +52,8 @@ def bqstorage_write_client(credentials):
return bigquery_storage_v1.BigQueryWriteClient(credentials=credentials)


def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
bad_request = gapic_types.AppendRowsRequest()
bad_request.write_stream = "this-is-an-invalid-stream-resource-path"

with pytest.raises(exceptions.GoogleAPICallError):
bqstorage_write_client.append_rows(bad_request)


def test_append_rows_with_proto3(bqstorage_write_client, table):
@pytest.fixture(scope="function")
def append_rows_stream(bqstorage_write_client, table):
person_pb = person_pb2.PersonProto()

stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default"
Expand All @@ -81,11 +74,22 @@ def test_append_rows_with_proto3(bqstorage_write_client, table):
bqstorage_write_client,
request_template,
)
return append_rows_stream


def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
bad_request = gapic_types.AppendRowsRequest()
bad_request.write_stream = "this-is-an-invalid-stream-resource-path"

with pytest.raises(exceptions.GoogleAPICallError):
bqstorage_write_client.append_rows(bad_request)


def test_append_rows_with_proto3(append_rows_stream):
request = gapic_types.AppendRowsRequest()
proto_data = gapic_types.AppendRowsRequest.ProtoData()
proto_rows = gapic_types.ProtoRows()
row = person_pb
row = person_pb2.PersonProto()
row.first_name = "fn"
row.last_name = "ln"
row.age = 20
Expand All @@ -96,3 +100,27 @@ def test_append_rows_with_proto3(bqstorage_write_client, table):

assert response_future.result()
# The request should success


def test_append_rows_with_proto3_got_response_on_failure(append_rows_stream):
"""When the request fails and there is a response, verify that the response
is included in the exception. For more details, see
https://github.com/googleapis/python-bigquery-storage/issues/836
"""

# Make an invalid request by leaving the required field row.age blank.
request = gapic_types.AppendRowsRequest()
proto_data = gapic_types.AppendRowsRequest.ProtoData()
proto_rows = gapic_types.ProtoRows()
row = person_pb2.PersonProto()
row.first_name = "fn"
row.last_name = "ln"
proto_rows.serialized_rows.append(row.SerializeToString())
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future = append_rows_stream.send(request)

with pytest.raises(exceptions.GoogleAPICallError) as excinfo:
response_future.result()

assert isinstance(excinfo.value.response, gapic_types.AppendRowsResponse)

0 comments on commit 04867c6

Please sign in to comment.