Skip to content

Commit

Permalink
update reopen up stream
Browse files Browse the repository at this point in the history
  • Loading branch information
MacOMNI committed Nov 7, 2024
1 parent 4062b60 commit f1b8190
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 47 deletions.
6 changes: 3 additions & 3 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public final class QuicStream: Sendable {
}

public func shutdown(errorCode: QuicErrorCode = .success) throws {
logger.info("closing stream \(errorCode)")
logger.debug("closing stream \(errorCode)")

try storage.write { storage in
guard let storage2 = storage else {
Expand Down Expand Up @@ -206,10 +206,10 @@ private class StreamHandle {
}

case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
logger.info("Peer send aborted")
logger.trace("Peer send aborted")

case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE:
logger.info("Stream shutdown complete")
logger.trace("Stream shutdown complete")

let evtData = event.pointee.SHUTDOWN_COMPLETE
if let stream {
Expand Down
4 changes: 2 additions & 2 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
if existingStream.stream.id < stream.stream.id {
// The new stream has a higher ID, so reset the existing one
existingStream.close(abort: false)
logger.info(
logger.debug(
"Reset older UP stream with lower ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
} else {
// The existing stream has a higher ID or is equal, so reset the new one
stream.close(abort: false)
logger.info(
logger.debug(
"Duplicate UP stream detected, closing new stream with lower or equal ID",
metadata: ["existingStreamId": "\(existingStream.stream.id)", "newStreamId": "\(stream.stream.id)"]
)
Expand Down
121 changes: 80 additions & 41 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public enum PeerRole: Sendable, Hashable {
// case proxy // not yet specified
}

struct ReconnectState {
struct BackoffState {
var attempt: Int
var delay: TimeInterval

Expand All @@ -25,7 +25,6 @@ struct ReconnectState {
delay = 1
}

// Initializer with custom values
init(attempt: Int = 0, delay: TimeInterval = 1) {
self.attempt = attempt
self.delay = delay
Expand Down Expand Up @@ -234,9 +233,10 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {

fileprivate let connections: ThreadSafeContainer<ConnectionStorage> = .init(.init())
fileprivate let streams: ThreadSafeContainer<[UniqueId: Stream<Handler>]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: ReconnectState]> = .init([:])
fileprivate let reconnectStates: ThreadSafeContainer<[NetAddr: BackoffState]> = .init([:])
fileprivate let reopenStates: ThreadSafeContainer<[UniqueId: BackoffState]> = .init([:])

let reconnectMaxRetryAttempts = 5
let maxRetryAttempts = 5
let presistentStreamHandler: Handler.PresistentHandler
let ephemeralStreamHandler: Handler.EphemeralHandler

Expand Down Expand Up @@ -297,39 +297,70 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
let state = reconnectStates.read { reconnectStates in
reconnectStates[address] ?? .init()
}
if state.attempt < reconnectMaxRetryAttempts {
reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}

guard state.attempt < maxRetryAttempts else {
logger.warning("reconnecting to \(address) exceeded max attempts")
return
}

reconnectStates.write { reconnectStates in
if var state = reconnectStates[address] {
state.applyBackoff()
reconnectStates[address] = state
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
Task {
try await Task.sleep(for: .seconds(state.delay))
try connections.write { connections in
if connections.byAddr[address] != nil {
logger.warning("reconnecting to \(address) already connected")
return
}
let quicConn = try QuicConnection(
handler: PeerEventHandler(self),
registration: clientConfiguration.registration,
configuration: clientConfiguration
)
try quicConn.connect(to: address)
let conn = Connection(
quicConn,
impl: self,
role: role,
remoteAddress: address,
initiatedByLocal: true
)
connections.byAddr[address] = conn
connections.byId[conn.id] = conn
}
}
}

func reopenUpStream(connection: Connection<Handler>, kind: Handler.PresistentHandler.StreamKind) {
let state = reopenStates.read { states in
states[connection.id] ?? .init()
}

guard state.attempt < maxRetryAttempts else {
logger.warning("Reopen attempt for stream \(kind) on connection \(connection.id) exceeded max attempts")
return
}

reopenStates.write { states in
if var state = states[connection.id] {
state.applyBackoff()
states[connection.id] = state
}
}

Task {
try await Task.sleep(for: .seconds(state.delay))
do {
logger.debug("Attempting to reopen UP stream of kind \(kind) for connection \(connection.id)")
try connection.createPreistentStream(kind: kind)
} catch {
logger.error("Failed to reopen UP stream for connection \(connection.id): \(error)")
reopenUpStream(connection: connection, kind: kind)
}
} else {
logger.warning("reconnect attempt exceeded max attempts")
}
}

Expand Down Expand Up @@ -546,6 +577,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let conn {
conn.streamStarted(stream: stream)
// Check
impl.reopenStates.write { states in
states[conn.id] = nil
}
}
}

Expand All @@ -558,11 +593,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
}

func closed(_ quicStream: QuicStream, status: QuicStatus, code: QuicErrorCode) {
func closed(_ quicStream: QuicStream, status: QuicStatus, code _: QuicErrorCode) {
let stream = impl.streams.read { streams in
streams[quicStream.id]
}
logger.info("closed stream \(String(describing: stream?.id)) \(status) \(code)")

if let stream {
let connection = impl.connections.read { connections in
Expand All @@ -573,6 +607,7 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
if shouldReopenStream(connection: connection, stream: stream, status: status) {
do {
if let kind = stream.kind {
// impl.reopenUpStream(connection: connection, kind: kind);
do {
try connection.createPreistentStream(kind: kind)
} catch {
Expand All @@ -593,13 +628,17 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
}

// TODO: Add all the cases about reopen up stream
private func shouldReopenStream(connection: Connection<Handler>, stream: Stream<Handler>, status: QuicStatus) -> Bool {
logger.info("reopen stream about connection needReconnect:\(connection.needReconnect) isClosed:\(connection.isClosed)")
// Need to reopen connection or close it
if connection.needReconnect || connection.isClosed {
// Only reopen if the stream is a persistent UP stream and the closure was unexpected
if connection.isClosed || connection.needReconnect || stream.kind == nil {
return false
}
// Only reopen if the stream is a persistent UP stream and the closure was unexpected
return stream.kind != nil && status.rawValue != QuicStatusCode.connectionIdle.rawValue
switch QuicStatusCode(rawValue: status.rawValue) {
case .connectionIdle, .badCert:
return false
default:
return !status.isSucceeded
}
}
}
3 changes: 2 additions & 1 deletion Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ struct PeerTests {
)
try await Task.sleep(for: .milliseconds(1000))
let lastReceivedData2 = await handler2.lastReceivedData
#expect(lastReceivedData2 != messageData)
}

@Test
Expand Down Expand Up @@ -731,7 +732,7 @@ struct PeerTests {
}

// Wait for message propagation
try? await Task.sleep(for: .milliseconds(100))
try? await Task.sleep(for: .milliseconds(1000))

// everyone should receive two messages
for (idx, handler) in handlers.enumerated() {
Expand Down

0 comments on commit f1b8190

Please sign in to comment.