Skip to content

Commit

Permalink
Already deleted Message returns 404 vs 409 conflict when a 2nd attemp…
Browse files Browse the repository at this point in the history
…t to delete is processed (#739)

* deleted message now returns 404 if a second attempt to delete it is attempted, vs a conflict 409

* fix sync edge case for conflicting deletes
  • Loading branch information
LiranCohen authored Jul 3, 2024
1 parent bc54d0c commit 5b0c61a
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 20 deletions.
8 changes: 8 additions & 0 deletions .changeset/dirty-days-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Sync accounts for 404 from a conflicting RecordsDelete message
21 changes: 19 additions & 2 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import type {
MessagesQueryReply,
MessagesReadReply,
PaginationCursor,
UnionMessageReply,
} from '@tbd54566975/dwn-sdk-js';

import ms from 'ms';
import { Level } from 'level';
import { monotonicFactory } from 'ulidx';
import { NodeStream } from '@web5/common';
import {
DwnInterfaceName,
DwnMethodName,
} from '@tbd54566975/dwn-sdk-js';

import type { SyncEngine } from './types/sync.js';
import type { Web5PlatformAgent } from './types/agent.js';
Expand Down Expand Up @@ -143,7 +148,7 @@ export class SyncEngineLevel implements SyncEngine {
: undefined;

const pullReply = await this.agent.dwn.node.processMessage(did, message, { dataStream });
if (pullReply.status.code === 202 || pullReply.status.code === 409) {
if (SyncEngineLevel.syncMessageReplyIsSuccessful(pullReply)) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
Expand Down Expand Up @@ -195,7 +200,8 @@ export class SyncEngineLevel implements SyncEngine {
// Update the watermark and add the messageCid to the Sync Message Store if either:
// - 202: message was successfully written to the remote DWN
// - 409: message was already present on the remote DWN
if (reply.status.code === 202 || reply.status.code === 409) {
// - RecordsDelete and the status code is 404: the initial write message was not found or the message was already deleted
if (SyncEngineLevel.syncMessageReplyIsSuccessful(reply)) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
Expand Down Expand Up @@ -252,6 +258,17 @@ export class SyncEngineLevel implements SyncEngine {
}
}

private static syncMessageReplyIsSuccessful(reply: UnionMessageReply): boolean {
return reply.status.code === 202 ||
reply.status.code === 409 ||
(
// If the message is a RecordsDelete and the status code is 404, the initial write message was not found or the message was already deleted
reply.entry?.message.descriptor.interface === DwnInterfaceName.Records &&
reply.entry?.message.descriptor.method === DwnMethodName.Delete &&
reply.status.code === 404
);
}

private async enqueueOperations({ syncDirection, syncPeerState }: {
syncDirection: SyncDirection,
syncPeerState: SyncState[]
Expand Down
89 changes: 73 additions & 16 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ describe('SyncEngineLevel', () => {
it('silently ignores a messageCid that already exists on the local DWN', async () => {
// scenario: The messageCids returned from the remote eventLog contains a messageCid that already exists on the local DWN.
// During sync, when processing the messageCid the local DWN will return a conflict response, but the sync should continue
//
// NOTE: When deleting a message, the conflicting Delete will return a 404 instead of a 409,
// the sync should still mark the message as synced and continue

// create a record and store it locally and remotely
const remoteAndLocalRecord = await testHarness.agent.processDwnRequest({
Expand All @@ -456,6 +459,23 @@ describe('SyncEngineLevel', () => {
messageCid : remoteAndLocalRecord.messageCid,
});

// delete the record both locally and remotely
const deleteMessage = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsDelete,
messageParams : {
recordId: remoteAndLocalRecord.message!.recordId
}
});
// send the delete to the remote
await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsDelete,
messageCid : deleteMessage.messageCid,
});

// create 2 records stored only remotely to later sync to the local DWN
const record1 = await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
Expand All @@ -481,7 +501,7 @@ describe('SyncEngineLevel', () => {
});
expect(record2.reply.status.code).to.equal(202);

// confirm that only the single record exists locally
// confirm that only the record and it's delete exists locally
let localQueryResponse = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
Expand All @@ -492,12 +512,16 @@ describe('SyncEngineLevel', () => {
});

let localDwnQueryEntries = localQueryResponse.reply.entries!;
expect(localDwnQueryEntries.length).to.equal(1);
expect(localDwnQueryEntries).to.have.members([remoteAndLocalRecord.messageCid]);
expect(localDwnQueryEntries.length).to.equal(2);
expect(localDwnQueryEntries).to.have.members([
remoteAndLocalRecord.messageCid,
deleteMessage.messageCid
]);

// stub getDwnEventLog to return the messageCids of the records we want to sync
sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([
remoteAndLocalRecord.messageCid,
deleteMessage.messageCid,
record1.messageCid,
record2.messageCid
]);
Expand All @@ -514,17 +538,19 @@ describe('SyncEngineLevel', () => {
// Execute Sync to push records to Alice's remote node
await syncEngine.pull();

// Verify sendDwnRequest is called for all 3 records
expect(sendDwnRequestSpy.callCount).to.equal(3, 'sendDwnRequestSpy');
// Verify that processMessage is called for all 3 records
expect(processMessageSpy.callCount).to.equal(3, 'processMessageSpy');
// Verify sendDwnRequest is called for all 4 messages
expect(sendDwnRequestSpy.callCount).to.equal(4, 'sendDwnRequestSpy');
// Verify that processMessage is called for all 4 messages
expect(processMessageSpy.callCount).to.equal(4, 'processMessageSpy');

// Verify that the conflict response is returned for the record that already exists locally
expect((await processMessageSpy.firstCall.returnValue).status.code).to.equal(409);
// Verify that the delete message returned a 404
expect((await processMessageSpy.secondCall.returnValue).status.code).to.equal(404);

// Verify that the other 2 records are successfully processed
expect((await processMessageSpy.secondCall.returnValue).status.code).to.equal(202);
expect((await processMessageSpy.thirdCall.returnValue).status.code).to.equal(202);
expect((await processMessageSpy.returnValues[2]).status.code).to.equal(202);
expect((await processMessageSpy.returnValues[3]).status.code).to.equal(202);

// confirm the new records exist remotely
localQueryResponse = await testHarness.agent.processDwnRequest({
Expand All @@ -536,9 +562,10 @@ describe('SyncEngineLevel', () => {
},
});
localDwnQueryEntries = localQueryResponse.reply.entries!;
expect(localDwnQueryEntries.length).to.equal(3);
expect(localDwnQueryEntries.length).to.equal(4);
expect(localDwnQueryEntries).to.have.members([
remoteAndLocalRecord.messageCid,
deleteMessage.messageCid,
record1.messageCid,
record2.messageCid
]);
Expand Down Expand Up @@ -901,6 +928,8 @@ describe('SyncEngineLevel', () => {

// scenario: The messageCids returned from the local eventLog contains a Cid that already exists in the remote DWN.
// During sync, the remote DWN will return a conflict 409 status code and the sync should continue
// NOTE: if the messageCid is a delete message and it is already deleted,
// the remote DWN will return a 404 status code and the sync should continue

// create a record, store it and send it to the remote Dwn
const remoteAndLocalRecord = await testHarness.agent.processDwnRequest({
Expand All @@ -922,6 +951,23 @@ describe('SyncEngineLevel', () => {
messageCid : remoteAndLocalRecord.messageCid,
});

// delete the record both locally and remotely
const deleteMessage = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsDelete,
messageParams : {
recordId: remoteAndLocalRecord.message!.recordId
}
});
// send the delete to the remote
await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsDelete,
messageCid : deleteMessage.messageCid,
});

// create 2 records stored only locally to sync to the remote DWN
const record1 = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
Expand All @@ -947,7 +993,7 @@ describe('SyncEngineLevel', () => {
});
expect(record2.reply.status.code).to.equal(202);

// confirm that only the single record exists remotely
// confirm that only record and it's delete exist remotely
let remoteQueryResponse = await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
Expand All @@ -958,13 +1004,14 @@ describe('SyncEngineLevel', () => {
});

let remoteDwnQueryEntries = remoteQueryResponse.reply.entries!;
expect(remoteDwnQueryEntries.length).to.equal(1);
expect(remoteDwnQueryEntries).to.have.members([remoteAndLocalRecord.messageCid]);
expect(remoteDwnQueryEntries.length).to.equal(2);
expect(remoteDwnQueryEntries).to.have.members([ remoteAndLocalRecord.messageCid, deleteMessage.messageCid ]);

// stub getDwnEventLog to return the messageCids of the records we want to sync
// we stub this to avoid syncing the registered identity related messages
sinon.stub(syncEngine as any, 'getDwnEventLog').resolves([
remoteAndLocalRecord.messageCid,
deleteMessage.messageCid,
record1.messageCid,
record2.messageCid
]);
Expand All @@ -975,8 +1022,17 @@ describe('SyncEngineLevel', () => {
// Execute Sync to push records to Alice's remote node
await syncEngine.push();

// Verify sendDwnRequest was called once for each record including the one that already exists remotely
expect(sendDwnRequestSpy.callCount).to.equal(3);
// Verify sendDwnRequest was called once for each record including the ones that already exist remotely
expect(sendDwnRequestSpy.callCount).to.equal(4);

// Verify that the conflict response is returned for the record that already exists remotely
expect((await sendDwnRequestSpy.firstCall.returnValue).status.code).to.equal(409);
// Verify that the delete message returned a 404
expect((await sendDwnRequestSpy.secondCall.returnValue).status.code).to.equal(404);

// Verify that the other 2 records are successfully processed
expect((await sendDwnRequestSpy.returnValues[2]).status.code).to.equal(202);
expect((await sendDwnRequestSpy.returnValues[3]).status.code).to.equal(202);

// confirm the new records exist remotely
remoteQueryResponse = await testHarness.agent.sendDwnRequest({
Expand All @@ -988,9 +1044,10 @@ describe('SyncEngineLevel', () => {
},
});
remoteDwnQueryEntries = remoteQueryResponse.reply.entries!;
expect(remoteDwnQueryEntries.length).to.equal(3);
expect(remoteDwnQueryEntries.length).to.equal(4);
expect(remoteDwnQueryEntries).to.have.members([
remoteAndLocalRecord.messageCid,
deleteMessage.messageCid,
record1.messageCid,
record2.messageCid
]);
Expand Down
4 changes: 2 additions & 2 deletions packages/api/tests/record.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2983,7 +2983,7 @@ describe('Record', () => {
}
});

it('duplicate delete with store should return conflict', async () => {
it('duplicate delete with store should return not found', async () => {
// create a record
const { status: writeStatus, record } = await dwnAlice.records.write({
data : 'Hello, world!',
Expand Down Expand Up @@ -3013,7 +3013,7 @@ describe('Record', () => {

// attempt to delete the record again
const { status: deleteStatus2 } = await record.delete();
expect(deleteStatus2.code).to.equal(409);
expect(deleteStatus2.code).to.equal(404);
});

it('a record in a deleted state returns undefined for data related fields', async () => {
Expand Down

0 comments on commit 5b0c61a

Please sign in to comment.