Skip to content

Commit

Permalink
Consent syncing (#423)
Browse files Browse the repository at this point in the history
* update package

* get on the latest version of the sdk

* get it compiling

* update inbox create to take just the env

* update to handle ns

* add publish messages to the common interface

* update the listing functions

* switch on conversation type

* add tests for everything

* fix up lint issue
  • Loading branch information
nplasterer authored Nov 10, 2024
1 parent b1a37c0 commit 1727b34
Show file tree
Hide file tree
Showing 14 changed files with 480 additions and 278 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ let package = Package(
.package(url: "https://github.com/1024jp/GzipSwift", from: "5.2.0"),
.package(url: "https://github.com/bufbuild/connect-swift", exact: "0.12.0"),
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"),
.package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "0.6.0"),
.package(url: "https://github.com/xmtp/libxmtp-swift.git", exact: "3.0.0"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
Expand Down
18 changes: 11 additions & 7 deletions Sources/XMTPiOS/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public final class Client {
{
let accountAddress = account.address.lowercased()
let inboxId = try await getOrCreateInboxId(
options: options, address: accountAddress)
api: options.api, address: accountAddress)

return try await initializeClient(
accountAddress: accountAddress,
Expand All @@ -155,7 +155,7 @@ public final class Client {
{
let accountAddress = address.lowercased()
let inboxId = try await getOrCreateInboxId(
options: options, address: accountAddress)
api: options.api, address: accountAddress)

return try await initializeClient(
accountAddress: accountAddress,
Expand Down Expand Up @@ -256,19 +256,19 @@ public final class Client {
}

public static func getOrCreateInboxId(
options: ClientOptions, address: String
api: ClientOptions.Api, address: String
) async throws -> String {
var inboxId: String
do {
inboxId =
try await getInboxIdForAddress(
logger: XMTPLogger(),
host: options.api.env.url,
isSecure: options.api.env.isSecure == true,
host: api.env.url,
isSecure: api.env.isSecure == true,
accountAddress: address
) ?? generateInboxId(accountAddress: address, nonce: 0)
} catch {
inboxId = generateInboxId(accountAddress: address, nonce: 0)
inboxId = try generateInboxId(accountAddress: address, nonce: 0)
}
return inboxId
}
Expand Down Expand Up @@ -387,7 +387,11 @@ public final class Client {
}

public func requestMessageHistorySync() async throws {
try await ffiClient.requestHistorySync()
try await ffiClient.sendSyncRequest(kind: .messages)
}

public func syncConsent() async throws {
try await ffiClient.sendSyncRequest(kind: .consent)
}

public func revokeAllOtherInstallations(signingKey: SigningKey) async throws
Expand Down
21 changes: 16 additions & 5 deletions Sources/XMTPiOS/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import LibXMTP
public enum Conversation: Identifiable, Equatable, Hashable {
case group(Group)
case dm(Dm)

public static func == (lhs: Conversation, rhs: Conversation) -> Bool {
lhs.topic == rhs.topic
}
Expand Down Expand Up @@ -44,7 +44,7 @@ public enum Conversation: Identifiable, Equatable, Hashable {
}
}

public func consentState() async throws -> ConsentState {
public func consentState() throws -> ConsentState {
switch self {
case let .group(group):
return try group.consentState()
Expand Down Expand Up @@ -92,6 +92,15 @@ public enum Conversation: Identifiable, Equatable, Hashable {
content: content, options: options)
}
}

public func publishMessages() async throws {
switch self {
case let .group(group):
return try await group.publishMessages()
case let .dm(dm):
return try await dm.publishMessages()
}
}

public var type: ConversationType {
switch self {
Expand Down Expand Up @@ -168,19 +177,21 @@ public enum Conversation: Identifiable, Equatable, Hashable {
}

public func messages(
limit: Int? = nil, before: Date? = nil, after: Date? = nil,
limit: Int? = nil,
beforeNs: Int64? = nil,
afterNs: Int64? = nil,
direction: SortDirection? = .descending,
deliveryStatus: MessageDeliveryStatus = .all
) async throws -> [DecodedMessage] {
switch self {
case let .group(group):
return try await group.messages(
before: before, after: after, limit: limit,
beforeNs: beforeNs, afterNs: afterNs, limit: limit,
direction: direction, deliveryStatus: deliveryStatus
)
case let .dm(dm):
return try await dm.messages(
before: before, after: after, limit: limit,
beforeNs: beforeNs, afterNs: afterNs, limit: limit,
direction: direction, deliveryStatus: deliveryStatus
)
}
Expand Down
163 changes: 103 additions & 60 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public enum ConversationOrder {
case createdAt, lastMessage
}

public enum ConversationType {
case all, groups, dms
}

final class ConversationStreamCallback: FfiConversationCallback {
func onError(error: LibXMTP.FfiSubscribeError) {
print("Error ConversationStreamCallback \(error)")
Expand Down Expand Up @@ -80,11 +84,13 @@ public actor Conversations {
}

public func listGroups(
createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil
createdAfter: Date? = nil, createdBefore: Date? = nil,
limit: Int? = nil, order: ConversationOrder = .createdAt,
consentState: ConsentState? = nil
) async throws -> [Group] {
var options = FfiListConversationsOptions(
createdAfterNs: nil, createdBeforeNs: nil, limit: nil,
consentState: nil)
consentState: consentState?.toFFI)
if let createdAfter {
options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch)
}
Expand All @@ -95,17 +101,25 @@ public actor Conversations {
if let limit {
options.limit = Int64(limit)
}
return try await ffiConversations.listGroups(opts: options).map {
let conversations = try await ffiConversations.listGroups(
opts: options)

let sortedConversations = try sortConversations(
conversations, order: order)

return sortedConversations.map {
$0.groupFromFFI(client: client)
}
}

public func listDms(
createdAfter: Date? = nil, createdBefore: Date? = nil, limit: Int? = nil
createdAfter: Date? = nil, createdBefore: Date? = nil,
limit: Int? = nil, order: ConversationOrder = .createdAt,
consentState: ConsentState? = nil
) async throws -> [Dm] {
var options = FfiListConversationsOptions(
createdAfterNs: nil, createdBeforeNs: nil, limit: nil,
consentState: nil)
consentState: consentState?.toFFI)
if let createdAfter {
options.createdAfterNs = Int64(createdAfter.millisecondsSinceEpoch)
}
Expand All @@ -116,7 +130,13 @@ public actor Conversations {
if let limit {
options.limit = Int64(limit)
}
return try await ffiConversations.listDms(opts: options).map {
let conversations = try await ffiConversations.listDms(
opts: options)

let sortedConversations = try sortConversations(
conversations, order: order)

return sortedConversations.map {
$0.dmFromFFI(client: client)
}
}
Expand Down Expand Up @@ -179,40 +199,49 @@ public actor Conversations {
}
}

public func stream() -> AsyncThrowingStream<
public func stream(type: ConversationType = .all) -> AsyncThrowingStream<
Conversation, Error
> {
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let conversationCallback = ConversationStreamCallback {
conversation in
guard !Task.isCancelled else {
continuation.finish()
return
}
do {
let conversationType = try conversation.groupMetadata()
.conversationType()
if conversationType == "dm" {
continuation.yield(
Conversation.dm(
conversation.dmFromFFI(client: self.client))
)
} else if conversationType == "group" {
continuation.yield(
Conversation.group(
conversation.groupFromFFI(client: self.client))
)
}
} catch {
// Do nothing if the conversation type is neither a group or dm
}
}

let task = Task {
let stream = await ffiConversations.stream(
callback: ConversationStreamCallback { conversation in
guard !Task.isCancelled else {
continuation.finish()
return
}
do {
let conversationType =
try conversation.groupMetadata()
.conversationType()
if conversationType == "dm" {
continuation.yield(
Conversation.dm(
conversation.dmFromFFI(
client: self.client))
)
} else if conversationType == "group" {
continuation.yield(
Conversation.group(
conversation.groupFromFFI(
client: self.client))
)
}
} catch {
// Do nothing if the conversation type is neither a group or dm
}
let stream: FfiStreamCloser
switch type {
case .groups:
stream = await ffiConversations.streamGroups(
callback: conversationCallback)
case .all:
stream = await ffiConversations.stream(
callback: conversationCallback)
case .dms:
stream = await ffiConversations.streamDms(
callback: conversationCallback)
}
)
await ffiStreamActor.setFfiStream(stream)
continuation.onTermination = { @Sendable reason in
Task {
Expand Down Expand Up @@ -306,7 +335,8 @@ public actor Conversations {
throw ConversationError.memberCannotBeSelf
}
let addressMap = try await self.client.canMessage(addresses: addresses)
let unregisteredAddresses = addressMap
let unregisteredAddresses =
addressMap
.filter { !$0.value }
.map { $0.key }

Expand All @@ -328,34 +358,47 @@ public actor Conversations {
return group
}

public func streamAllMessages() -> AsyncThrowingStream<
DecodedMessage, Error
> {
public func streamAllMessages(type: ConversationType = .all)
-> AsyncThrowingStream<DecodedMessage, Error>
{
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let task = Task {
let stream =
await ffiConversations
.streamAllMessages(
messageCallback: MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
Task {
await ffiStreamActor.endStream() // End the stream upon cancellation
}
return
}
do {
continuation.yield(
try Message(
client: self.client, ffiMessage: message
).decode())
} catch {
print("Error onMessage \(error)")
}
}

let messageCallback = MessageCallback(client: self.client) {
message in
guard !Task.isCancelled else {
continuation.finish()
Task {
await ffiStreamActor.endStream()
}
return
}
do {
continuation.yield(
try Message(client: self.client, ffiMessage: message)
.decode()
)
} catch {
print("Error onMessage \(error)")
}
}

let task = Task {
let stream: FfiStreamCloser
switch type {
case .groups:
stream = await ffiConversations.streamAllGroupMessages(
messageCallback: messageCallback
)
case .dms:
stream = await ffiConversations.streamAllDmMessages(
messageCallback: messageCallback
)
case .all:
stream = await ffiConversations.streamAllMessages(
messageCallback: messageCallback
)
}
await ffiStreamActor.setFfiStream(stream)
}

Expand Down
Loading

0 comments on commit 1727b34

Please sign in to comment.