Skip to content

Commit

Permalink
Merge pull request #248 from totem3/feature/import-from-gcs-emulator-…
Browse files Browse the repository at this point in the history
…without-public-host
  • Loading branch information
goccy authored Apr 7, 2024
2 parents e74d4e4 + 8bf5a73 commit 5ad569f
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ func (h *jobsInsertHandler) importFromGCS(ctx context.Context, r *jobsInsertRequ
opts = append(
opts,
option.WithEndpoint(fmt.Sprintf("%s/storage/v1/", host)),
storage.WithJSONReads(),
option.WithoutAuthentication(),
)
}
Expand Down
135 changes: 135 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,6 +1708,141 @@ func TestImportFromGCS(t *testing.T) {
}
}

// TestImportFromGCSEmulatorWithoutPublicHost runs a BigQuery load job whose
// source is an object served by a fake GCS server that is configured with only
// Host and Scheme (no public host), and that the emulator discovers through
// the STORAGE_EMULATOR_HOST environment variable.
//
// The test seeds the fake bucket with three newline-delimited JSON rows, loads
// them into an existing table with WriteTruncate, and then verifies the table
// contents with a SELECT query.
func TestImportFromGCSEmulatorWithoutPublicHost(t *testing.T) {
	const (
		projectID  = "test"
		datasetID  = "dataset1"
		tableID    = "table_a"
		host       = "127.0.0.1"
		bucketName = "test-bucket"
		sourceName = "path/to/data.json"
	)

	ctx := context.Background()
	bqServer, err := server.New(server.TempStorage)
	if err != nil {
		t.Fatal(err)
	}
	project := types.NewProject(
		projectID,
		types.NewDataset(
			datasetID,
			types.NewTable(
				tableID,
				[]*types.Column{
					types.NewColumn("id", types.INT64),
					types.NewColumn("value", types.INT64),
				},
				nil,
			),
		),
	)
	if err := bqServer.Load(server.StructSource(project)); err != nil {
		t.Fatal(err)
	}

	testServer := bqServer.TestServer()
	// Register the cleanup as soon as the servers exist: t.Fatal unwinds via
	// runtime.Goexit, which only runs defers that were already registered, so
	// deferring later would leak these servers on an early failure.
	defer func() {
		testServer.Close()
		bqServer.Stop(ctx)
	}()

	// Build the newline-delimited JSON payload: one encoded object per row.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for i := 0; i < 3; i++ {
		if err := enc.Encode(map[string]interface{}{
			"id":    i + 1,
			"value": i + 10,
		}); err != nil {
			t.Fatal(err)
		}
	}
	payload := buf.Bytes()

	storageServer, err := fakestorage.NewServerWithOptions(fakestorage.Options{
		InitialObjects: []fakestorage.Object{
			{
				ObjectAttrs: fakestorage.ObjectAttrs{
					BucketName: bucketName,
					Name:       sourceName,
					Size:       int64(len(payload)),
				},
				Content: payload,
			},
		},
		Host:   host,
		Scheme: "http",
	})
	if err != nil {
		t.Fatal(err)
	}
	defer storageServer.Stop()

	// Only the port is taken from the fake server's URL; the emulator host is
	// rebuilt from the fixed host constant.
	u, err := url.Parse(storageServer.URL())
	if err != nil {
		t.Fatal(err)
	}
	storageEmulatorHost := fmt.Sprintf("http://%s:%s", host, u.Port())
	t.Setenv("STORAGE_EMULATOR_HOST", storageEmulatorHost)

	client, err := bigquery.NewClient(
		ctx,
		projectID,
		option.WithEndpoint(testServer.URL),
		option.WithoutAuthentication(),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	gcsSourceURL := fmt.Sprintf("gs://%s/%s", bucketName, sourceName)
	gcsRef := bigquery.NewGCSReference(gcsSourceURL)
	gcsRef.SourceFormat = bigquery.JSON
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate
	job, err := loader.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// Wait reports transport errors; the job's own outcome is in status.Err.
	if err := status.Err(); err != nil {
		t.Fatal(err)
	}

	query := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", datasetID, tableID))
	it, err := query.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}

	type row struct {
		ID    int64
		Value int64
	}
	var rows []*row
	for {
		var r row
		if err := it.Next(&r); err != nil {
			if err == iterator.Done {
				break
			}
			t.Fatal(err)
		}
		rows = append(rows, &r)
	}
	want := []*row{
		{ID: 1, Value: 10},
		{ID: 2, Value: 11},
		{ID: 3, Value: 12},
	}
	if diff := cmp.Diff(want, rows); diff != "" {
		t.Errorf("(-want +got):\n%s", diff)
	}
}

func TestImportWithWildcardFromGCS(t *testing.T) {
const (
projectID = "test"
Expand Down

0 comments on commit 5ad569f

Please sign in to comment.