diff --git a/PubSubEmulator.go b/PubSubEmulator.go index c458c44..e677c51 100644 --- a/PubSubEmulator.go +++ b/PubSubEmulator.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" ) var logger = lcwlog.GetLoggerWithPrefix("PubSubEmulator") @@ -57,7 +58,9 @@ func (ps *PubSubEmulator) Publish(ctx context.Context, pr *pubsub.PublishRequest } mids := make([]string, 0) for _, msg := range pr.Messages { - msg.MessageId = uuid.New().String() + msg.PublishTime = timestamppb.Now() + msg.MessageId = uuid.NewString() + logger.Info("{}:{} Published mid:{}", pjName, topicName, msg.MessageId) err = topic.PublishMessage(msg) if err != nil { logger.Warn("Project:{}, Topic:{} error publishing:{}", pjName, topicName, err.Error()) diff --git a/internal/fspubsub/FSSubscription.go b/internal/fspubsub/FSSubscription.go index 1230498..8bb7532 100644 --- a/internal/fspubsub/FSSubscription.go +++ b/internal/fspubsub/FSSubscription.go @@ -152,6 +152,7 @@ func (fss *FSSubscription) Publish(msg *pubsub.PubsubMessage) { ioutil.WriteFile(tmpPath, data, os.ModePerm) // We Rename to keep from sending notify too soon before we write data os.Rename(tmpPath, msgPath) + logger.Debug("Sub:{} New Message mid:{}", fss.name, msg.MessageId) fss.msgsChannel.Add() <- uuid.MustParse(msg.MessageId) } @@ -167,6 +168,7 @@ func (fss *FSSubscription) UpdateAcks(ackIds []string, deadline int32) { defer fss.ackLock.Unlock() // We do this here to save multipule locks if we do UpdateAck for _, mid := range ackIds { + logger.Debug("Sub:{} Updating add Ack:{} by:{}", fss.name, mid, doTime) msgPath := path.Join(fss.subPath, fmt.Sprintf("%s.ack.proto", mid)) _, err := os.Stat(msgPath) if err != nil { @@ -233,6 +235,7 @@ func (fss *FSSubscription) nackMessages() { func (fss *FSSubscription) AckMessages(ackIds []string) { for _, ackid := range ackIds { + logger.Debug("Sub:{} Acking:{}", fss.name, ackid) ackFile := path.Join(fss.subPath, fmt.Sprintf("%s.ack.proto", ackid)) os.RemoveAll(ackFile) } @@ -329,11 +332,15 @@ func (fss *FSSubscription) Delete() { func (fss *FSSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.StreamingPullRequest, streamingServer pubsub.Subscriber_StreamingPullServer) base.BaseStreamingSubcription { cid := uuid.NewString() logger.Info("Project:{}:Topic:{}:Sub:{}, Creating SubStream:{}", fss.topic.project.name, fss.topic.name, fss.name, cid) + maxMsg := int64(10) + if firstRecvMsg.MaxOutstandingMessages > 0 { + maxMsg = firstRecvMsg.MaxOutstandingMessages + } ss := &StreamingSubcription{ sub: fss, streamingServer: streamingServer, clientId: cid, - maxMsgs: firstRecvMsg.MaxOutstandingMessages, + maxMsgs: maxMsg, maxBytes: firstRecvMsg.MaxOutstandingBytes, deadline: time.Second * time.Duration(firstRecvMsg.StreamAckDeadlineSeconds), running: true, @@ -399,7 +406,8 @@ func (ss *StreamingSubcription) msgSelect() []*pubsub.ReceivedMessage { if msg == nil { return msgs } - ss.pendingMsgs[uuid.MustParse(msg.AckId)] = true + logger.Debug("Sub:{} Sending mid:{}", ss.sub.name, msg.AckId) + ss.pendingMsgs[mid] = true msgs = append(msgs, msg) maxWait := time.NewTimer(time.Millisecond * 5) for len(ss.pendingMsgs) < int(ss.maxMsgs) { @@ -408,7 +416,8 @@ func (ss *StreamingSubcription) msgSelect() []*pubsub.ReceivedMessage { case mid := <-ss.sub.msgsChannel.Get(): msg := ss.sub.getLocalMessages(mid) if msg != nil { - ss.pendingMsgs[uuid.MustParse(msg.AckId)] = true + ss.pendingMsgs[mid] = true + logger.Debug("Sub:{} Sending mid:{}", ss.sub.name, msg.AckId) msgs = append(msgs, msg) } case <-maxWait.C: diff --git a/internal/mempubsub/MemSubscription.go b/internal/mempubsub/MemSubscription.go index 3716fb3..2acf34b 100644 --- a/internal/mempubsub/MemSubscription.go +++ b/internal/mempubsub/MemSubscription.go @@ -107,6 +107,7 @@ func (ms *MemSubscription) getDefaultAckDeadline() time.Duration { } func (ms *MemSubscription) GetMessages(maxMsgs int32, maxWait time.Duration) []*pubsub.ReceivedMessage { + logger.Info("Poll Messages from:{}:{}:{}", ms.topic.project.name, ms.topic.name, ms.name) msgs := make([]*pubsub.ReceivedMessage, 0) timer := time.NewTimer(maxWait) ackDelay := ms.getDefaultAckDeadline() @@ -134,12 +135,16 @@ func (ms *MemSubscription) GetMessages(maxMsgs int32, maxWait time.Duration) []* func (ms *MemSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.StreamingPullRequest, streamingServer pubsub.Subscriber_StreamingPullServer) base.BaseStreamingSubcription { cid := uuid.NewString() logger.Info("Project:{}:Topic:{}:Sub:{}, Creating SubStream:{}", ms.topic.project.name, ms.topic.name, ms.name, cid) + maxMsg := int64(10) + if firstRecvMsg.MaxOutstandingMessages > 0 { + maxMsg = firstRecvMsg.MaxOutstandingMessages + } ss := &StreamingSubcription{ sub: ms, streamingServer: streamingServer, timer: time.NewTimer(ack_check_time), clientId: cid, - maxMsgs: firstRecvMsg.MaxOutstandingMessages, + maxMsgs: maxMsg, maxBytes: firstRecvMsg.MaxOutstandingBytes, deadline: time.Second * time.Duration(firstRecvMsg.StreamAckDeadlineSeconds), running: true, @@ -209,6 +214,7 @@ func (ms *MemSubscription) runAck(ackUUID uuid.UUID) { } func (ms *MemSubscription) PublishMessage(msg *pubsub.PubsubMessage) { + logger.Debug("Project:{}:Topic:{}:Sub:{}, Got Message:{}", ms.topic.project.name, ms.topic.name, ms.name, msg.MessageId) ms.msgChannel.Add() <- &pubsub.ReceivedMessage{ Message: msg, AckId: msg.MessageId,