Skip to content

Commit

Permalink
refactor to make a PeerMetadata type
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Nov 30, 2023
1 parent 006fd58 commit 95bdbf9
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 170 deletions.
38 changes: 11 additions & 27 deletions packages/automerge-repo-network-broadcastchannel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
NetworkAdapter,
type Message,
type PeerId,
type StorageId,
type PeerMetadata,
} from "@automerge/automerge-repo"

export type BroadcastChannelNetworkAdapterOptions = {
Expand All @@ -35,13 +35,9 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
this.#options = { channelName: "broadcast", ...(options ?? {}) }
}

connect(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
connect(peerId: PeerId, peerMetadata: PeerMetadata) {
this.peerId = peerId
this.storageId = storageId
this.peerMetadata = peerMetadata
this.#broadcastChannel = new BroadcastChannel(this.#options.channelName)

this.#broadcastChannel.addEventListener(
Expand All @@ -61,10 +57,10 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
targetId: senderId,
type: "welcome",
})
this.#announceConnection(senderId, storageId, isEphemeral)
this.#announceConnection(senderId, peerMetadata)
break
case "welcome":
this.#announceConnection(senderId, storageId, isEphemeral)
this.#announceConnection(senderId, peerMetadata)
break
default:
if (!("data" in message)) {
Expand All @@ -88,12 +84,8 @@ export class BroadcastChannelNetworkAdapter extends NetworkAdapter {
this.emit("ready", { network: this })
}

#announceConnection(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
this.emit("peer-candidate", { peerId, storageId, isEphemeral })
#announceConnection(peerId: PeerId, peerMetadata: PeerMetadata) {
this.emit("peer-candidate", { peerId, peerMetadata })
}

