Skip to content

Commit

Permalink
WIP: delay processing messages until sync state is initialized
Browse files Browse the repository at this point in the history
  • Loading branch information
paulsonnentag committed Nov 7, 2023
1 parent db1f91e commit 7dcaadf
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 31 deletions.
13 changes: 13 additions & 0 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ export class Repo extends EventEmitter<RepoEvents> {
synchronizer.on("sync-state", ({ documentId, peerId, syncState }) => {
storageSubsystem.saveSyncState(documentId, peerId, syncState)
})

// todo: batch
synchronizer.on("missing-sync-state", async ({ documentId, peerId }) => {
console.log("missing-sync-state", documentId, peerId)

const syncState =
(await storageSubsystem.loadSyncState(documentId, peerId)) ??
Automerge.initSyncState()

synchronizer.intializeSyncStates([{ documentId, peerId, syncState }])
})

console.log("new")
}
}

Expand Down
6 changes: 6 additions & 0 deletions packages/automerge-repo/src/network/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ export interface SyncStateMessage {
syncState: SyncState
}

/** Notify the repo that the sync state needs to be initialized */
export interface MissingSyncStateMessage {
peerId: PeerId
documentId: DocumentId
}

// TYPE GUARDS

export const isValidRepoMessage = (message: Message): message is RepoMessage =>
Expand Down
48 changes: 39 additions & 9 deletions packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import debug from "debug"
import * as A from "@automerge/automerge"
import { DocHandle } from "../DocHandle.js"
import { stringifyAutomergeUrl } from "../AutomergeUrl.js"
import { Repo } from "../Repo.js"
import { RepoMessage } from "../network/messages.js"
import { RepoMessage, SyncStateMessage } from "../network/messages.js"
import { DocumentId, PeerId } from "../types.js"
import { DocSynchronizer } from "./DocSynchronizer.js"
import { Synchronizer } from "./Synchronizer.js"
Expand Down Expand Up @@ -50,6 +51,10 @@ export class CollectionSynchronizer extends Synchronizer {
})
docSynchronizer.on("message", event => this.emit("message", event))
docSynchronizer.on("sync-state", event => this.emit("sync-state", event))
docSynchronizer.on("missing-sync-state", event => {
console.log("Missing sync state", event)
this.emit("missing-sync-state", event)
})
return docSynchronizer
}

Expand Down Expand Up @@ -98,15 +103,25 @@ export class CollectionSynchronizer extends Synchronizer {
/**
* Starts synchronizing the given document with all peers that we share it generously with.
*/
addDocument(documentId: DocumentId) {
async addDocument(documentId: DocumentId) {
// HACK: this is a hack to prevent us from adding the same document twice
if (this.#docSetUp[documentId]) {
return
}

// todo: initialize sync state
const docSynchronizer = this.#fetchDocSynchronizer(documentId)
void this.#documentGenerousPeers(documentId).then(peers => {
docSynchronizer.beginSync(peers)
})
const peers = await this.#documentGenerousPeers(documentId)

// todo: the collection synchronizer shouldn't load sync state directly
for (const peerId of peers) {
const syncState =
(await this.repo.storageSubsystem?.loadSyncState(documentId, peerId)) ??
A.initSyncState()
docSynchronizer.initializeSyncState(peerId, syncState)
}

docSynchronizer.beginSync(peers)
}

// TODO: implement this
Expand All @@ -115,7 +130,7 @@ export class CollectionSynchronizer extends Synchronizer {
}

