Skip to content

Commit

Permalink
Merge pull request #186 from nitrictech/feature/minio-storage-plugin
Browse files Browse the repository at this point in the history
minio storage plugin
  • Loading branch information
jyecusch authored Oct 13, 2021
2 parents 35b0f55 + f48e01f commit d3020da
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 27 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.6 h1:SIasE1FVIQOWz2GEAHFOmoW7xchJcqlucjSULTL0Ag4=
golang.org/x/tools v0.1.6/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
104 changes: 104 additions & 0 deletions pkg/plugins/storage/minio/minio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021 Nitric Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package minio_storage_service

import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/nitric-dev/membrane/pkg/plugins/storage"
s3_service "github.com/nitric-dev/membrane/pkg/plugins/storage/s3"
"github.com/nitric-dev/membrane/pkg/utils"
)

const (
MINIO_ENDPOINT_ENV = "MINIO_ENDPOINT"
MINIO_ACCESS_KEY_ENV = "MINIO_ACCESS_KEY"
MINIO_SECRET_KEY_ENV = "MINIO_SECRET_KEY"
)

type minioConfig struct {
endpoint string
accessKey string
secretKey string
}

func configFromEnv() (*minioConfig, error) {
endpoint := utils.GetEnv(MINIO_ENDPOINT_ENV, "")
accKey := utils.GetEnv(MINIO_ACCESS_KEY_ENV, "")
secKey := utils.GetEnv(MINIO_SECRET_KEY_ENV, "")

configErrors := make([]error, 0)

if endpoint == "" {
configErrors = append(configErrors, fmt.Errorf("%s not configured", MINIO_ENDPOINT_ENV))
}

if accKey == "" {
configErrors = append(configErrors, fmt.Errorf("%s not configured", MINIO_ACCESS_KEY_ENV))
}

if secKey == "" {
configErrors = append(configErrors, fmt.Errorf("%s not configured", MINIO_SECRET_KEY_ENV))
}

if len(configErrors) > 0 {
return nil, fmt.Errorf("configuration errors: %v", configErrors)
}

return &minioConfig{
endpoint: endpoint,
accessKey: accKey,
secretKey: secKey,
}, nil
}

func nameSelector(nitricName string, bucket *s3.Bucket) (bool, error) {
if *bucket.Name == nitricName {
return true, nil
}

return false, nil
}

func New() (storage.StorageService, error) {

conf, err := configFromEnv()

if err != nil {
return nil, err
}

// Configure to use MinIO Server
s3Config := &aws.Config{
Credentials: credentials.NewStaticCredentials(conf.accessKey, conf.secretKey, ""),
Endpoint: aws.String(conf.endpoint),
Region: aws.String("us-east-1"),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
}
newSession, err := session.NewSession(s3Config)

if err != nil {
return nil, fmt.Errorf("error creating new session")
}

s3Client := s3.New(newSession)

return s3_service.NewWithClient(s3Client, s3_service.WithSelector(nameSelector))
}
33 changes: 33 additions & 0 deletions pkg/plugins/storage/s3/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 Nitric Pty Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3_service

type S3StorageServiceOption interface {
Apply(*S3StorageService)
}

type withSelector struct {
selector BucketSelector
}

func (w *withSelector) Apply(service *S3StorageService) {
service.selector = w.selector
}

func WithSelector(selector BucketSelector) S3StorageServiceOption {
return &withSelector{
selector: selector,
}
}
78 changes: 55 additions & 23 deletions pkg/plugins/storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,68 @@ const (
// S3StorageService - Is the concrete implementation of AWS S3 for the Nitric Storage Plugin
type S3StorageService struct {
//storage.UnimplementedStoragePlugin
client s3iface.S3API
client s3iface.S3API
selector BucketSelector
}

type BucketSelector = func(nitricName string, b *s3.Bucket) (bool, error)

func (s *S3StorageService) tagSelector(name string, bucket *s3.Bucket) (bool, error) {
// TODO: This could be rather slow, it's interesting that they don't return this in the list buckets output
tagout, err := s.client.GetBucketTagging(&s3.GetBucketTaggingInput{
Bucket: bucket.Name,
})

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
// Table not found, try to create and put again
if awsErr.Code() == ErrCodeNoSuchTagSet {
// Ignore buckets with no tags, check the next bucket
return false, nil
}
}
return false, err
}

for _, tag := range tagout.TagSet {
if *tag.Key == "x-nitric-name" && *tag.Value == name {
return true, nil
}
}