send(message: Message) {
Expand Down Expand Up @@ -123,12 +115,8 @@ type ArriveMessage = {
/** The peer ID of the sender of this message */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** The peer metadata of the sender of this message */
peerMetadata: PeerMetadata

/** Arrive messages don't have a targetId */
targetId: never
Expand All @@ -141,12 +129,8 @@ type WelcomeMessage = {
/** The peer ID of the recipient sender this message */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** The peer metadata of the sender of this message */
peerMetadata: PeerMetadata

/** The peer ID of the recipient of this message */
targetId: PeerId
Expand Down
45 changes: 14 additions & 31 deletions packages/automerge-repo-network-messagechannel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
type PeerId,
type Message,
type StorageId,
PeerMetadata,
} from "@automerge/automerge-repo"
import { MessagePortRef } from "./MessagePortRef.js"
import { StrongMessagePortRef } from "./StrongMessagePortRef.js"
Expand Down Expand Up @@ -38,15 +39,10 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
: new StrongMessagePortRef(messagePort)
}

connect(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
connect(peerId: PeerId, peerMetadata: PeerMetadata) {
log("messageport connecting")
this.peerId = peerId
this.storageId = storageId
this.isEphemeral = isEphemeral
this.peerMetadata = peerMetadata
this.messagePortRef.start()
this.messagePortRef.addListener(
"message",
Expand All @@ -65,21 +61,20 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
switch (type) {
case "arrive":
{
const { storageId, isEphemeral } = message
const { peerMetadata } = message
this.messagePortRef.postMessage({
senderId: this.peerId,
storageId: this.storageId,
isEphemeral: this.isEphemeral,
peerMetadata: this.peerMetadata,
targetId: senderId,
type: "welcome",
})
this.announceConnection(senderId, storageId, isEphemeral)
this.announceConnection(senderId, peerMetadata)
}
break
case "welcome":
{
const { storageId, isEphemeral } = message
this.announceConnection(senderId, storageId, isEphemeral)
const { peerMetadata } = message
this.announceConnection(senderId, peerMetadata)
}
break
default:
Expand Down Expand Up @@ -135,16 +130,12 @@ export class MessageChannelNetworkAdapter extends NetworkAdapter {
}
}

announceConnection(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
announceConnection(peerId: PeerId, peerMetadata: PeerMetadata) {
if (!this.#startupComplete) {
this.#startupComplete = true
this.emit("ready", { network: this })
}
this.emit("peer-candidate", { peerId, storageId, isEphemeral })
this.emit("peer-candidate", { peerId, peerMetadata })
}

disconnect() {
Expand Down Expand Up @@ -172,12 +163,8 @@ type ArriveMessage = {
/** The peer ID of the sender of this message */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** The peer metadata of the sender of this message */
peerMetadata: PeerMetadata

/** Arrive messages don't have a targetId */
targetId: never
Expand All @@ -190,12 +177,8 @@ type WelcomeMessage = {
/** The peer ID of the recipient sender this message */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** The peer metadata of the sender of this message */
peerMetadata: PeerMetadata

/** The peer ID of the recipient of this message */
targetId: PeerId
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {
NetworkAdapter,
type PeerId,
type PeerMetadata,
cbor,
type StorageId,
} from "@automerge/automerge-repo"
import WebSocket from "isomorphic-ws"

Expand Down Expand Up @@ -36,11 +36,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
this.url = url
}

connect(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
connect(peerId: PeerId, peerMetadata: PeerMetadata) {
// If we're reconnecting make sure we remove the old event listeners
// before creating a new connection.
if (this.socket) {
Expand All @@ -50,15 +46,11 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
}

if (!this.timerId) {
this.timerId = setInterval(
() => this.connect(peerId, storageId, isEphemeral),
5000
)
this.timerId = setInterval(() => this.connect(peerId, peerMetadata), 5000)
}

this.peerId = peerId
this.storageId = storageId
this.isEphemeral = isEphemeral
this.peerMetadata = peerMetadata
this.socket = new WebSocket(this.url)
this.socket.binaryType = "arraybuffer"

Expand All @@ -83,7 +75,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
log(`@ ${this.url}: open`)
clearInterval(this.timerId)
this.timerId = undefined
this.send(joinMessage(this.peerId!, this.storageId, this.isEphemeral))
this.send(joinMessage(this.peerId!, this.peerMetadata!))
}

// When a socket closes, or disconnects, remove it from the array.
Expand All @@ -96,7 +88,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {

if (!this.timerId) {
if (this.peerId) {
this.connect(this.peerId, this.storageId, this.isEphemeral)
this.connect(this.peerId, this.peerMetadata!)
}
}
}
Expand All @@ -110,7 +102,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
throw new Error("WTF, get a socket")
}
if (this.socket.readyState === WebSocket.OPEN) {
this.send(joinMessage(this.peerId!, this.storageId, this.isEphemeral))
this.send(joinMessage(this.peerId!, this.peerMetadata!))
} else {
// The onOpen handler automatically sends a join message
}
Expand Down Expand Up @@ -147,11 +139,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
this.socket?.send(arrayBuf)
}

announceConnection(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
announceConnection(peerId: PeerId, peerMetadata: PeerMetadata) {
// return a peer object
const myPeerId = this.peerId
if (!myPeerId) {
Expand All @@ -162,7 +150,7 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
this.emit("ready", { network: this })
}
this.remotePeerId = peerId
this.emit("peer-candidate", { peerId, storageId, isEphemeral })
this.emit("peer-candidate", { peerId, peerMetadata })
}

receiveMessage(message: Uint8Array) {
Expand All @@ -181,9 +169,9 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {

switch (type) {
case "peer": {
const { storageId, isEphemeral } = decoded
const { peerMetadata } = decoded
log(`peer: ${senderId}`)
this.announceConnection(senderId, storageId, isEphemeral)
this.announceConnection(senderId, peerMetadata)
break
}
case "error":
Expand All @@ -197,14 +185,12 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {

function joinMessage(
senderId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
peerMetadata: PeerMetadata
): JoinMessage {
return {
type: "join",
senderId,
storageId,
isEphemeral,
peerMetadata,
supportedProtocolVersions: [ProtocolV1],
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const log = debug("WebsocketServer")
import {
cbor as cborHelpers,
NetworkAdapter,
type PeerMetadata,
type PeerId,
type StorageId,
} from "@automerge/automerge-repo"
import { FromClientMessage, FromServerMessage } from "./messages.js"
import { ProtocolV1, ProtocolVersion } from "./protocolVersion.js"
Expand All @@ -28,14 +28,9 @@ export class NodeWSServerAdapter extends NetworkAdapter {
this.server = server
}

connect(
peerId: PeerId,
storageId: StorageId | undefined,
isEphemeral: boolean
) {
connect(peerId: PeerId, peerMetadata: PeerMetadata) {
this.peerId = peerId
this.storageId = storageId
this.isEphemeral = isEphemeral
this.peerMetadata = peerMetadata

this.server.on("close", function close() {
clearInterval(interval)
Expand Down Expand Up @@ -140,12 +135,11 @@ export class NodeWSServerAdapter extends NetworkAdapter {
this.emit("peer-disconnected", { peerId: senderId })
}

const { storageId, isEphemeral } = cbor
const { peerMetadata } = cbor
// Let the rest of the system know that we have a new connection.
this.emit("peer-candidate", {
peerId: senderId,
storageId,
isEphemeral,
peerMetadata,
})
this.sockets[senderId] = socket

Expand All @@ -167,8 +161,7 @@ export class NodeWSServerAdapter extends NetworkAdapter {
this.send({
type: "peer",
senderId: this.peerId!,
storageId: this.storageId,
isEphemeral: this.isEphemeral,
peerMetadata: this.peerMetadata!,
selectedProtocolVersion: ProtocolV1,
targetId: senderId,
})
Expand Down
18 changes: 5 additions & 13 deletions packages/automerge-repo-network-websocket/src/messages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Message, PeerId, StorageId } from "@automerge/automerge-repo"
import type { Message, PeerId, PeerMetadata } from "@automerge/automerge-repo"
import type { ProtocolVersion } from "./protocolVersion.js"

/** The sender is disconnecting */
Expand All @@ -13,12 +13,8 @@ export type JoinMessage = {
/** The PeerID of the client */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** Metadata presented by the peer */
peerMetadata: PeerMetadata

/** The protocol version the client supports */
supportedProtocolVersions: ProtocolVersion[]
Expand All @@ -30,12 +26,8 @@ export type PeerMessage = {
/** The PeerID of the server */
senderId: PeerId

/** Unique ID of the storage that the sender peer is using, is persistent across sessions */
storageId?: StorageId

/** Indicates whether other peers should persist the sync state of the sender peer.
* Sync state is only persisted for non-ephemeral peers */
isEphemeral: boolean
/** Metadata presented by the peer */
peerMetadata: PeerMetadata

/** The protocol version the server selected for this connection */
selectedProtocolVersion: ProtocolVersion
Expand Down
Loading

0 comments on commit 95bdbf9

Please sign in to comment.