Storage Write API with proto wrapper types causes nil pointer #364

Open
alfredgunnar opened this issue Oct 24, 2024 · 1 comment
Labels
bug Something isn't working

alfredgunnar commented Oct 24, 2024

What happened?

When using the emulator to test code that batch-writes protobuf messages to BigQuery with the Storage Write API, the emulator hits a nil pointer dereference if the message contains a wrapper type (here google.protobuf.DoubleValue).

Here's the stack trace from the emulator:

github.com/goccy/bigquery-emulator/types.normalizeData({0x2accf40?, 0xc000a90d90?}, 0x0)
      /work/types/types.go:547 +0xb0
github.com/goccy/bigquery-emulator/types.normalizeData({0x2b67120?, 0xc000ad0300?}, 0xc0004d87e0)
      /work/types/types.go:575 +0x488
github.com/goccy/bigquery-emulator/types.NewTableWithSchema(0xc00048ae00, {0xc000a8d818, 0x1, 0x10?})
      /work/types/types.go:479 +0x479
github.com/goccy/bigquery-emulator/server.(*storageWriteServer).insertTableData(0xc0009781e0, {0x30e0650, 0xc000775f50}, 0xc000648b19?, 0xc0005b5a40, {0xc000a8d818?, 0x4?, 0x76?})
      /work/server/storage_handler.go:662 +0x4e
github.com/goccy/bigquery-emulator/server.(*storageWriteServer).BatchCommitWriteStreams(0xc0009781e0, {0x30e0650, 0xc000775f50}, 0x0?)
      /work/server/storage_handler.go:727 +0x472
cloud.google.com/go/bigquery/storage/apiv1/storagepb._BigQueryWrite_BatchCommitWriteStreams_Handler({0x2d39ba0?, 0xc0009781e0}, {0x30e0650, 0xc000775f50}, 0xc0007f4480, 0x0)
      /go/pkg/mod/cloud.google.com/go/[email protected]/storage/apiv1/storagepb/storage.pb.go:3371 +0x169
google.golang.org/grpc.(*Server).processUnaryRPC(0xc000540000, {0x30e0650, 0xc000775ec0}, {0x30eff60, 0xc0000f5520}, 0xc00011d440, 0xc0009782a0, 0x4b5b248, 0x0)
      /go/pkg/mod/google.golang.org/[email protected]/server.go:1343 +0xe03
google.golang.org/grpc.(*Server).handleStream(0xc000540000, {0x30eff60, 0xc0000f5520}, 0xc00011d440)
      /go/pkg/mod/google.golang.org/[email protected]/server.go:1737 +0xc4c
google.golang.org/grpc.(*Server).serveStreams.func1.1()
      /go/pkg/mod/google.golang.org/[email protected]/server.go:986 +0x86
created by google.golang.org/grpc.(*Server).serveStreams.func1 in goroutine 130
      /go/pkg/mod/google.golang.org/[email protected]/server.go:997 +0x145

What did you expect to happen?

I'd expect this to work the same way as in BigQuery, where wrapper types are supported.
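As a sketch of what "supported" plausibly requires: the schema generated by protoc-gen-bq-schema (further down) declares double_wrapper_field as a NULLABLE FLOAT, not a RECORD, so the nested DoubleValue message has to collapse into its inner double before the row is normalized. The helper below is hypothetical; `unwrapWrapper`, `wrapperZero` and the `map[string]any` representation of decoded message fields are assumptions for illustration and do not reflect the emulator's actual types.go code.

```go
package main

import "fmt"

// wrapperZero maps each protobuf well-known wrapper type to the zero value of
// its single "value" field (field number 1). Proto3 omits a zero inner value
// on the wire, so a present-but-empty wrapper must become this zero, not NULL.
var wrapperZero = map[string]any{
	"google.protobuf.DoubleValue": float64(0),
	"google.protobuf.FloatValue":  float32(0),
	"google.protobuf.Int64Value":  int64(0),
	"google.protobuf.UInt64Value": uint64(0),
	"google.protobuf.Int32Value":  int32(0),
	"google.protobuf.UInt32Value": uint32(0),
	"google.protobuf.BoolValue":   false,
	"google.protobuf.StringValue": "",
	"google.protobuf.BytesValue":  []byte{},
}

// unwrapWrapper (hypothetical) receives the full name of a nested message type
// and its decoded fields; fields is nil when the field was not set at all.
// Wrapper messages collapse to their scalar (nil meaning NULL); any other
// message is returned unchanged for ordinary RECORD handling.
func unwrapWrapper(fullName string, fields map[string]any) any {
	zero, isWrapper := wrapperZero[fullName]
	if !isWrapper {
		return fields
	}
	if fields == nil {
		return nil // wrapper field unset: NULL in a NULLABLE column
	}
	if v, ok := fields["value"]; ok {
		return v
	}
	return zero // wrapper present, inner value omitted because it is zero
}

func main() {
	fmt.Println(unwrapWrapper("google.protobuf.DoubleValue", map[string]any{"value": 56.18})) // 56.18
	fmt.Println(unwrapWrapper("google.protobuf.DoubleValue", map[string]any{}))               // 0
	fmt.Println(unwrapWrapper("google.protobuf.DoubleValue", nil))                            // <nil>
}
```

The zero-value table matters because "wrapper set to 0.0" and "wrapper not set" look different on the wire (an empty nested message versus no field at all) and should map to 0 and NULL respectively.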

How can we reproduce it (as minimally and precisely as possible)?

With this proto file:

syntax = "proto3";

package example.v1;

import "gen_bq_schema/bq_table.proto";
import "google/protobuf/wrappers.proto";

// An example message
message Message {
  option (gen_bq_schema.bigquery_opts).table_name = "message";

  // An example string field
  string string_field = 1;

  // An example double wrapper
  google.protobuf.DoubleValue double_wrapper_field = 2;
}
(which generates this Go code):

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.34.2
// 	protoc        (unknown)
// source: example/v1/message.proto

package examplev1

import (
	_ "github.com/GoogleCloudPlatform/protoc-gen-bq-schema/protos"
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	wrapperspb "google.golang.org/protobuf/types/known/wrapperspb"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

// An example message
type Message struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// An example string field
	StringField string `protobuf:"bytes,1,opt,name=string_field,json=stringField,proto3" json:"string_field,omitempty"`
	// An example double wrapper
	DoubleWrapperField *wrapperspb.DoubleValue `protobuf:"bytes,2,opt,name=double_wrapper_field,json=doubleWrapperField,proto3" json:"double_wrapper_field,omitempty"`
}

func (x *Message) Reset() {
	*x = Message{}
	if protoimpl.UnsafeEnabled {
		mi := &file_example_v1_message_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *Message) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*Message) ProtoMessage() {}

func (x *Message) ProtoReflect() protoreflect.Message {
	mi := &file_example_v1_message_proto_msgTypes[0]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
	return file_example_v1_message_proto_rawDescGZIP(), []int{0}
}

func (x *Message) GetStringField() string {
	if x != nil {
		return x.StringField
	}
	return ""
}

func (x *Message) GetDoubleWrapperField() *wrapperspb.DoubleValue {
	if x != nil {
		return x.DoubleWrapperField
	}
	return nil
}

var File_example_v1_message_proto protoreflect.FileDescriptor

var file_example_v1_message_proto_rawDesc = []byte{
	0x0a, 0x18, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x73,
	0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x65, 0x78, 0x61, 0x6d,
	0x70, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1c, 0x67, 0x65, 0x6e, 0x5f, 0x62, 0x71, 0x5f, 0x73,
	0x63, 0x68, 0x65, 0x6d, 0x61, 0x2f, 0x62, 0x71, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x70,
	0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
	0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70,
	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x8a, 0x01, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
	0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64,
	0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x46, 0x69,
	0x65, 0x6c, 0x64, 0x12, 0x4e, 0x0a, 0x14, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x5f, 0x77, 0x72,
	0x61, 0x70, 0x70, 0x65, 0x72, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
	0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
	0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52,
	0x12, 0x64, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x57, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x46, 0x69,
	0x65, 0x6c, 0x64, 0x3a, 0x0c, 0xea, 0x3f, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
	0x65, 0x42, 0xb4, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x2e, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c,
	0x65, 0x2e, 0x76, 0x31, 0x42, 0x0c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x72, 0x6f,
	0x74, 0x6f, 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
	0x2f, 0x65, 0x69, 0x6e, 0x72, 0x69, 0x64, 0x65, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f,
	0x72, 0x6d, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2d, 0x70, 0x6c, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x2f,
	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x67, 0x6f, 0x2f, 0x65, 0x78, 0x61,
	0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x76,
	0x31, 0xa2, 0x02, 0x03, 0x45, 0x58, 0x58, 0xaa, 0x02, 0x0a, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c,
	0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0a, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5c, 0x56,
	0x31, 0xe2, 0x02, 0x16, 0x45, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5c, 0x56, 0x31, 0x5c, 0x47,
	0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x0b, 0x45, 0x78, 0x61,
	0x6d, 0x70, 0x6c, 0x65, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
	file_example_v1_message_proto_rawDescOnce sync.Once
	file_example_v1_message_proto_rawDescData = file_example_v1_message_proto_rawDesc
)

func file_example_v1_message_proto_rawDescGZIP() []byte {
	file_example_v1_message_proto_rawDescOnce.Do(func() {
		file_example_v1_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_v1_message_proto_rawDescData)
	})
	return file_example_v1_message_proto_rawDescData
}

var file_example_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_example_v1_message_proto_goTypes = []any{
	(*Message)(nil),                // 0: example.v1.Message
	(*wrapperspb.DoubleValue)(nil), // 1: google.protobuf.DoubleValue
}
var file_example_v1_message_proto_depIdxs = []int32{
	1, // 0: example.v1.Message.double_wrapper_field:type_name -> google.protobuf.DoubleValue
	1, // [1:1] is the sub-list for method output_type
	1, // [1:1] is the sub-list for method input_type
	1, // [1:1] is the sub-list for extension type_name
	1, // [1:1] is the sub-list for extension extendee
	0, // [0:1] is the sub-list for field type_name
}

func init() { file_example_v1_message_proto_init() }
func file_example_v1_message_proto_init() {
	if File_example_v1_message_proto != nil {
		return
	}
	if !protoimpl.UnsafeEnabled {
		file_example_v1_message_proto_msgTypes[0].Exporter = func(v any, i int) any {
			switch v := v.(*Message); i {
			case 0:
				return &v.state
			case 1:
				return &v.sizeCache
			case 2:
				return &v.unknownFields
			default:
				return nil
			}
		}
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: file_example_v1_message_proto_rawDesc,
			NumEnums:      0,
			NumMessages:   1,
			NumExtensions: 0,
			NumServices:   0,
		},
		GoTypes:           file_example_v1_message_proto_goTypes,
		DependencyIndexes: file_example_v1_message_proto_depIdxs,
		MessageInfos:      file_example_v1_message_proto_msgTypes,
	}.Build()
	File_example_v1_message_proto = out.File
	file_example_v1_message_proto_rawDesc = nil
	file_example_v1_message_proto_goTypes = nil
	file_example_v1_message_proto_depIdxs = nil
}

and this BigQuery schema, generated with protoc-gen-bq-schema:

[
 {
  "name": "string_field",
  "type": "STRING",
  "mode": "NULLABLE",
  "description": "An example string field"
 },
 {
  "name": "double_wrapper_field",
  "type": "FLOAT",
  "mode": "NULLABLE",
  "description": "An example double wrapper"
 }
]

it can be reproduced with this test:

package test

import (
	"context"
	"fmt"
	"os"
	"reflect"
	"testing"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"github.com/docker/go-connections/nat"
	examplev1 "SOME IMPORT PATH TO GO CODE GENERATED FROM PROTOBUF"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/proto"

	"google.golang.org/protobuf/types/known/wrapperspb"
	"gotest.tools/v3/assert"
)

func Test(t *testing.T) {
	ctx := context.Background()

	// start emulator image
	projectID := "test"
	ctainer, httpPort, grpcPort, err := StartBigQueryEmulatorContainer(ctx, t, projectID)
	assert.NilError(t, err)
	t.Cleanup(func() {
		assert.NilError(t, ctainer.Terminate(ctx))
	})

	// BigQuery client
	bqClient, err := bigquery.NewClient(
		ctx,
		projectID,
		option.WithEndpoint(fmt.Sprintf("http://localhost:%s", httpPort.Port())),
		option.WithoutAuthentication(),
	)
	assert.NilError(t, err)

	// Storage Write API client
	conn, err := grpc.NewClient(
		fmt.Sprintf("localhost:%s", grpcPort.Port()),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	assert.NilError(t, err)
	writerClient, err := managedwriter.NewClient(
		ctx,
		projectID,
		option.WithGRPCConn(conn),
	)
	assert.NilError(t, err)

	// setup table
	schemaBytes, err := os.ReadFile("message.schema") // the generated BigQuery schema
	assert.NilError(t, err)
	schema, err := bigquery.SchemaFromJSON(schemaBytes)
	assert.NilError(t, err)
	dataset := bqClient.Dataset("test")
	assert.NilError(t, dataset.Create(ctx, nil))
	table := dataset.Table("test")
	assert.NilError(t, table.Create(ctx, &bigquery.TableMetadata{
		Schema: schema,
	}))

	// setup stream
	msg := &examplev1.Message{
		StringField:        "test",
		DoubleWrapperField: &wrapperspb.DoubleValue{Value: 56.18}, // this is what causes the issue
	}
	emptyMsg := reflect.New(reflect.TypeOf(msg).Elem()).Interface().(*examplev1.Message)
	schemaDescriptor, err := adapt.NormalizeDescriptor(emptyMsg.ProtoReflect().Descriptor())
	assert.NilError(t, err)
	tableIdentifier, err := table.Identifier(bigquery.StorageAPIResourceID)
	assert.NilError(t, err)
	stream, err := writerClient.NewManagedStream(
		ctx,
		managedwriter.WithDestinationTable(tableIdentifier),
		managedwriter.WithSchemaDescriptor(schemaDescriptor),
		managedwriter.WithType(managedwriter.PendingStream),
		managedwriter.EnableWriteRetries(true),
	)
	assert.NilError(t, err)

	// Write to stream
	row, err := proto.Marshal(msg)
	assert.NilError(t, err)
	result, err := stream.AppendRows(ctx, [][]byte{row})
	assert.NilError(t, err)
	_, err = result.GetResult(ctx) // this seems to be the call that triggers the issue
	assert.NilError(t, err)

	// Close stream
	_, err = stream.Finalize(ctx)
	assert.NilError(t, err)
	_, err = writerClient.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       tableIdentifier,
		WriteStreams: []string{stream.StreamName()},
	})
	assert.NilError(t, err)
}

type logger struct {
	t *testing.T
}

func (l *logger) Accept(log testcontainers.Log) {
	l.t.Logf("%s: %s", log.LogType, string(log.Content))
}

const (
	bqHttpPort = "9050/tcp"
	bqGrpcPort = "9060/tcp"
)

func StartBigQueryEmulatorContainer(
	ctx context.Context,
	t *testing.T,
	projectID string,
) (testcontainers.Container, nat.Port, nat.Port, error) {
	testLogger := logger{t: t}
	req := testcontainers.ContainerRequest{
		Image:        "ghcr.io/goccy/bigquery-emulator:0.6.0",
		ExposedPorts: []string{bqHttpPort, bqGrpcPort},
		SkipReaper:   true,
		Cmd:          []string{"bigquery-emulator", "--project", projectID, "--log-level", "debug"},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(bqHttpPort),
			wait.ForListeningPort(bqGrpcPort),
		),
		LogConsumerCfg: &testcontainers.LogConsumerConfig{
			Consumers: []testcontainers.LogConsumer{&testLogger},
		},
	}
	bqContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		return nil, "", "", err
	}

	mappedHttpPort, err := bqContainer.MappedPort(ctx, bqHttpPort)
	if err != nil {
		return nil, "", "", err
	}

	mappedGrpcPort, err := bqContainer.MappedPort(ctx, bqGrpcPort)
	if err != nil {
		return nil, "", "", err
	}

	return bqContainer, mappedHttpPort, mappedGrpcPort, err
}

Anything else we need to know?

It seems to be the call to result.GetResult that triggers the issue.
If the DoubleWrapperField is not set, the issue is not seen.

alfredgunnar added the bug label on Oct 24, 2024
mdelapenya commented:

Just in case, there is a BigQuery container in the Google Cloud module: https://golang.testcontainers.org/modules/gcloud/#bigquery

I think it could simplify the setup here.
