Skip to content

Commit

Permalink
Merge pull request #176 from nitrictech/feature/dynamodb-streaming
Browse files Browse the repository at this point in the history
Dynamo DB streaming.
  • Loading branch information
tjholm authored Sep 21, 2021
2 parents 85f5edb + a4c84cf commit 17c7304
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 65 deletions.
211 changes: 146 additions & 65 deletions pkg/plugins/document/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dynamodb_service

import (
"fmt"
"io"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -236,7 +237,7 @@ func (s *DynamoDocService) Delete(key *document.Key) error {
if err != nil {
return newErr(
codes.Internal,
fmt.Sprintf("error performing delete in table"),
"error performing delete in table",
err,
)
}
Expand All @@ -247,7 +248,7 @@ func (s *DynamoDocService) Delete(key *document.Key) error {
if err != nil {
return newErr(
codes.Internal,
fmt.Sprintf("error performing delete"),
"error performing delete",
err,
)
}
Expand All @@ -261,6 +262,26 @@ func (s *DynamoDocService) Delete(key *document.Key) error {
return nil
}

func (s *DynamoDocService) query(collection *document.Collection, expressions []document.QueryExpression, limit int, pagingToken map[string]string) (*document.QueryResult, error) {
queryResult := &document.QueryResult{
Documents: make([]document.Document, 0),
}

var resFunc resultRetriever = s.performQuery
if collection.Parent == nil || collection.Parent.Id == "" {
resFunc = s.performScan
}

if res, err := resFunc(collection, expressions, limit, pagingToken); err != nil {
return nil, err
} else {
queryResult.Documents = append(queryResult.Documents, res.Documents...)
queryResult.PagingToken = res.PagingToken
}

return queryResult, nil
}

func (s *DynamoDocService) Query(collection *document.Collection, expressions []document.QueryExpression, limit int, pagingToken map[string]string) (*document.QueryResult, error) {
newErr := errors.ErrorsWithScope(
"DynamoDocService.Query",
Expand All @@ -285,69 +306,116 @@ func (s *DynamoDocService) Query(collection *document.Collection, expressions []
)
}

queryResult := &document.QueryResult{
Documents: make([]document.Document, 0),
queryResult, err := s.query(collection, expressions, limit, pagingToken)
if err != nil {
return nil, newErr(
codes.Internal,
"query error",
err,
)
}

// If partition key defined then perform a query
if collection.Parent != nil && collection.Parent.Id != "" {
err := s.performQuery(collection, expressions, limit, pagingToken, queryResult)
if err != nil {
remainingLimit := limit - len(queryResult.Documents)

// If more results available, perform additional queries
for remainingLimit > 0 &&
(queryResult.PagingToken != nil && len(queryResult.PagingToken) > 0) {

if res, err := s.query(collection, expressions, remainingLimit, queryResult.PagingToken); err != nil {
return nil, newErr(
codes.Internal,
"query error",
err,
)
} else {
queryResult.Documents = append(queryResult.Documents, res.Documents...)
queryResult.PagingToken = res.PagingToken
}

remainingLimit := limit - len(queryResult.Documents)
remainingLimit = limit - len(queryResult.Documents)
}

// If more results available, perform additional queries
for remainingLimit > 0 &&
(queryResult.PagingToken != nil && len(queryResult.PagingToken) > 0) {
return queryResult, nil
}

err := s.performQuery(collection, expressions, remainingLimit, queryResult.PagingToken, queryResult)
if err != nil {
return nil, newErr(
codes.Internal,
"query error",
err,
)
}
func (s *DynamoDocService) QueryStream(collection *document.Collection, expressions []document.QueryExpression, limit int) document.DocumentIterator {
newErr := errors.ErrorsWithScope(
"DynamoDocService.QueryStream",
map[string]interface{}{
"collection": collection,
},
)

colErr := document.ValidateQueryCollection(collection)
expErr := document.ValidateExpressions(expressions)

remainingLimit = limit - len(queryResult.Documents)
if colErr != nil || expErr != nil {
// Return an error only iterator
return func() (*document.Document, error) {
return nil, newErr(
codes.InvalidArgument,
"invalid arguments",
fmt.Errorf("collection error:%v, expression error: %v", colErr, expErr),
)
}
}

} else {
err := s.performScan(collection, expressions, limit, pagingToken, queryResult)
if err != nil {
var tmpLimit = limit
var documents []document.Document
var pagingToken map[string]string

// Initial fetch
res, fetchErr := s.query(collection, expressions, tmpLimit, nil)

if fetchErr != nil {
// Return an error only iterator if the initial fetch failed
return func() (*document.Document, error) {
return nil, newErr(
codes.Internal,
"scan error",
err,
"query error",
fetchErr,
)
}
}

remainingLimit := limit - len(queryResult.Documents)

// If more results available, perform additional scans
for remainingLimit > 0 &&
(queryResult.PagingToken != nil && len(queryResult.PagingToken) > 0) {
documents = res.Documents
pagingToken = res.PagingToken

return func() (*document.Document, error) {
// check the iteration state
if tmpLimit <= 0 && limit > 0 {
// we've reached the limit of reading
return nil, io.EOF
} else if pagingToken != nil && len(documents) == 0 {
// we've run out of documents and have more pages to read
res, fetchErr = s.query(collection, expressions, tmpLimit, pagingToken)
documents = res.Documents
pagingToken = res.PagingToken
} else if pagingToken == nil && len(documents) == 0 {
// we're all out of documents and pages before hitting the limit
return nil, io.EOF
}

err := s.performScan(collection, expressions, remainingLimit, queryResult.PagingToken, queryResult)
if err != nil {
return nil, newErr(
codes.Internal,
"scan error",
err,
)
}
// We received an error fetching the docs
if fetchErr != nil {
return nil, newErr(
codes.Internal,
"query error",
fetchErr,
)
}

remainingLimit = limit - len(queryResult.Documents)
if len(documents) == 0 {
return nil, io.EOF
}
}

return queryResult, nil
// pop the first element
var doc document.Document
doc, documents = documents[0], documents[1:]
tmpLimit = tmpLimit - 1

return &doc, nil
}
}

// New - Create a new DynamoDB key value plugin implementation
Expand Down Expand Up @@ -415,16 +483,23 @@ func createItemMap(source map[string]interface{}, key *document.Key) map[string]
return newMap
}

type resultRetriever = func(
collection *document.Collection,
expressions []document.QueryExpression,
limit int,
pagingToken map[string]string,
) (*document.QueryResult, error)

func (s *DynamoDocService) performQuery(
collection *document.Collection,
expressions []document.QueryExpression,
limit int,
pagingToken map[string]string,
queryResult *document.QueryResult) error {
) (*document.QueryResult, error) {

if collection.Parent == nil {
// Should never occur
return fmt.Errorf("cannot perform query without partion key defined")
return nil, fmt.Errorf("cannot perform query without partion key defined")
}

// Sort expressions to help map where "A >= %1 AND A <= %2" to DynamoDB expression "A BETWEEN %1 AND %2"
Expand All @@ -433,7 +508,7 @@ func (s *DynamoDocService) performQuery(
tableName, err := s.getTableName(*collection)

if err != nil {
return err
return nil, err
}

input := &dynamodb.QueryInput{
Expand Down Expand Up @@ -470,7 +545,7 @@ func (s *DynamoDocService) performQuery(
expKey := fmt.Sprintf(":%v%v", exp.Operand, i)
valAttrib, err := dynamodbattribute.Marshal(exp.Value)
if err != nil {
return fmt.Errorf("error marshalling %v: %v", exp.Operand, exp.Value)
return nil, fmt.Errorf("error marshalling %v: %v", exp.Operand, exp.Value)
}
input.ExpressionAttributeValues[expKey] = valAttrib
}
Expand All @@ -483,7 +558,7 @@ func (s *DynamoDocService) performQuery(
if len(pagingToken) > 0 {
startKey, err := dynamodbattribute.MarshalMap(pagingToken)
if err != nil {
return fmt.Errorf("error performing query %v: %v", input, err)
return nil, fmt.Errorf("error performing query %v: %v", input, err)
}
input.SetExclusiveStartKey(startKey)
}
Expand All @@ -493,23 +568,26 @@ func (s *DynamoDocService) performQuery(
resp, err := s.client.Query(input)

if err != nil {
return fmt.Errorf("error performing query %v: %v", input, err)
return nil, fmt.Errorf("error performing query %v: %v", input, err)
}

return marshalQueryResult(collection, resp.Items, resp.LastEvaluatedKey, queryResult)
return marshalQueryResult(collection, resp.Items, resp.LastEvaluatedKey)
}

func (s *DynamoDocService) performScan(
collection *document.Collection,
expressions []document.QueryExpression,
limit int,
pagingToken map[string]string,
queryResult *document.QueryResult) error {
) (*document.QueryResult, error) {

// Sort expressions to help map where "A >= %1 AND A <= %2" to DynamoDB expression "A BETWEEN %1 AND %2"
sort.Sort(document.ExpsSort(expressions))

tableName, err := s.getTableName(*collection)
if err != nil {
return nil, err
}

input := &dynamodb.ScanInput{
TableName: tableName,
Expand Down Expand Up @@ -546,7 +624,7 @@ func (s *DynamoDocService) performScan(
expKey := fmt.Sprintf(":%v%v", exp.Operand, i)
valAttrib, err := dynamodbattribute.Marshal(exp.Value)
if err != nil {
return fmt.Errorf("error marshalling %v: %v", exp.Operand, exp.Value)
return nil, fmt.Errorf("error marshalling %v: %v", exp.Operand, exp.Value)
}
input.ExpressionAttributeValues[expKey] = valAttrib
}
Expand All @@ -560,7 +638,7 @@ func (s *DynamoDocService) performScan(
if len(pagingToken) > 0 {
startKey, err := dynamodbattribute.MarshalMap(pagingToken)
if err != nil {
return fmt.Errorf("error performing scan %v: %v", input, err)
return nil, fmt.Errorf("error performing scan %v: %v", input, err)
}
input.SetExclusiveStartKey(startKey)
}
Expand All @@ -569,21 +647,22 @@ func (s *DynamoDocService) performScan(
resp, err := s.client.Scan(input)

if err != nil {
return fmt.Errorf("error performing scan %v: %v", input, err)
return nil, fmt.Errorf("error performing scan %v: %v", input, err)
}

return marshalQueryResult(collection, resp.Items, resp.LastEvaluatedKey, queryResult)
return marshalQueryResult(collection, resp.Items, resp.LastEvaluatedKey)
}

func marshalQueryResult(collection *document.Collection, items []map[string]*dynamodb.AttributeValue, lastEvaluatedKey map[string]*dynamodb.AttributeValue, queryResult *document.QueryResult) error {

func marshalQueryResult(collection *document.Collection, items []map[string]*dynamodb.AttributeValue, lastEvaluatedKey map[string]*dynamodb.AttributeValue) (*document.QueryResult, error) {
// Unmarshal Dynamo response items
var pTkn map[string]string = nil
var valueMaps []map[string]interface{}
err := dynamodbattribute.UnmarshalListOfMaps(items, &valueMaps)
if err != nil {
return fmt.Errorf("error unmarshalling query response: %v", err)
if err := dynamodbattribute.UnmarshalListOfMaps(items, &valueMaps); err != nil {
return nil, fmt.Errorf("error unmarshalling query response: %v", err)
}

docs := make([]document.Document, 0, len(valueMaps))

// Strip keys & append results
for _, m := range valueMaps {
// Retrieve the original ID on the result
Expand Down Expand Up @@ -622,20 +701,22 @@ func marshalQueryResult(collection *document.Collection, items []map[string]*dyn
},
Content: m,
}
queryResult.Documents = append(queryResult.Documents, sdkDoc)
docs = append(docs, sdkDoc)
}

// Unmarshal lastEvalutedKey
var resultPagingToken map[string]string
if len(lastEvaluatedKey) > 0 {
err = dynamodbattribute.UnmarshalMap(lastEvaluatedKey, &resultPagingToken)
if err != nil {
return fmt.Errorf("error unmarshalling query lastEvaluatedKey: %v", err)
if err := dynamodbattribute.UnmarshalMap(lastEvaluatedKey, &resultPagingToken); err != nil {
return nil, fmt.Errorf("error unmarshalling query lastEvaluatedKey: %v", err)
}
queryResult.PagingToken = resultPagingToken
pTkn = resultPagingToken
}

return nil
return &document.QueryResult{
Documents: docs,
PagingToken: pTkn,
}, nil
}

func createFilterExpression(expressions []document.QueryExpression) string {
Expand Down
1 change: 1 addition & 0 deletions tests/plugins/document/dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ var _ = Describe("DynamoDb", func() {
test.SetTests(docPlugin)
test.DeleteTests(docPlugin)
test.QueryTests(docPlugin)
test.QueryStreamTests(docPlugin)
})

func createDynamoClient() *dynamodb.DynamoDB {
Expand Down

0 comments on commit 17c7304

Please sign in to comment.