Skip to content

Commit

Permalink
feat: FIR-43725 add streaming to go sdk (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
stepansergeevitch authored Mar 5, 2025
1 parent 6cc214e commit 83b4692
Show file tree
Hide file tree
Showing 29 changed files with 1,340 additions and 628 deletions.
100 changes: 83 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@ Firebolt GO driver is an implementation of `database/sql/driver`.
go get github.com/firebolt-db/firebolt-go-sdk
```

### Example
### DSN (Data source name)
All information for the connection should be specified using the DSN string. The firebolt dsn string has the following format:
```
firebolt://[/database]?account_name=account_name&client_id=client_id&client_secret=client_secret[&engine=engine]
```

- **client_id** - credentials client id.
- **client_secret** - credentials client secret.
- **account_name** - the name of Firebolt account to log in to.
- **database** - (optional) the name of the database to connect to.
- **engine** - (optional) the name of the engine to run SQL on.

### Querying example
Here is an example of establishing a connection and executing a simple select query.
For it to run successfully, you have to specify your credentials, and have a default engine up and running.

Expand All @@ -24,44 +36,44 @@ package main
import (
"database/sql"
"fmt"
// we need to import firebolt-go-sdk, so it is able to register its driver
// we need to import firebolt-go-sdk in order to register the driver
_ "github.com/firebolt-db/firebolt-go-sdk"
)

func main() {

// constructing a dsn string, you need to set your credentials
// set your Firebolt credentials to construct a dsn string
clientId := ""
clientSecret := ""
accountName := ""
databaseName := ""
dsn := fmt.Sprintf("firebolt:///%s?account_name=%s&client_id=%s&client_secret=%s", databaseName, accountName, clientId, clientSecret)

// opening the firebolt driver
// open a Firebolt connection
db, err := sql.Open("firebolt", dsn)
if err != nil {
fmt.Printf("error during opening a driver: %v", err)
}

// Create table
// create a table
_, err = db.Query("CREATE TABLE test_table(id INT, value TEXT)")
if err != nil {
fmt.Printf("error during select query: %v", err)
}

// Parametrized insert (only ? placeholders are supported)
// execute a parametrized insert (only ? placeholders are supported)
_, err = db.Query("INSERT INTO test_table VALUES (?, ?)", 1, "my value")
if err != nil {
fmt.Printf("error during select query: %v", err)
}

// executing a simple select query
// execute a simple select query
rows, err := db.Query("SELECT id FROM test_table")
if err != nil {
fmt.Printf("error during select query: %v", err)
}

// iterating over the resulting rows
// iterate over the result
defer func() {
if err := rows.Close(); err != nil {
fmt.Printf("error during rows.Close(): %v\n", err)
Expand All @@ -82,18 +94,72 @@ func main() {
}
```

### Streaming example
In order to stream the query result (and not store it in memory fully), you need to pass a special context with streaming enabled.
> **Warning**: If you enable streaming the result, the query execution might finish successfully, but the actual error might be returned during the iteration over the rows.
### DSN (Data source name)
All information for the connection should be specified using the DSN string. The firebolt dsn string has the following format:
```
firebolt://[/database]?account_name=account_name&client_id=client_id&client_secret=client_secret[&engine=engine]
Here is an example of how to do it:

```go
package main

import (
"context"
"database/sql"
"fmt"

// we need to import firebolt-go-sdk in order to register the driver
_ "github.com/firebolt-db/firebolt-go-sdk"
fireboltContext "github.com/firebolt-db/firebolt-go-sdk/context"
)

func main() {
// set your Firebolt credentials to construct a dsn string
clientId := ""
clientSecret := ""
accountName := ""
databaseName := ""
dsn := fmt.Sprintf("firebolt:///%s?account_name=%s&client_id=%s&client_secret=%s", databaseName, accountName, clientId, clientSecret)

// open a Firebolt connection
db, err := sql.Open("firebolt", dsn)
if err != nil {
fmt.Printf("error during opening a driver: %v", err)
}

// create a streaming context
streamingCtx := fireboltContext.WithStreaming(context.Background())

// execute a large select query
rows, err := db.QueryContext(streamingCtx, "SELECT \"abc\" FROM generate_series(1, 100000000)")
if err != nil {
fmt.Printf("error during select query: %v", err)
}

// iterating over the result is exactly the same as in the previous example
defer func() {
if err := rows.Close(); err != nil {
fmt.Printf("error during rows.Close(): %v\n", err)
}
}()

for rows.Next() {
var id int
if err := rows.Scan(&id); err != nil {
fmt.Printf("error during scan: %v", err)
}
fmt.Println(id)
}

if err := rows.Err(); err != nil {
fmt.Printf("error during rows iteration: %v\n", err)
}
}
```

- **client_id** - credentials client id.
- **client_secret** - credentials client secret.
- **account_name** - the name of Firebolt account to log in to.
- **database** - (optional) the name of the database to connect to.
- **engine** - (optional) the name of the engine to run SQL on.
#### Errors in streaming
If you enable streaming the result, the query execution might finish successfully, but the actual error might be returned during the iteration over the rows.


### Limitations
Although, all interfaces are available, not all of them are implemented or could be implemented:
Expand Down
14 changes: 12 additions & 2 deletions client/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ func getAccessTokenUsernamePassword(username string, password string, apiEndpoin
return "", errors.ConstructNestedError("authentication request failed", resp.err)
}

content, err := resp.Content()
if err != nil {
return "", errors.ConstructNestedError("error during reading response content", err)
}

var authResp AuthenticationResponse
if err = jsonStrictUnmarshall(resp.data, &authResp); err != nil {
if err = jsonStrictUnmarshall(content, &authResp); err != nil {
return "", errors.ConstructNestedError("failed to unmarshal authentication response with error", err)
}
logging.Infolog.Printf("Authentication was successful")
Expand Down Expand Up @@ -111,8 +116,13 @@ func getAccessTokenServiceAccount(clientId string, clientSecret string, apiEndpo
return "", errors.ConstructNestedError("authentication request failed", resp.err)
}

content, err := resp.Content()
if err != nil {
return "", errors.ConstructNestedError("error during reading response content", err)
}

var authResp AuthenticationResponse
if err = jsonStrictUnmarshall(resp.data, &authResp); err != nil {
if err = jsonStrictUnmarshall(content, &authResp); err != nil {
return "", errors.ConstructNestedError("failed to unmarshal authentication response with error", err)
}
logging.Infolog.Printf("Authentication was successful")
Expand Down
23 changes: 19 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"

contextUtils "github.com/firebolt-db/firebolt-go-sdk/context"

"github.com/firebolt-db/firebolt-go-sdk/types"

errorUtils "github.com/firebolt-db/firebolt-go-sdk/errors"
Expand Down Expand Up @@ -50,6 +52,7 @@ func MakeClient(settings *types.FireboltSettings, apiEndpoint string) (*ClientIm
ClientSecret: settings.ClientSecret,
ApiEndpoint: apiEndpoint,
UserAgent: ConstructUserAgentString(),
newVersion: settings.NewVersion,
},
AccountName: settings.AccountName,
}
Expand Down Expand Up @@ -106,9 +109,14 @@ func (c *ClientImpl) getSystemEngineURLAndParameters(ctx context.Context, accoun
return "", nil, errorUtils.ConstructNestedError("error during system engine url http request", resp.err)
}

content, err := resp.Content()
if err != nil {
return "", nil, errorUtils.ConstructNestedError("error during reading response content", err)
}

var systemEngineURLResponse SystemEngineURLResponse
if err := json.Unmarshal(resp.data, &systemEngineURLResponse); err != nil {
return "", nil, errorUtils.ConstructNestedError("error during unmarshalling system engine URL Response", errors.New(string(resp.data)))
if err := json.Unmarshal(content, &systemEngineURLResponse); err != nil {
return "", nil, errorUtils.ConstructNestedError("error during unmarshalling system engine URL response", errors.New(string(content)))
}
if URLCache != nil {
URLCache.Put(url, systemEngineURLResponse, 0) //nolint:errcheck
Expand All @@ -123,8 +131,15 @@ func (c *ClientImpl) getSystemEngineURLAndParameters(ctx context.Context, accoun
return engineUrl, parameters, nil
}

func (c *ClientImpl) GetQueryParams(setStatements map[string]string) (map[string]string, error) {
params := map[string]string{"output_format": outputFormat}
func (c *ClientImpl) getOutputFormat(ctx context.Context) string {
if contextUtils.IsStreaming(ctx) {
return jsonLinesOutputFormat
}
return jsonOutputFormat
}

func (c *ClientImpl) GetQueryParams(ctx context.Context, setStatements map[string]string) (map[string]string, error) {
params := map[string]string{"output_format": c.getOutputFormat(ctx)}
for setKey, setValue := range setStatements {
params[setKey] = setValue
}
Expand Down
46 changes: 20 additions & 26 deletions client/client_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
Expand All @@ -12,10 +11,10 @@ import (

errorUtils "github.com/firebolt-db/firebolt-go-sdk/errors"
"github.com/firebolt-db/firebolt-go-sdk/logging"
"github.com/firebolt-db/firebolt-go-sdk/types"
)

const outputFormat = "JSON_Compact"
const jsonOutputFormat = "JSON_Compact"
const jsonLinesOutputFormat = "JSONLines_Compact"
const protocolVersionHeader = "Firebolt-Protocol-Version"
const protocolVersion = "2.1"

Expand All @@ -27,16 +26,18 @@ var allowedUpdateParameters = []string{"database"}

type Client interface {
GetConnectionParameters(ctx context.Context, engineName string, databaseName string) (string, map[string]string, error)
Query(ctx context.Context, engineUrl, query string, parameters map[string]string, control ConnectionControl) (*types.QueryResponse, error)
Query(ctx context.Context, engineUrl, query string, parameters map[string]string, control ConnectionControl) (*Response, error)
IsNewVersion() bool
}

type BaseClient struct {
ClientID string
ClientSecret string
ApiEndpoint string
UserAgent string
ParameterGetter func(map[string]string) (map[string]string, error)
ParameterGetter func(context.Context, map[string]string) (map[string]string, error)
AccessTokenGetter func() (string, error)
newVersion bool
}

// ConnectionControl is a struct that holds methods for updating connection properties
Expand All @@ -47,14 +48,19 @@ type ConnectionControl struct {
SetEngineURL func(string)
}

// IsNewVersion returns true if the client is using the 2.0 version of Firebolt
func (c *BaseClient) IsNewVersion() bool {
return c.newVersion
}

// Query sends a query to the engine URL and populates queryResponse, if query was successful
func (c *BaseClient) Query(ctx context.Context, engineUrl, query string, parameters map[string]string, control ConnectionControl) (*types.QueryResponse, error) {
func (c *BaseClient) Query(ctx context.Context, engineUrl, query string, parameters map[string]string, control ConnectionControl) (*Response, error) {
logging.Infolog.Printf("Query engine '%s' with '%s'", engineUrl, query)

if c.ParameterGetter == nil {
return nil, errors.New("ParameterGetter is not set")
}
params, err := c.ParameterGetter(parameters)
params, err := c.ParameterGetter(ctx, parameters)
if err != nil {
return nil, err
}
Expand All @@ -67,19 +73,7 @@ func (c *BaseClient) Query(ctx context.Context, engineUrl, query string, paramet
if err = c.processResponseHeaders(resp.headers, control); err != nil {
return nil, errorUtils.ConstructNestedError("error during processing response headers", err)
}

var queryResponse types.QueryResponse
if len(resp.data) == 0 {
// response could be empty, which doesn't mean it is an error
return &queryResponse, nil
}

if err = json.Unmarshal(resp.data, &queryResponse); err != nil {
return nil, errorUtils.ConstructNestedError("wrong Response", errors.New(string(resp.data)))
}

logging.Infolog.Printf("Query was successful")
return &queryResponse, nil
return resp, nil
}

func handleUpdateParameters(updateParameters func(string, string), updateParametersRaw string) {
Expand Down Expand Up @@ -111,7 +105,7 @@ func splitEngineEndpoint(endpoint string) (string, url.Values, error) {
}

func (c *BaseClient) handleUpdateEndpoint(updateEndpointRaw string, control ConnectionControl) error {
// split URL containted into updateEndpointRaw into endpoint and parameters
// split URL contained into updateEndpointRaw into endpoint and parameters
// Update parameters and set client engine endpoint

corruptUrlError := errors.New("failed to execute USE ENGINE command: corrupt update endpoint - contact support")
Expand Down Expand Up @@ -145,18 +139,18 @@ func (c *BaseClient) processResponseHeaders(headers http.Header, control Connect
return nil
}

// request fetches an access token from the cache or re-authenticate when the access token is not available in the cache
// requestWithAuthRetry fetches an access token from the cache or re-authenticate when the access token is not available in the cache
// and sends a request using that token
func (c *BaseClient) requestWithAuthRetry(ctx context.Context, method string, url string, params map[string]string, bodyStr string) Response {
func (c *BaseClient) requestWithAuthRetry(ctx context.Context, method string, url string, params map[string]string, bodyStr string) *Response {
var err error

if c.AccessTokenGetter == nil {
return Response{nil, 0, nil, errors.New("AccessTokenGetter is not set")}
return MakeResponse(nil, 0, nil, errors.New("AccessTokenGetter is not set"))
}

accessToken, err := c.AccessTokenGetter()
if err != nil {
return Response{nil, 0, nil, errorUtils.ConstructNestedError("error while getting access token", err)}
return MakeResponse(nil, 0, nil, errorUtils.ConstructNestedError("error while getting access token", err))
}
resp := DoHttpRequest(requestParameters{ctx, accessToken, method, url, c.UserAgent, params, bodyStr, ContentTypeJSON})
if resp.statusCode == http.StatusUnauthorized {
Expand All @@ -165,7 +159,7 @@ func (c *BaseClient) requestWithAuthRetry(ctx context.Context, method string, ur
// Refreshing the access token as it is expired
accessToken, err = c.AccessTokenGetter()
if err != nil {
return Response{nil, 0, nil, errorUtils.ConstructNestedError("error while getting access token", err)}
return MakeResponse(nil, 0, nil, errorUtils.ConstructNestedError("error while getting access token", err))
}
// Trying to send the same request again now that the access token has been refreshed
resp = DoHttpRequest(requestParameters{ctx, accessToken, method, url, c.UserAgent, params, bodyStr, ContentTypeJSON})
Expand Down
4 changes: 3 additions & 1 deletion client/client_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http/httptest"
"testing"

contextUtils "github.com/firebolt-db/firebolt-go-sdk/context"

"github.com/firebolt-db/firebolt-go-sdk/utils"
)

Expand Down Expand Up @@ -88,7 +90,7 @@ func testAdditionalHeaders(t *testing.T, clientFactory func(string) Client) {
prepareEnvVariablesForTest(t, server)
client := clientFactory(server.URL)

ctx := context.WithValue(context.TODO(), ContextKey("additionalHeaders"), additionalHeaders)
ctx := contextUtils.WithAdditionalHeaders(context.Background(), additionalHeaders)

_, _ = client.Query(ctx, server.URL, selectOne, map[string]string{}, ConnectionControl{})

Expand Down
Loading

0 comments on commit 83b4692

Please sign in to comment.