Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS Support, Cluster Support #43

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/boot-dev.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/usr/bin/env sh

# get dependencies, build cmds
go mod download
go build ./...

# autowatch and run tests
ls -d * */* */*/* | entr -r go test $(go list ./...)
ls -d * */* */*/* | entr -n -r go test $(go list ./...)
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ services:
image: redis:5.0-alpine
ports:
- "6379:6379"
redisNew:
init: true
image: redis:5.0-alpine
ports:
- "6380:6379"
volumes:
deps:
driver: local
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module github.com/stickermule/rump

go 1.12
go 1.18

require (
github.com/mediocregopher/radix/v3 v3.2.3
github.com/mediocregopher/radix/v3 v3.8.1
github.com/pkg/errors v0.9.1
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)

require golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed h1:3dQJqqDouawQgl3gBE1PNHKFkJYGEuFb1DbSlaxdosE=
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.2.3 h1:TbcGCZdo9zfPYPgevsqRn+OjvCyfOK6TzuXhqzWdCt0=
github.com/mediocregopher/radix/v3 v3.2.3/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/mediocregopher/radix/v3 v3.8.1 h1:rOkHflVuulFKlwsLY01/M2cM2tWCjDoETcMqKbAWu1M=
github.com/mediocregopher/radix/v3 v3.8.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
2 changes: 1 addition & 1 deletion infra/Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12.4-alpine
FROM golang:1.18-alpine

# disable cgo to avoid gcc requirement bug
ENV CGO_ENABLED=0
Expand Down
67 changes: 48 additions & 19 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,30 @@ package config
import (
"flag"
"fmt"
"net/url"
"os"
"strings"
)

// Resource can be either Redis (isRedis) or file.
// URI is either a Redis URI or a file path.
type Resource struct {
URI string
IsRedis bool
url.URL
}

func (r Resource) IsRedis() bool {
return contains([]string{"redis", "rediss", "credis", "crediss"}, r.Scheme)
}

func (r Resource) IsSecure() bool {
return r.Scheme == "rediss" || r.Scheme == "crediss"
}

func (r Resource) IsCluster() bool {
return r.Scheme == "credis" || r.Scheme == "crediss"
}

func (r Resource) FormattedString() string {
return fmt.Sprintf("redis://%v", r.Host)
}

// Config represents the current source and target config.
Expand All @@ -34,35 +49,49 @@ func exit(e error) {
os.Exit(1)
}

// https://play.golang.org/p/Qg_uv_inCek
// contains checks if a string is present in a slice
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}

return false
}

// validate makes sure from and to are Redis URIs or file paths,
// and generates the final Config.
func validate(from, to string, silent, ttl bool) (Config, error) {

source, err := url.Parse(from)
if err != nil {
return Config{}, err
}

target, err := url.Parse(to)
if err != nil {
return Config{}, err
}

cfg := Config{
Source: Resource{
URI: from,
*source,
},
Target: Resource{
URI: to,
*target,
},
Silent: silent,
TTL: ttl,
}

if strings.HasPrefix(from, "redis://") {
cfg.Source.IsRedis = true
}

if strings.HasPrefix(to, "redis://") {
cfg.Target.IsRedis = true
}

// Guard from incorrect usage.
switch {
case cfg.Source.URI == "":
return cfg, fmt.Errorf("from is required")
case cfg.Target.URI == "":
return cfg, fmt.Errorf("to is required")
case !cfg.Source.IsRedis && !cfg.Target.IsRedis:
case cfg.Source.String() == "":
return cfg, fmt.Errorf("source not valid redis url")
case cfg.Target.String() == "":
return cfg, fmt.Errorf("target not valid redis url")
case !cfg.Source.IsRedis() && !cfg.Target.IsRedis():
return cfg, fmt.Errorf("file-only operations not supported")
}

Expand Down
57 changes: 45 additions & 12 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ func TestFromRedisToRedis(t *testing.T) {
t.Error("from redis to redis should work")
}

if !cfg.Source.IsRedis {
if !cfg.Source.IsRedis() {
t.Error("wrong from")
}

if !cfg.Target.IsRedis {
if !cfg.Target.IsRedis() {
t.Error("wrong to")
}

if cfg.Source.URI != "redis://s" {
if cfg.Source.String() != "redis://s" {
t.Error("wrong source")
}

if cfg.Target.URI != "redis://t" {
if cfg.Target.String() != "redis://t" {
t.Error("wrong target")
}
}
Expand All @@ -54,19 +54,19 @@ func TestFromRedisToFile(t *testing.T) {
t.Error("from redis to file should work")
}

if !cfg.Source.IsRedis {
if !cfg.Source.IsRedis() {
t.Error("wrong from")
}

if cfg.Target.IsRedis {
if cfg.Target.IsRedis() {
t.Error("wrong to")
}

if cfg.Source.URI != "redis://s" {
if cfg.Source.String() != "redis://s" {
t.Error("wrong source")
}

if cfg.Target.URI != "/t.rump" {
if cfg.Target.String() != "/t.rump" {
t.Error("wrong target")
}
}
Expand All @@ -77,19 +77,52 @@ func TestFromFileToRedis(t *testing.T) {
t.Error("from file to redis should work")
}

if cfg.Source.IsRedis {
if cfg.Source.IsRedis() {
t.Error("wrong from")
}

if !cfg.Target.IsRedis {
if !cfg.Target.IsRedis() {
t.Error("wrong to")
}

if cfg.Source.URI != "/s.rump" {
if cfg.Source.String() != "/s.rump" {
t.Error("wrong source")
}

if cfg.Target.URI != "redis://t" {
if cfg.Target.String() != "redis://t" {
t.Error("wrong target")
}
}

func TestFromRedisToRedisWithAuth(t *testing.T) {
cfg, err := validate("redis://:a@s", "redis://:b@t", false, false)
if err != nil {
t.Error("from redis to redis should work")
}

if !cfg.Source.IsRedis() {
t.Error("wrong from")
}

if !cfg.Target.IsRedis() {
t.Error("wrong to")
}

if cfg.Source.FormattedString() != "redis://s" {
t.Error("wrong source")
}

sp, _ := cfg.Source.User.Password()
if sp != "a" {
t.Error("wrong source auth")
}

if cfg.Target.FormattedString() != "redis://t" {
t.Error("wrong target")
}

tp, _ := cfg.Target.User.Password()
if tp != "b" {
t.Error("wrong target auth")
}
}
46 changes: 41 additions & 5 deletions pkg/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/mediocregopher/radix/v3"

"github.com/stickermule/rump/pkg/config"
"github.com/stickermule/rump/pkg/message"
)

Expand All @@ -21,13 +22,16 @@ type Redis struct {
}

// New creates the Redis struct, used to read/write.
func New(source *radix.Pool, bus message.Bus, silent, ttl bool) *Redis {
func New(addr string, cfg config.Config, bus message.Bus) (*Redis, error) {

source, err := radix.NewPool("tcp", addr, 1, radix.PoolConnFunc(authConn(cfg.Source)))

return &Redis{
Pool: source,
Bus: bus,
Silent: silent,
TTL: ttl,
}
Silent: cfg.Silent,
TTL: cfg.TTL,
}, err
}

// maybeLog may log, depending on the Silent flag
Expand All @@ -46,9 +50,11 @@ func (r *Redis) maybeTTL(key string) (string, error) {
}

var ttl string
var err error

// Try getting key TTL.
err := r.Pool.Do(radix.Cmd(&ttl, "PTTL", key))
err = r.Pool.Do(radix.Cmd(&ttl, "PTTL", key))

if err != nil {
return ttl, err
}
Expand Down Expand Up @@ -78,6 +84,7 @@ func (r *Redis) Read(ctx context.Context) error {
// Scan and push to bus until no keys are left.
// If context Done, exit early.
for scanner.Next(&key) {

err := r.Pool.Do(radix.Cmd(&value, "DUMP", key))
if err != nil {
return err
Expand Down Expand Up @@ -128,3 +135,32 @@ func (r *Redis) Write(ctx context.Context) error {

return nil
}

func (r *Redis) Monitor(ctx context.Context) error {

// Loop until channel is open
for r.Bus != nil {
select {
// Exit early if context done.
case <-ctx.Done():
fmt.Println("")
fmt.Println("redis write: exit")
return ctx.Err()
// Get Messages from Bus
case p, ok := <-r.Bus:
// if channel closed, set to nil, break loop
if !ok {
r.Bus = nil
continue
}
err := r.Pool.Do(radix.Cmd(&p.Value, "MONITOR", ""))
if err != nil {
return err
}

r.maybeLog(p.Value)
}
}

return nil
}
Loading