return false, nil
}

// getBucketByName - Finds and returns a bucket by it's Nitric name
func (s *S3StorageService) getBucketByName(bucket string) (*s3.Bucket, error) {
out, err := s.client.ListBuckets(&s3.ListBucketsInput{})

if err != nil {
return nil, fmt.Errorf("Encountered an error retrieving the bucket list: %v", err)
return nil, fmt.Errorf("encountered an error retrieving the bucket list: %v", err)
}

for _, b := range out.Buckets {
// TODO: This could be rather slow, it's interesting that they don't return this in the list buckets output
tagout, err := s.client.GetBucketTagging(&s3.GetBucketTaggingInput{
Bucket: b.Name,
})
var selected bool = false
var selectErr error = nil

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == ErrCodeNoSuchTagSet {
// Ignore buckets with no tags, check the next bucket
continue
}
return nil, err
}
if s.selector == nil {
// if selector is undefined us the default selector
selected, selectErr = s.tagSelector(bucket, b)
} else {
// Use provided selector if one available
selected, selectErr = s.selector(bucket, b)
}

if selectErr != nil {
return nil, err
}

for _, tag := range tagout.TagSet {
if *tag.Key == "x-nitric-name" && *tag.Value == bucket {
return b, nil
}
if selected {
return b, nil
}
}

return nil, fmt.Errorf("Unable to find bucket with name: %s", bucket)
return nil, fmt.Errorf("unable to find bucket with name: %s", bucket)
}

// Read - Retrieves an item from a bucket
Expand Down Expand Up @@ -249,7 +275,7 @@ func New() (storage.StorageService, error) {
})

if sessionError != nil {
return nil, fmt.Errorf("Error creating new AWS session %v", sessionError)
return nil, fmt.Errorf("error creating new AWS session %v", sessionError)
}

s3Client := s3.New(sess)
Expand All @@ -260,8 +286,14 @@ func New() (storage.StorageService, error) {
}

// NewWithClient creates a new S3 Storage plugin and injects the given client
func NewWithClient(client s3iface.S3API) (storage.StorageService, error) {
return &S3StorageService{
func NewWithClient(client s3iface.S3API, opts ...S3StorageServiceOption) (storage.StorageService, error) {
s3Client := &S3StorageService{
client: client,
}, nil
}

for _, o := range opts {
o.Apply(s3Client)
}

return s3Client, nil
}
4 changes: 2 additions & 2 deletions pkg/providers/dev/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
events_service "github.com/nitric-dev/membrane/pkg/plugins/events/dev"
gateway_plugin "github.com/nitric-dev/membrane/pkg/plugins/gateway/dev"
queue_service "github.com/nitric-dev/membrane/pkg/plugins/queue/dev"
boltdb_storage_service "github.com/nitric-dev/membrane/pkg/plugins/storage/boltdb"
secret_service "github.com/nitric-dev/membrane/pkg/plugins/secret/dev"
minio_storage_service "github.com/nitric-dev/membrane/pkg/plugins/storage/minio"
)

func main() {
Expand All @@ -41,7 +41,7 @@ func main() {
eventsPlugin, _ := events_service.New()
gatewayPlugin, _ := gateway_plugin.New()
queuePlugin, _ := queue_service.New()
storagePlugin, _ := boltdb_storage_service.New()
storagePlugin, _ := minio_storage_service.New()

m, err := membrane.New(&membrane.MembraneOptions{
DocumentPlugin: documentPlugin,
Expand Down
4 changes: 2 additions & 2 deletions pkg/providers/dev/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/nitric-dev/membrane/pkg/plugins/queue"
queue_service "github.com/nitric-dev/membrane/pkg/plugins/queue/dev"
"github.com/nitric-dev/membrane/pkg/plugins/storage"
boltdb_storage_service "github.com/nitric-dev/membrane/pkg/plugins/storage/boltdb"
minio_storage_service "github.com/nitric-dev/membrane/pkg/plugins/storage/minio"
"github.com/nitric-dev/membrane/pkg/providers"
)

Expand Down Expand Up @@ -57,5 +57,5 @@ func (p *DevServiceFactory) NewQueueService() (queue.QueueService, error) {

// NewStorageService - Returns local dev storage plugin
func (p *DevServiceFactory) NewStorageService() (storage.StorageService, error) {
return boltdb_storage_service.New()
return minio_storage_service.New()
}

0 comments on commit d3020da

Please sign in to comment.