Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer sync report async #225

Closed
wants to merge 13 commits into from
25 changes: 25 additions & 0 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class DocHandle<T> //

#machine: DocHandleXstateMachine<T>
#timeoutDelay: number
#remoteHeads: Record<PeerId, A.Heads> = {}

/** The URL of this document
*
Expand Down Expand Up @@ -327,6 +328,18 @@ export class DocHandle<T> //
})
}

/** `setRemoteHeads` is called by the doc synchronizer
* @hidden
*/
setRemoteHeads(peerId: PeerId, heads: A.Heads) {
this.#remoteHeads[peerId] = heads
this.emit("remote-heads", { peerId, heads })
}

getRemoteHeads(peerId: PeerId): A.Heads | undefined {
return this.#remoteHeads[peerId]
}

/** `change` is called by the repo when the document is changed locally */
change(callback: A.ChangeFn<T>, options: A.ChangeOptions<T> = {}) {
if (!this.isReady()) {
Expand Down Expand Up @@ -482,6 +495,16 @@ export interface DocHandleOutboundEphemeralMessagePayload<T> {
data: Uint8Array
}

export interface DocHandleRemoteHeadsPayload {
peerId: PeerId
heads: A.Heads
}

export interface DocHandleSyncStatePayload {
peerId: PeerId
syncState: A.SyncState
}

export interface DocHandleEvents<T> {
"heads-changed": (payload: DocHandleEncodedChangePayload<T>) => void
change: (payload: DocHandleChangePayload<T>) => void
Expand All @@ -491,6 +514,8 @@ export interface DocHandleEvents<T> {
"ephemeral-message-outbound": (
payload: DocHandleOutboundEphemeralMessagePayload<T>
) => void
"remote-heads": (payload: DocHandleRemoteHeadsPayload) => void
"sync-state": (payload: DocHandleSyncStatePayload) => void
}

// STATE MACHINE TYPES
Expand Down
7 changes: 7 additions & 0 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ export class Repo extends EventEmitter<RepoEvents> {
networkSubsystem.on("message", async msg => {
await synchronizer.receiveMessage(msg)
})

if (storageSubsystem) {
// todo: debounce
synchronizer.on("sync-state", ({ documentId, peerId, syncState }) => {
storageSubsystem.saveSyncState(documentId, peerId, syncState)
})
}
}

/** Returns an existing handle if we have it; creates one otherwise. */
Expand Down
1 change: 1 addition & 0 deletions packages/automerge-repo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export type {
DocHandleDeletePayload,
DocHandleEncodedChangePayload,
DocHandleEphemeralMessagePayload,
DocHandleRemoteHeadsPayload,
DocHandleEvents,
DocHandleOptions,
DocHandleOutboundEphemeralMessagePayload,
Expand Down
8 changes: 8 additions & 0 deletions packages/automerge-repo/src/network/messages.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { SyncState } from "@automerge/automerge"
import { DocumentId, PeerId, SessionId } from "../types.js"

/**
Expand Down Expand Up @@ -119,6 +120,13 @@ export type MessageContents<T extends Message = Message> =
? Omit<T, "senderId" | "count" | "sessionId">
: Omit<T, "senderId">

/** Notify the repo that the sync state has changed */
export interface SyncStateMessage {
peerId: PeerId
documentId: DocumentId
syncState: SyncState
}

// TYPE GUARDS

export const isValidRepoMessage = (message: Message): message is RepoMessage =>
Expand Down
21 changes: 19 additions & 2 deletions packages/automerge-repo/src/storage/StorageSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as A from "@automerge/automerge/next"
import debug from "debug"
import { headsAreSame } from "../helpers/headsAreSame.js"
import { mergeArrays } from "../helpers/mergeArrays.js"
import { type DocumentId } from "../types.js"
import { PeerId, type DocumentId } from "../types.js"
import { StorageAdapter } from "./StorageAdapter.js"
import { ChunkInfo, StorageKey } from "./types.js"
import { keyHash, headsHash } from "./keyHash.js"
Expand Down Expand Up @@ -205,10 +205,27 @@ export class StorageSubsystem {
newChunkInfos.push({ key, type: "snapshot", size: binary.length })

this.#chunkInfos.set(documentId, newChunkInfos)

this.#compacting = false
}

async loadSyncState(
documentId: DocumentId,
peerId: PeerId
): Promise<A.SyncState | undefined> {
const key = [documentId, "sync-state", peerId]
const loaded = await this.#storageAdapter.load(key)
return loaded ? A.decodeSyncState(loaded) : undefined
}

async saveSyncState(
documentId: DocumentId,
peerId: PeerId,
syncState: A.SyncState
): Promise<void> {
const key = [documentId, "sync-state", peerId]
await this.#storageAdapter.save(key, A.encodeSyncState(syncState))
}

/**
* Returns true if the document has changed since the last time it was saved.
*/
Expand Down
3 changes: 1 addition & 2 deletions packages/automerge-repo/src/storage/chunkTypeFromKey.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { StorageKey } from "./types.js"
import { ChunkType } from "./types.js"
import type { StorageKey, ChunkType } from "./types.js"

/**
* Keys for storing Automerge documents are of the form:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as A from "@automerge/automerge"
import debug from "debug"
import { DocHandle } from "../DocHandle.js"
import { stringifyAutomergeUrl } from "../AutomergeUrl.js"
Expand Down Expand Up @@ -35,8 +36,21 @@ export class CollectionSynchronizer extends Synchronizer {

/** Creates a new docSynchronizer and sets it up to propagate messages */
#initDocSynchronizer(handle: DocHandle<unknown>): DocSynchronizer {
const docSynchronizer = new DocSynchronizer(handle)
const docSynchronizer = new DocSynchronizer({
handle,
onLoadSyncState: peerId => {
if (!this.repo.storageSubsystem) {
return undefined
}

return this.repo.storageSubsystem.loadSyncState(
handle.documentId,
peerId
)
},
})
docSynchronizer.on("message", event => this.emit("message", event))
docSynchronizer.on("sync-state", event => this.emit("sync-state", event))
return docSynchronizer
}

Expand All @@ -51,6 +65,28 @@ export class CollectionSynchronizer extends Synchronizer {
return generousPeers
}

/** load stored sync state for generous peers and begin sync on doc synchronizer */
async #beginSync(docSynchronizer: DocSynchronizer, peers: PeerId[]) {
const documentId = docSynchronizer.documentId
const storedSyncStates: Record<PeerId, A.SyncState> = {}
const newPeers: PeerId[] = []

for (const peerId of peers) {
if (!docSynchronizer.hasPeer(peerId)) {
const syncState = await this.repo.storageSubsystem?.loadSyncState(
documentId,
peerId
)

if (syncState) {
storedSyncStates[peerId] = syncState
}
newPeers.push(peerId)
}
}
docSynchronizer.beginSync(newPeers)
}

// PUBLIC

/**
Expand All @@ -77,6 +113,9 @@ export class CollectionSynchronizer extends Synchronizer {

// Initiate sync with any new peers
const peers = await this.#documentGenerousPeers(documentId)

// todo: call load sync state on doc synchronizer

docSynchronizer.beginSync(
peers.filter(peerId => !docSynchronizer.hasPeer(peerId))
)
Expand Down
Loading