Skip to content

Commit

Permalink
Add record CIDs for creates/updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 9, 2024
1 parent 6be24d0 commit dd31d74
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,23 @@ $ websocat "ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&w

```json
{
"did": "did:plc:stk5dhwhpcjxtlwnhbknmfgx",
"time_us": 1725516666734038,
"did": "did:plc:eygmaihciaxprqvxpfvl6flk",
"time_us": 1725911162329308,
"type": "com",
"commit": {
"rev": "3l3f6nzhvkl2g",
"rev": "3l3qo2vutsw2b",
"type": "c",
"collection": "app.bsky.feed.like",
"rkey": "3l3f6nzhls32g",
"rkey": "3l3qo2vuowo2b",
"record": {
"$type": "app.bsky.feed.like",
"createdAt": "2024-09-05T06:11:06.451Z",
"createdAt": "2024-09-09T19:46:02.102Z",
"subject": {
"cid": "bafyreihdlrrfxc4l5few7lso3u36zf655omeuso3y3dnersr5mm3r7mbyu",
"uri": "at://did:plc:ou4u4h3khiaym4a43r7uwp4f/app.bsky.feed.post/3l3e3vz53lu2d"
"cid": "bafyreidc6sydkkbchcyg62v77wbhzvb2mvytlmsychqgwf2xojjtirmzj4",
"uri": "at://did:plc:wa7b35aakoll7hugkrjtf3xf/app.bsky.feed.post/3l3pte3p2e325"
}
}
},
"cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
}
}
```
Expand Down
8 changes: 6 additions & 2 deletions pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
break
}

if rcid.String() != op.Cid.String() {
recCid := rcid.String()
if recCid != op.Cid.String() {
log.Error("record cid mismatch", "expected", *op.Cid, "actual", rcid)
break
}
Expand All @@ -239,6 +240,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
Collection: collection,
RKey: rkey,
Record: recJSON,
CID: recCid,
}
case repomgr.EvtKindUpdateRecord:
if op.Cid == nil {
Expand All @@ -252,7 +254,8 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
break
}

if rcid.String() != op.Cid.String() {
recCid := rcid.String()
if recCid != op.Cid.String() {
log.Error("record cid mismatch", "expected", *op.Cid, "actual", rcid)
break
}
Expand All @@ -274,6 +277,7 @@ func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSub
Collection: collection,
RKey: rkey,
Record: recJSON,
CID: recCid,
}
case repomgr.EvtKindDeleteRecord:
// Emit the delete
Expand Down
23 changes: 12 additions & 11 deletions pkg/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import (
)

type Event struct {
Did string `json:"did" cborgen:"did"`
TimeUS int64 `json:"time_us" cborgen:"time_us"`
EventType string `json:"type" cborgen:"type"`
Commit *Commit `json:"commit,omitempty" cborgen:"commit,omitempty"`
Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty" cborgen:"account,omitempty"`
Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty" cborgen:"identity,omitempty"`
Did string `json:"did"`
TimeUS int64 `json:"time_us"`
EventType string `json:"type"`
Commit *Commit `json:"commit,omitempty"`
Account *comatproto.SyncSubscribeRepos_Account `json:"account,omitempty"`
Identity *comatproto.SyncSubscribeRepos_Identity `json:"identity,omitempty"`
}

type Commit struct {
Rev string `json:"rev,omitempty" cborgen:"rev"`
OpType string `json:"type" cborgen:"type"`
Collection string `json:"collection,omitempty" cborgen:"collection"`
RKey string `json:"rkey,omitempty" cborgen:"rkey"`
Record json.RawMessage `json:"record,omitempty" cborgen:"record,omitempty"`
Rev string `json:"rev,omitempty"`
OpType string `json:"type"`
Collection string `json:"collection,omitempty"`
RKey string `json:"rkey,omitempty"`
Record json.RawMessage `json:"record,omitempty"`
CID string `json:"cid,omitempty"`
}

var (
Expand Down

0 comments on commit dd31d74

Please sign in to comment.