Skip to content

Commit

Permalink
CBG-1196 uptake go-blip to make messages.flags atomic (#7396)
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin authored Feb 26, 2025
1 parent cfcc0fe commit 4962557
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/coreos/go-oidc/v3 v3.12.0
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703
github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a
github.com/couchbase/gocb/v2 v2.9.4
github.com/couchbase/gocbcore/v10 v10.5.4
github.com/couchbase/gomemcached v0.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d h1:X80jy41uF1ivq
github.com/couchbase/cbgt v1.3.10-0.20250128173458-04138cb9d33d/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703 h1:PSEnn/xmEDjZ8uXjuZIXGYlG9QfLOioqKz5MLUpMvkk=
github.com/couchbase/go-blip v0.0.0-20250130142438-e3a29100f703/go.mod h1:J2dZK3JAfWPKZfRWERC7xVlRPQvMVMde/I1Ed4cQIrM=
github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a h1:tCXbdnE+cHXHEgoyONn5yD2c73CFeVYuxCu6Md4v5w4=
github.com/couchbase/go-blip v0.0.0-20250222004812-31da589e100a/go.mod h1:+K082iN0fPzrWgNU/58/sMpydLVTTSdnuJ1srwlWTuk=
github.com/couchbase/go-couchbase v0.1.1 h1:ClFXELcKj/ojyoTYbsY34QUrrYCBi/1G749sXSCkdhk=
github.com/couchbase/go-couchbase v0.1.1/go.mod h1:+/bddYDxXsf9qt0xpDUtRR47A2GjaXmGGAqQ/k3GJ8A=
github.com/couchbase/gocb/v2 v2.9.4 h1:PNYu6dqLFwIdHlEfZBzYE9Nh9NDtPu1/KLRF76bupdU=
Expand Down
39 changes: 4 additions & 35 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,25 +2185,10 @@ func TestRemovedMessageWithAlternateAccess(t *testing.T) {
btcRunner.StartOneshotPull(btc.id)
_ = btcRunner.WaitForVersion(btc.id, docMarker, docMarkerVersion)

messages := btc.pullReplication.GetMessages()

var highestMsgSeq uint32
var highestSeqMsg blip.Message
// Grab most recent changes message
for _, message := range messages {
messageBody, err := message.Body()
require.NoError(t, err)
if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" {
if highestMsgSeq < uint32(message.SerialNumber()) {
highestMsgSeq = uint32(message.SerialNumber())
highestSeqMsg = message
}
}
}
changesMsg := btc.getMostRecentChangesMessage()

var messageBody []interface{}
err = highestSeqMsg.ReadJSONBody(&messageBody)
assert.NoError(t, err)
require.NoError(t, changesMsg.ReadJSONBody(&messageBody))
require.Len(t, messageBody, 3)
require.Len(t, messageBody[0], 4) // Rev 2 of doc, being sent as removal from channel A
require.Len(t, messageBody[1], 4) // Rev 3 of doc, being sent as removal from channel B
Expand Down Expand Up @@ -2288,25 +2273,9 @@ func TestRemovedMessageWithAlternateAccessAndChannelFilteredReplication(t *testi
btcRunner.StartPullSince(btc.id, BlipTesterPullOptions{Channels: "A", Continuous: false})
_ = btcRunner.WaitForVersion(btc.id, markerID, markerVersion)

messages := btc.pullReplication.GetMessages()

var highestMsgSeq uint32
var highestSeqMsg blip.Message
// Grab most recent changes message
for _, message := range messages {
messageBody, err := message.Body()
require.NoError(t, err)
if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" {
if highestMsgSeq < uint32(message.SerialNumber()) {
highestMsgSeq = uint32(message.SerialNumber())
highestSeqMsg = message
}
}
}

changesMsg := btc.getMostRecentChangesMessage()
var messageBody []interface{}
err = highestSeqMsg.ReadJSONBody(&messageBody)
assert.NoError(t, err)
require.NoError(t, changesMsg.ReadJSONBody(&messageBody))
require.Len(t, messageBody, 1)
require.Len(t, messageBody[0], 3) // marker doc
require.Equal(t, "docmarker", messageBody[0].([]interface{})[1])
Expand Down
29 changes: 26 additions & 3 deletions rest/blip_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,29 @@ func (btc *BlipTesterClient) waitForReplicationMessage(collection *db.DatabaseCo
return btc.pushReplication.WaitForMessage(serialNumber + 1)
}

// getMostRecentChangesMessage returns the most recent non nil changes message received from the pull replication. This represents the latest set of changes.
func (btc *BlipTesterClient) getMostRecentChangesMessage() *blip.Message {
var highestMsgSeq uint32
var highestSeqMsg *blip.Message
// Grab most recent changes message
for _, message := range btc.pullReplication.GetMessages() {
if message.Properties["Profile"] != db.MessageChanges {
continue
}
messageBody, err := message.Body()
require.NoError(btc.TB(), err)
if string(messageBody) == "null" {
continue
}
if highestMsgSeq >= uint32(message.SerialNumber()) {
continue
}
highestMsgSeq = uint32(message.SerialNumber())
highestSeqMsg = message
}
return highestSeqMsg
}

// SingleCollection returns a single collection blip tester if the RestTester database is configured with only one collection. Otherwise, throw a fatal test error.
func (btcRunner *BlipTestClientRunner) SingleCollection(clientID uint32) *BlipTesterCollectionClient {
if btcRunner.clients[clientID].nonCollectionAwareClient != nil {
Expand Down Expand Up @@ -1476,15 +1499,15 @@ func (btr *BlipTesterReplicator) GetMessage(serialNumber blip.MessageNumber) (ms
}

// GetMessages returns a copy of all messages stored in the Client keyed by serial number
func (btr *BlipTesterReplicator) GetMessages() map[blip.MessageNumber]blip.Message {
func (btr *BlipTesterReplicator) GetMessages() map[blip.MessageNumber]*blip.Message {
btr.messagesLock.RLock()
defer btr.messagesLock.RUnlock()

messages := make(map[blip.MessageNumber]blip.Message, len(btr.messages))
messages := make(map[blip.MessageNumber]*blip.Message, len(btr.messages))
for k, v := range btr.messages {
// Read the body before copying, since it might be read asynchronously
_, _ = v.Body()
messages[k] = *v
messages[k] = v
}

return messages
Expand Down
21 changes: 2 additions & 19 deletions rest/revocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/couchbase/go-blip"
"github.com/couchbase/sync_gateway/auth"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
Expand Down Expand Up @@ -2372,25 +2371,9 @@ func TestRevocationNoRev(t *testing.T) {

_ = btcRunner.WaitForVersion(btc.id, waitMarkerID, waitMarkerVersion)

messages := btc.pullReplication.GetMessages()

var highestMsgSeq uint32
var highestSeqMsg blip.Message
// Grab most recent changes message
for _, message := range messages {
messageBody, err := message.Body()
require.NoError(t, err)
if message.Properties["Profile"] == db.MessageChanges && string(messageBody) != "null" {
if highestMsgSeq < uint32(message.SerialNumber()) {
highestMsgSeq = uint32(message.SerialNumber())
highestSeqMsg = message
}
}
}

changesMsg := btc.getMostRecentChangesMessage()
var messageBody []interface{}
err := highestSeqMsg.ReadJSONBody(&messageBody)
require.NoError(t, err)
require.NoError(t, changesMsg.ReadJSONBody(&messageBody))
require.Len(t, messageBody, 2)
require.Len(t, messageBody[0], 4)

Expand Down

0 comments on commit 4962557

Please sign in to comment.