Skip to content

Commit

Permalink
set publish time, if max messages not set, set it
Browse files Browse the repository at this point in the history
  • Loading branch information
lwahlmeier committed Oct 10, 2022
1 parent a38dbd6 commit 2fd8469
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
5 changes: 4 additions & 1 deletion PubSubEmulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var logger = lcwlog.GetLoggerWithPrefix("PubSubEmulator")
Expand Down Expand Up @@ -57,7 +58,9 @@ func (ps *PubSubEmulator) Publish(ctx context.Context, pr *pubsub.PublishRequest
}
mids := make([]string, 0)
for _, msg := range pr.Messages {
msg.MessageId = uuid.New().String()
msg.PublishTime = timestamppb.Now()
msg.MessageId = uuid.NewString()
logger.Info("{}:{} Published mid:{}", pjName, topicName, msg.MessageId)
err = topic.PublishMessage(msg)
if err != nil {
logger.Warn("Project:{}, Topic:{} error publishing:{}", pjName, topicName, err.Error())
Expand Down
15 changes: 12 additions & 3 deletions internal/fspubsub/FSSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (fss *FSSubscription) Publish(msg *pubsub.PubsubMessage) {
ioutil.WriteFile(tmpPath, data, os.ModePerm)
// We Rename to keep from sending notify too soon before we write data
os.Rename(tmpPath, msgPath)
logger.Debug("Sub:{} New Message mid:{}", fss.name, msg.MessageId)
fss.msgsChannel.Add() <- uuid.MustParse(msg.MessageId)
}

Expand All @@ -167,6 +168,7 @@ func (fss *FSSubscription) UpdateAcks(ackIds []string, deadline int32) {
defer fss.ackLock.Unlock()
// We do this here to save multipule locks if we do UpdateAck
for _, mid := range ackIds {
logger.Debug("Sub:{} Updating add Ack:{} by:{}", fss.name, mid, doTime)
msgPath := path.Join(fss.subPath, fmt.Sprintf("%s.ack.proto", mid))
_, err := os.Stat(msgPath)
if err != nil {
Expand Down Expand Up @@ -233,6 +235,7 @@ func (fss *FSSubscription) nackMessages() {

func (fss *FSSubscription) AckMessages(ackIds []string) {
for _, ackid := range ackIds {
logger.Debug("Sub:{} Acking:{}", fss.name, ackid)
ackFile := path.Join(fss.subPath, fmt.Sprintf("%s.ack.proto", ackid))
os.RemoveAll(ackFile)
}
Expand Down Expand Up @@ -329,11 +332,15 @@ func (fss *FSSubscription) Delete() {
func (fss *FSSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.StreamingPullRequest, streamingServer pubsub.Subscriber_StreamingPullServer) base.BaseStreamingSubcription {
cid := uuid.NewString()
logger.Info("Project:{}:Topic:{}:Sub:{}, Creating SubStream:{}", fss.topic.project.name, fss.topic.name, fss.name, cid)
maxMsg := int64(10)
if firstRecvMsg.MaxOutstandingMessages > 0 {
maxMsg = firstRecvMsg.MaxOutstandingMessages
}
ss := &StreamingSubcription{
sub: fss,
streamingServer: streamingServer,
clientId: cid,
maxMsgs: firstRecvMsg.MaxOutstandingMessages,
maxMsgs: maxMsg,
maxBytes: firstRecvMsg.MaxOutstandingBytes,
deadline: time.Second * time.Duration(firstRecvMsg.StreamAckDeadlineSeconds),
running: true,
Expand Down Expand Up @@ -399,7 +406,8 @@ func (ss *StreamingSubcription) msgSelect() []*pubsub.ReceivedMessage {
if msg == nil {
return msgs
}
ss.pendingMsgs[uuid.MustParse(msg.AckId)] = true
logger.Debug("Sub:{} Sending mid:{}", ss.sub.name, msg.AckId)
ss.pendingMsgs[mid] = true
msgs = append(msgs, msg)
maxWait := time.NewTimer(time.Millisecond * 5)
for len(ss.pendingMsgs) < int(ss.maxMsgs) {
Expand All @@ -408,7 +416,8 @@ func (ss *StreamingSubcription) msgSelect() []*pubsub.ReceivedMessage {
case mid := <-ss.sub.msgsChannel.Get():
msg := ss.sub.getLocalMessages(mid)
if msg != nil {
ss.pendingMsgs[uuid.MustParse(msg.AckId)] = true
ss.pendingMsgs[mid] = true
logger.Debug("Sub:{} Sending mid:{}", ss.sub.name, msg.AckId)
msgs = append(msgs, msg)
}
case <-maxWait.C:
Expand Down
8 changes: 7 additions & 1 deletion internal/mempubsub/MemSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (ms *MemSubscription) getDefaultAckDeadline() time.Duration {
}

func (ms *MemSubscription) GetMessages(maxMsgs int32, maxWait time.Duration) []*pubsub.ReceivedMessage {
logger.Info("Poll Messages from:{}:{}:{}", ms.topic.project.name, ms.topic.name, ms.name)
msgs := make([]*pubsub.ReceivedMessage, 0)
timer := time.NewTimer(maxWait)
ackDelay := ms.getDefaultAckDeadline()
Expand Down Expand Up @@ -134,12 +135,16 @@ func (ms *MemSubscription) GetMessages(maxMsgs int32, maxWait time.Duration) []*
func (ms *MemSubscription) CreateStreamingSubscription(firstRecvMsg *pubsub.StreamingPullRequest, streamingServer pubsub.Subscriber_StreamingPullServer) base.BaseStreamingSubcription {
cid := uuid.NewString()
logger.Info("Project:{}:Topic:{}:Sub:{}, Creating SubStream:{}", ms.topic.project.name, ms.topic.name, ms.name, cid)
maxMsg := int64(10)
if firstRecvMsg.MaxOutstandingMessages > 0 {
maxMsg = firstRecvMsg.MaxOutstandingMessages
}
ss := &StreamingSubcription{
sub: ms,
streamingServer: streamingServer,
timer: time.NewTimer(ack_check_time),
clientId: cid,
maxMsgs: firstRecvMsg.MaxOutstandingMessages,
maxMsgs: maxMsg,
maxBytes: firstRecvMsg.MaxOutstandingBytes,
deadline: time.Second * time.Duration(firstRecvMsg.StreamAckDeadlineSeconds),
running: true,
Expand Down Expand Up @@ -209,6 +214,7 @@ func (ms *MemSubscription) runAck(ackUUID uuid.UUID) {
}

func (ms *MemSubscription) PublishMessage(msg *pubsub.PubsubMessage) {
logger.Debug("Project:{}:Topic:{}:Sub:{}, Got Message:{}", ms.topic.project.name, ms.topic.name, ms.name, msg.MessageId)
ms.msgChannel.Add() <- &pubsub.ReceivedMessage{
Message: msg,
AckId: msg.MessageId,
Expand Down

0 comments on commit 2fd8469

Please sign in to comment.