Skip to content

Commit

Permalink
update libs
Browse files Browse the repository at this point in the history
  • Loading branch information
lwahlmeier committed Oct 18, 2022
1 parent 2fd8469 commit e7c017e
Show file tree
Hide file tree
Showing 53 changed files with 720 additions and 3,995 deletions.
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ go 1.18

require (
github.com/google/uuid v1.3.0
github.com/lwahlmeier/lcwlog v0.0.1
github.com/lwahlmeier/GoScheduler v1.0.1
github.com/lwahlmeier/lcwlog v0.1.0
github.com/lwahlmeier/unboundChannel v1.0.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.1.1
github.com/stretchr/testify v1.8.0
Expand All @@ -14,14 +16,12 @@ require (
)

require (
github.com/cornelk/hashmap v1.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dchest/siphash v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/lwahlmeier/pyfmt v0.0.1 // indirect
github.com/lwahlmeier/pyfmt v0.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
Expand All @@ -30,7 +30,6 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tevino/abool v1.2.0 // indirect
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
20 changes: 8 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cornelk/hashmap v1.0.1 h1:RXGcy29hEdLLV8T6aK4s+BAd4tq4+3Hq50N2GoG0uIg=
github.com/cornelk/hashmap v1.0.1/go.mod h1:8wbysTUDnwJGrPZ1Iwsou3m+An6sldFrJItjRhfegCw=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.1.0 h1:1Rs9eTUlZLPBEvV+2sTaM8O0NWn0ppbgqS7p11aWawI=
github.com/dchest/siphash v1.1.0/go.mod h1:q+IRvb2gOSrUnYoPqHiyHXS0FOBBOdl6tONBlVnOnt4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -106,7 +102,6 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down Expand Up @@ -145,10 +140,14 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/lwahlmeier/lcwlog v0.0.1 h1:SxB+9hsfY8EC1/Y5PrKiFY9aPX1XP2Cvdc6CxhiDrhw=
github.com/lwahlmeier/lcwlog v0.0.1/go.mod h1:gkvtifa+KrERlYm1RobxuHT8sAlK4E4Q4fRcHHDB2DE=
github.com/lwahlmeier/pyfmt v0.0.1 h1:dhSNbX+/oacDsAAMoY32PVLNyt3qFb9KyHthxNsIvPE=
github.com/lwahlmeier/pyfmt v0.0.1/go.mod h1:IczzcvzCJCBjvOtZ3jFtb420GnoTzfgzA5tzgS47Sww=
github.com/lwahlmeier/GoScheduler v1.0.1 h1:n/ITVaSnXIEO761o5JSVG188/51lt+1xoocmqaheWxQ=
github.com/lwahlmeier/GoScheduler v1.0.1/go.mod h1:dypP1694paqSgaD69kytbnNRqRcsx2ogmokbcfgXyac=
github.com/lwahlmeier/lcwlog v0.1.0 h1:rAmgDV8QMy/sUrH70jWxnfdMvzvg9+WHF4LHa135Gy4=
github.com/lwahlmeier/lcwlog v0.1.0/go.mod h1:1OeOKw6wTKuiYn9+MDw5nNx7s6cKyb66/drs+wJD0kI=
github.com/lwahlmeier/pyfmt v0.2.0 h1:MdVYQQb19ZJbpBROuLRtLwj+94Au/EYIZUb6lT1zYN8=
github.com/lwahlmeier/pyfmt v0.2.0/go.mod h1:FzApCDajWuGo3Ww79197mPDNwwdv+ungAHZKRUz5vIg=
github.com/lwahlmeier/unboundChannel v1.0.0 h1:7EN9RTh0W4HZoNActAZAJ/fGQYSYl0n2Eo1t0K0KNQ4=
github.com/lwahlmeier/unboundChannel v1.0.0/go.mod h1:H73PyOLKqEC2LJcfhu7aEVWrxCmlJNu9x+Cqv5dL+9E=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
Expand Down Expand Up @@ -185,8 +184,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tevino/abool v1.2.0 h1:heAkClL8H6w+mK5md9dzsuohKeXHUpY7Vw0ZCKW+huA=
github.com/tevino/abool v1.2.0/go.mod h1:qc66Pna1RiIsPa7O4Egxxs9OqkuxDX55zznh9K07Tzg=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -497,7 +494,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
75 changes: 45 additions & 30 deletions internal/fspubsub/FSSubscription.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fspubsub

import (
"context"
"errors"
"fmt"
"io/ioutil"
Expand All @@ -9,11 +10,13 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"githb.com/lwahlmeier/go-pubsub-emulator/internal/base"
"githb.com/lwahlmeier/go-pubsub-emulator/internal/utils"
"github.com/google/uuid"
"github.com/lwahlmeier/GoScheduler"
unboundchannel "github.com/lwahlmeier/unboundChannel"
"google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -31,8 +34,9 @@ type FSSubscription struct {
ackLock sync.Mutex
streamClients map[string]*StreamingSubcription
clientLock sync.Mutex
msgsChannel *utils.DynamicUUIDChannel
running bool
msgsChannel *unboundchannel.UnboundChannel[uuid.UUID]
running atomic.Bool
cf context.CancelFunc
}

func CreateSubscription(basePath string, topic *FSTopic, sub *pubsub.Subscription) (*FSSubscription, error) {
Expand All @@ -58,9 +62,10 @@ func CreateSubscription(basePath string, topic *FSTopic, sub *pubsub.Subscriptio
subPath: subPath,
pubsubSubscription: sub,
streamClients: make(map[string]*StreamingSubcription),
msgsChannel: utils.NewDynamicUUIDChannel(),
running: true,
msgsChannel: unboundchannel.NewUnboundChannel[uuid.UUID](),
running: atomic.Bool{},
}
fsSub.running.Store(true)

sfp := fsSub.GetSubFilePath()
data, err := proto.Marshal(sub)
Expand All @@ -72,7 +77,9 @@ func CreateSubscription(basePath string, topic *FSTopic, sub *pubsub.Subscriptio
return nil, err
}
logger.Info("Created Sub:{} for Topic:{} in Project:{}", subName, topic.name, topic.project.name)
go fsSub.watchAcks()
ctx, cf := context.WithCancel(context.Background())
fsSub.cf = cf
GoScheduler.GetDefaultScheduler().ScheduleWithContext(ack_check_time, true, fsSub.nackMessages, ctx)
return fsSub, nil
}

Expand All @@ -83,9 +90,10 @@ func LoadSubscription(subName string, topic *FSTopic) (*FSSubscription, error) {
topic: topic,
subPath: subPath,
streamClients: make(map[string]*StreamingSubcription),
msgsChannel: utils.NewDynamicUUIDChannel(),
running: true,
msgsChannel: unboundchannel.NewUnboundChannel[uuid.UUID](),
running: atomic.Bool{},
}
fsSub.running.Store(true)
psSub := &pubsub.Subscription{}
fp, err := os.Open(fsSub.GetSubFilePath())
if err != nil {
Expand Down Expand Up @@ -116,19 +124,12 @@ func LoadSubscription(subName string, topic *FSTopic) (*FSSubscription, error) {
fsSub.msgsChannel.Add() <- mid
}
logger.Info("Loaded Sub:{} for Topic:{} in Project:{}", subName, topic.name, topic.project.name)
go fsSub.watchAcks()
ctx, cf := context.WithCancel(context.Background())
fsSub.cf = cf
GoScheduler.GetDefaultScheduler().ScheduleWithContext(ack_check_time, true, fsSub.nackMessages, ctx)
return fsSub, nil
}

func (fss *FSSubscription) watchAcks() {
ackTimer := time.NewTimer(ack_check_time)
for fss.running {
<-ackTimer.C
fss.nackMessages()
ackTimer.Reset(ack_check_time)
}
}

func (fss *FSSubscription) GetTopic() base.BaseTopic {
return fss.topic
}
Expand All @@ -146,6 +147,9 @@ func (fss *FSSubscription) GetSubscriptionPubSub() *pubsub.Subscription {
}

func (fss *FSSubscription) Publish(msg *pubsub.PubsubMessage) {
if !fss.running.Load() {
return
}
data, _ := proto.Marshal(msg)
tmpPath := path.Join(fss.subPath, fmt.Sprintf("%s.tmp-msg.proto", msg.MessageId))
msgPath := path.Join(fss.subPath, fmt.Sprintf("%s.msg.proto", msg.MessageId))
Expand All @@ -157,6 +161,9 @@ func (fss *FSSubscription) Publish(msg *pubsub.PubsubMessage) {
}

func (fss *FSSubscription) UpdateAcks(ackIds []string, deadline int32) {
if !fss.running.Load() {
return
}
doTime := time.Second * time.Duration(deadline)
if doTime < time.Second {
doTime = time.Duration(0)
Expand All @@ -179,6 +186,9 @@ func (fss *FSSubscription) UpdateAcks(ackIds []string, deadline int32) {
}

func (fss *FSSubscription) UpdateAck(ackId string, addTime time.Duration) {
if !fss.running.Load() {
return
}
doTime := addTime
if addTime < time.Second {
doTime = time.Duration(0)
Expand All @@ -194,10 +204,13 @@ func (fss *FSSubscription) UpdateAck(ackId string, addTime time.Duration) {
return
}
os.Chtimes(msgPath, ackTill, ackTill)

}

func (fss *FSSubscription) nackMessages() {
fmt.Println("NACK")
if !fss.running.Load() {
return
}
fss.ackLock.Lock()
defer fss.ackLock.Unlock()
nacks := make([]uuid.UUID, 0)
Expand Down Expand Up @@ -317,16 +330,18 @@ func (fss *FSSubscription) GetMessages(max int32, maxWait time.Duration) []*pubs
}

func (fss *FSSubscription) Delete() {
fss.running = false
fss.clientLock.Lock()
defer fss.clientLock.Unlock()
for _, ss := range fss.streamClients {
ss.running = false
ss.streamingServer.Context().Done()
ss.acker.Stop()
if fss.running.CompareAndSwap(true, false) {
fss.clientLock.Lock()
defer fss.clientLock.Unlock()
fss.cf()
for _, ss := range fss.streamClients {
ss.running = false
ss.streamingServer.Context().Done()
ss.acker.Stop()
}
fss.msgsChannel.Stop()
os.RemoveAll(fss.subPath)
}
fss.msgsChannel.Stop()
os.RemoveAll(fss.subPath)
}

func (fss *FSSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.StreamingPullRequest, streamingServer pubsub.Subscriber_StreamingPullServer) base.BaseStreamingSubcription {
Expand All @@ -345,7 +360,7 @@ func (fss *FSSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.Stre
deadline: time.Second * time.Duration(firstRecvMsg.StreamAckDeadlineSeconds),
running: true,
pendingMsgs: make(map[uuid.UUID]bool),
acker: utils.NewDynamicUUIDChannel(),
acker: unboundchannel.NewUnboundChannel[uuid.UUID](),
recvChan: make(chan *pubsub.StreamingPullRequest),
}
fss.clientLock.Lock()
Expand All @@ -372,7 +387,7 @@ type StreamingSubcription struct {
clientId string
deadline time.Duration
recvChan chan *pubsub.StreamingPullRequest
acker *utils.DynamicUUIDChannel
acker *unboundchannel.UnboundChannel[uuid.UUID]
running bool
}

Expand Down
5 changes: 4 additions & 1 deletion internal/fspubsub/FSSubscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,20 @@ func TestBasicExpireAck(t *testing.T) {
go ss.Run()
msgUUID := uuid.NewString()
data := []byte("TEST")
fmt.Println("Publish")
sub.GetTopic().PublishMessage(&pubsub.PubsubMessage{MessageId: msgUUID, Data: data})

rmsg := <-tsps.recvChannel

assert.Equal(t, 1, len(rmsg.ReceivedMessages))
assert.Equal(t, data, rmsg.ReceivedMessages[0].Message.Data)
tsps.sendChannel <- &pubsub.StreamingPullRequest{
ModifyDeadlineSeconds: []int32{0},
ModifyDeadlineAckIds: []string{msgUUID},
}
fmt.Println("Publish2")
time.Sleep(ack_check_time * 2)
rmsg = <-tsps.recvChannel
fmt.Println("Publish3")
assert.Equal(t, 1, len(rmsg.ReceivedMessages))
assert.Equal(t, data, rmsg.ReceivedMessages[0].Message.Data)
tsps.sendChannel <- &pubsub.StreamingPullRequest{
Expand Down
Loading

0 comments on commit e7c017e

Please sign in to comment.