Skip to content

Commit

Permalink
feat: add support for web transports
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Jan 3, 2024
1 parent d7c816a commit 586c8c9
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo v1.12.1
github.com/onsi/gomega v1.11.0
github.com/quic-go/webtransport-go v0.6.0
github.com/stretchr/testify v1.8.4
github.com/teivah/onecontext v1.3.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ github.com/onsi/gomega v1.11.0 h1:+CqWgvj0OZycCaqclBD1pxKHAU+tOkHmQIWvDHq2aug=
github.com/onsi/gomega v1.11.0/go.mod h1:azGKhqFUon9Vuj0YmTfLSmx0FUwqXYSTl5re8lQLTUg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1abRaosM2yGOyiikEc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
10 changes: 9 additions & 1 deletion httpconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"path"

"github.com/quic-go/webtransport-go"

Check failure on line 12 in httpconnection.go

View workflow job for this annotation

GitHub Actions / Test and Coverage

missing go.sum entry for module providing package github.com/quic-go/webtransport-go (imported by github.com/philippseith/signalr); to add:
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -126,7 +127,14 @@ func NewHTTPConnection(ctx context.Context, address string, options ...func(*htt
var conn Connection
switch {
case negotiateResponse.hasTransport("WebTransports"):
// TODO
var d webtransport.Dialer
_, wtConn, err := d.Dial(ctx, reqURL.String(), req.Header)
if err != nil {
return nil, err
}

// TODO think about if the API should give the possibility to cancel this connections
conn = newWebTransportsConnection(context.Background(), negotiateResponse.ConnectionID, wtConn)

case httpConn.hasTransport(TransportWebSockets) && negotiateResponse.hasTransport(TransportWebSockets):
wsURL := reqURL
Expand Down
68 changes: 68 additions & 0 deletions webtransportsconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package signalr

import (
"context"
"fmt"
"github.com/quic-go/webtransport-go"
"sync"
)

type webTransportsConnection struct {
ConnectionBase
session *webtransport.Session

Check failure on line 12 in webtransportsconnection.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: webtransport (typecheck)

// current stream guarded by mutex
stream webtransport.Stream

Check failure on line 15 in webtransportsconnection.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: webtransport (typecheck)
sync.Mutex
}

func newWebTransportsConnection(ctx context.Context, connectionID string, session *webtransport.Session) *webTransportsConnection {

Check failure on line 19 in webtransportsconnection.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: webtransport (typecheck)
w := &webTransportsConnection{
session: session,
ConnectionBase: *NewConnectionBase(ctx, connectionID),
}
return w
}

func (w *webTransportsConnection) Write(p []byte) (n int, err error) {
err = w.syncStream()
if err != nil {
return 0, err
}
n, err = ReadWriteWithContext(w.Context(),
func() (int, error) {
return w.stream.Write(p)
},
func() {})
if err != nil {
err = fmt.Errorf("%T: %w", w, err)
_ = w.stream.Close()
}
return n, err
}

func (w *webTransportsConnection) Read(p []byte) (n int, err error) {
err = w.syncStream()
if err != nil {
return 0, err
}
n, err = ReadWriteWithContext(w.Context(),
func() (int, error) {
return w.stream.Read(p)
},
func() {})
if err != nil {
err = fmt.Errorf("%T: %w", w, err)
_ = w.stream.Close()
}
return n, err
}

func (w *webTransportsConnection) syncStream() (err error) {
w.Lock()
defer w.Unlock()
if w.stream == nil {
w.stream, err = w.session.OpenStream()
}
return
}

0 comments on commit 586c8c9

Please sign in to comment.