/** Adds a peer and maybe starts synchronizing with them */
addPeer(peerId: PeerId) {
async addPeer(peerId: PeerId) {
log(`adding ${peerId} & synchronizing with them`)

if (this.#peers.has(peerId)) {
Expand All @@ -125,9 +140,18 @@ export class CollectionSynchronizer extends Synchronizer {
this.#peers.add(peerId)
for (const docSynchronizer of Object.values(this.#docSynchronizers)) {
const { documentId } = docSynchronizer
void this.repo.sharePolicy(peerId, documentId).then(okToShare => {
if (okToShare) docSynchronizer.beginSync([peerId])
})
const okToShare = await this.repo.sharePolicy(peerId, documentId)
if (okToShare) {
// todo: the collection synchronizer shouldn't load sync state directly
const syncState =
(await this.repo.storageSubsystem?.loadSyncState(
documentId,
peerId
)) ?? A.initSyncState()

docSynchronizer.initializeSyncState(peerId, syncState)
docSynchronizer.beginSync([peerId])
}
}
}

Expand All @@ -140,4 +164,10 @@ export class CollectionSynchronizer extends Synchronizer {
docSynchronizer.endSync(peerId)
}
}

intializeSyncStates(syncStateMessages: SyncStateMessage[]) {
for (const { documentId, peerId, syncState } of syncStateMessages) {
this.#docSynchronizers[documentId].initializeSyncState(peerId, syncState)
}
}
}
59 changes: 37 additions & 22 deletions packages/automerge-repo/src/synchronizer/DocSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,27 +126,15 @@ export class DocSynchronizer extends Synchronizer {
}

#getSyncState(peerId: PeerId) {
if (!this.#peers.includes(peerId)) {
this.#log("adding a new peer", peerId)
this.#peers.push(peerId)
const syncState = this.#syncStates[peerId]
if (!syncState) {
throw new Error(`sync state for peer ${peerId} hasn't been initalized`)
}

// when a peer is added, we don't know if it has the document or not
if (!(peerId in this.#peerDocumentStatuses)) {
this.#peerDocumentStatuses[peerId] = "unknown"
}

const prevSyncState = this.#syncStates[peerId]
if (prevSyncState) {
return prevSyncState
}

return A.initSyncState()
return syncState
}

#setSyncState(peerId: PeerId, syncState: A.SyncState) {
// TODO: we only need to do this on reconnect

const previousSyncState = this.#syncStates[peerId]

this.#syncStates[peerId] = syncState
Expand Down Expand Up @@ -300,8 +288,17 @@ export class DocSynchronizer extends Synchronizer {
if (message.documentId !== this.#handle.documentId)
throw new Error(`channelId doesn't match documentId`)

// We need to block receiving the syncMessages until we've checked local storage
if (!this.#handle.inState([READY, REQUESTING, UNAVAILABLE])) {
const syncState = this.#syncStates[message.senderId]

if (!syncState) {
this.emit("missing-sync-state", {
peerId: message.senderId,
documentId: this.documentId,
})
}

// We need to block receiving the syncMessages until we've loeaded the document and the sync state for the peer
if (!this.#isDocumentLoaded() || !syncState) {
this.#pendingSyncMessages.push({ message, received: new Date() })
return
}
Expand All @@ -310,6 +307,21 @@ export class DocSynchronizer extends Synchronizer {
this.#processSyncMessage(message, new Date())
}

/** called by collection synchronizer to load sync state initially */
initializeSyncState(peerId: PeerId, syncState: A.SyncState) {
this.#syncStates[peerId] = syncState

console.log("initialized sycn state", this.documentId, peerId, syncState)

if (this.#isDocumentLoaded()) {
this.#processAllPendingSyncMessages()
}
}

#isDocumentLoaded() {
return !this.#handle.inState([READY, REQUESTING, UNAVAILABLE])
}

#processSyncMessage(message: SyncMessage | RequestMessage, received: Date) {
if (isRequestMessage(message)) {
this.#peerDocumentStatuses[message.senderId] = "wants"
Expand Down Expand Up @@ -373,10 +385,13 @@ export class DocSynchronizer extends Synchronizer {
}

#processAllPendingSyncMessages() {
for (const message of this.#pendingSyncMessages) {
this.#processSyncMessage(message.message, message.received)
}
this.#pendingSyncMessages = this.#pendingSyncMessages.filter(pending => {
if (!this.#syncStates[pending.message.senderId]) {
return true
}

this.#pendingSyncMessages = []
this.#processSyncMessage(pending.message, pending.received)
return false
})
}
}
2 changes: 2 additions & 0 deletions packages/automerge-repo/src/synchronizer/Synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
MessageContents,
RepoMessage,
SyncStateMessage,
MissingSyncStateMessage,
} from "../network/messages.js"

export abstract class Synchronizer extends EventEmitter<SynchronizerEvents> {
Expand All @@ -12,4 +13,5 @@ export abstract class Synchronizer extends EventEmitter<SynchronizerEvents> {
export interface SynchronizerEvents {
message: (arg: MessageContents) => void
"sync-state": (arg: SyncStateMessage) => void
"missing-sync-state": (arg: MissingSyncStateMessage) => void
}

0 comments on commit 7dcaadf

Please sign in to comment.