Skip to content

Commit

Permalink
Implement progressive call invocations (#57)
Browse files Browse the repository at this point in the history
* Update wampproto to latest

* Implement progressive call invocations

* Add function for progressive call results with progresssive call invocations

* Add funcion to export session id

* Add example for progressive call invocations

* Add example for progressive results
  • Loading branch information
muzzammilshahid authored Nov 1, 2024
1 parent a919320 commit cd5d012
Showing 9 changed files with 496 additions and 20 deletions.
113 changes: 113 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -6,10 +6,12 @@ import (
"fmt"
"log"
"testing"
"time"

"github.com/gammazero/workerpool"
"github.com/stretchr/testify/require"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/xconn-go"
)

@@ -135,3 +137,114 @@ func TestProgressiveCallResults(t *testing.T) {
require.Equal(t, "done", result.Arguments[0])
})
}

func TestProgressiveCallInvocation(t *testing.T) {
session := connect(t)

// Store progress updates
progressUpdates := make([]int, 0)
reg, err := session.Register("foo.bar.progress",
func(ctx context.Context, invocation *xconn.Invocation) *xconn.Result {
progress := int(invocation.Arguments[0].(float64))
progressUpdates = append(progressUpdates, progress)

isProgress, _ := invocation.Details[wampproto.OptionProgress].(bool)
if isProgress {
return &xconn.Result{Err: xconn.ErrNoResult}
}

return &xconn.Result{Arguments: []any{"done"}}
}, nil)
require.NoError(t, err)
require.NotNil(t, reg)

t.Run("ProgressiveCall", func(t *testing.T) {
totalChunks := 6
chunkIndex := 1

result, err := session.CallProgressive(context.Background(), "foo.bar.progress",
func(ctx context.Context) *xconn.Progress {
options := map[string]any{}

// Mark the last chunk as non-progressive
if chunkIndex == totalChunks-1 {
options[wampproto.OptionProgress] = false
} else {
options[wampproto.OptionProgress] = true
}

args := []any{chunkIndex}
chunkIndex++

time.Sleep(10 * time.Millisecond)
return &xconn.Progress{Arguments: args, Options: options}
})
require.NoError(t, err)

// Verify progressive updates received correctly
require.Equal(t, []int{1, 2, 3, 4, 5}, progressUpdates)

// Verify the final result
require.Equal(t, "done", result.Arguments[0])
})
}

func TestCallProgressiveProgress(t *testing.T) {
session := connect(t)

// Store progress updates
progressUpdates := make([]int, 0)
reg, err := session.Register("foo.bar.progress",
func(ctx context.Context, invocation *xconn.Invocation) *xconn.Result {
progress := int(invocation.Arguments[0].(float64))
progressUpdates = append(progressUpdates, progress)

isProgress, _ := invocation.Details[wampproto.OptionProgress].(bool)
if isProgress {
err := invocation.SendProgress([]any{progress}, nil)
require.NoError(t, err)
return &xconn.Result{Err: xconn.ErrNoResult}
}

return &xconn.Result{Arguments: []any{progress}}
}, nil)
require.NoError(t, err)
require.NotNil(t, reg)

t.Run("ProgressiveCall", func(t *testing.T) {
receivedProgressBack := make([]int, 0)
totalChunks := 6
chunkIndex := 1

result, err := session.CallProgressiveProgress(context.Background(), "foo.bar.progress",
func(ctx context.Context) *xconn.Progress {
options := map[string]any{}

// Mark the last chunk as non-progressive
if chunkIndex == totalChunks-1 {
options[wampproto.OptionProgress] = false
} else {
options[wampproto.OptionProgress] = true
}

args := []any{chunkIndex}
chunkIndex++

time.Sleep(10 * time.Millisecond)
return &xconn.Progress{Arguments: args, Options: options}
}, func(result *xconn.Result) {
progress := int(result.Arguments[0].(float64))
receivedProgressBack = append(receivedProgressBack, progress)
})
require.NoError(t, err)

finalResult := int(result.Arguments[0].(float64))
receivedProgressBack = append(receivedProgressBack, finalResult)

// Verify progressive updates received correctly
require.Equal(t, []int{1, 2, 3, 4, 5}, progressUpdates)

// Verify progressive updates mirrored correctly
require.Equal(t, progressUpdates, receivedProgressBack)
})
}
55 changes: 55 additions & 0 deletions examples/rpc_progressive_invocation/callee/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/xconn-go"
)

const procedureProgressUpload = "io.xconn.progress.upload"

func main() {
ctx := context.Background()
client := xconn.Client{}
callee, err := client.Connect(ctx, "ws://localhost:8080/ws", "realm1")
if err != nil {
log.Fatalf("Failed to connect to server: %s", err)
}
defer func() { _ = callee.Leave() }()

invocationHandler := func(ctx context.Context, invocation *xconn.Invocation) *xconn.Result {
isProgress, _ := invocation.Details[wampproto.OptionProgress].(bool)

// Handle the progressive chunk
if isProgress {
chunkIndex := invocation.Arguments[0].(float64)
fmt.Printf("Received chunk %v\n", chunkIndex)
return &xconn.Result{Err: xconn.ErrNoResult}
}

// Final response after all chunks are received
fmt.Println("All chunks received, processing complete.")
return &xconn.Result{Arguments: []any{"Upload complete"}}
}

registration, err := callee.Register(procedureProgressUpload, invocationHandler, nil)
if err != nil {
log.Fatalf("Failed to register procedure: %s", err)
}
defer func() { _ = callee.Unregister(registration.ID) }()

// Wait for interrupt signal to gracefully shut down
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
select {
case <-sigChan:
log.Println("Interrupt signal received, shutting down.")
case <-ctx.Done():
log.Println("Context canceled, shutting down.")
}
}
56 changes: 56 additions & 0 deletions examples/rpc_progressive_invocation/caller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/xconn-go"
)

const procedureProgressUpload = "io.xconn.progress.upload"

func main() {
ctx := context.Background()
client := xconn.Client{}
caller, err := client.Connect(ctx, "ws://localhost:8080/ws", "realm1")
if err != nil {
log.Fatalf("Failed to connect to server: %s", err)
}
defer func() { _ = caller.Leave() }()

totalChunks := 6
chunkIndex := 0

// Simulate file data being uploaded in chunks
fmt.Println("Starting file upload...")

result, err := caller.CallProgressive(ctx, procedureProgressUpload, func(ctx context.Context) *xconn.Progress {
options := map[string]any{}

// Mark the last chunk as non-progressive
if chunkIndex == totalChunks-1 {
options[wampproto.OptionProgress] = false
} else {
options[wampproto.OptionProgress] = true
}

// Simulate sending each chunk of the file
fmt.Printf("Uploading chunk %d...\n", chunkIndex)
args := []any{chunkIndex}
chunkIndex++

// Simulate network delay between chunks
time.Sleep(500 * time.Millisecond)

return &xconn.Progress{Arguments: args, Options: options}
})

if err != nil {
log.Fatalf("Failed to upload data: %s", err)
}

fmt.Println("Final result:", result.Arguments[0])
}
59 changes: 59 additions & 0 deletions examples/rpc_progressive_results/callee/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/xconn-go"
)

const procedureProgressUpload = "io.xconn.progress.upload"

func main() {
ctx := context.Background()
client := xconn.Client{}
callee, err := client.Connect(ctx, "ws://localhost:8080/ws", "realm1")
if err != nil {
log.Fatalf("Failed to connect to server: %s", err)
}
defer func() { _ = callee.Leave() }()

invocationHandler := func(ctx context.Context, invocation *xconn.Invocation) *xconn.Result {
isProgress, _ := invocation.Details[wampproto.OptionProgress].(bool)
chunkIndex := invocation.Arguments[0].(float64)

if isProgress {
// Mirror back the received chunk as progress
fmt.Printf("Received chunk %v, sending progress back\n", chunkIndex)
if err = invocation.SendProgress([]any{chunkIndex}, nil); err != nil {
return &xconn.Result{Err: "wamp.error.canceled", Arguments: []any{err.Error()}}
}

return &xconn.Result{Err: xconn.ErrNoResult}
}

// Final response when all chunks are received
fmt.Println("All chunks received, processing complete.")
return &xconn.Result{Arguments: []any{fmt.Sprintf("Upload complete, chunk %v acknowledged", chunkIndex)}}
}

registration, err := callee.Register(procedureProgressUpload, invocationHandler, nil)
if err != nil {
log.Fatalf("Failed to register method: %s", err)
}
defer func() { _ = callee.Unregister(registration.ID) }()

// Wait for interrupt signal to gracefully shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
select {
case <-sigChan:
log.Println("Interrupt signal received, shutting down.")
case <-ctx.Done():
log.Println("Context canceled, shutting down.")
}
}
59 changes: 59 additions & 0 deletions examples/rpc_progressive_results/caller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/xconnio/wampproto-go"
"github.com/xconnio/xconn-go"
)

const procedureProgressUpload = "io.xconn.progress.upload"

func main() {
ctx := context.Background()
client := xconn.Client{}
caller, err := client.Connect(ctx, "ws://localhost:8080/ws", "realm1")
if err != nil {
log.Fatalf("Failed to connect to server: %s", err)
}
defer func() { _ = caller.Leave() }()

totalChunks := 5
chunkIndex := 0

fmt.Println("Starting file upload...")

result, err := caller.CallProgressiveProgress(ctx, procedureProgressUpload, func(ctx context.Context) *xconn.Progress {
options := map[string]any{}

// Mark the last chunk as non-progressive
if chunkIndex == totalChunks-1 {
options[wampproto.OptionProgress] = false
} else {
options[wampproto.OptionProgress] = true
}

// Simulate uploading chunk
fmt.Printf("Sending chunk %d\n", chunkIndex)
args := []any{chunkIndex}
chunkIndex++

// Simulate delay for each chunk
time.Sleep(500 * time.Millisecond)

return &xconn.Progress{Arguments: args, Options: options}
}, func(result *xconn.Result) {
// Handle progress updates mirrored by the callee
chunkProgress := result.Arguments[0].(float64)
fmt.Printf("Progress update: chunk %v acknowledged by server\n", chunkProgress)
})

if err != nil {
log.Fatalf("Failed to upload data: %s", err)
}

fmt.Printf("Upload complete: %s\n", result.Arguments[0])
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ require (
github.com/gobwas/ws v1.4.0
github.com/projectdiscovery/ratelimit v0.0.50
github.com/stretchr/testify v1.9.0
github.com/xconnio/wampproto-go v0.0.0-20240920091217-fd8f83f21c54
github.com/xconnio/wampproto-go v0.0.0-20241021144224-de559a3b2e29
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240706133816-0ca5f0268ce9
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d
gopkg.in/yaml.v3 v3.0.1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -41,8 +41,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xconnio/wampproto-go v0.0.0-20240920091217-fd8f83f21c54 h1:uqKiqnmD6XSnX65WbUUNmIyW4L0oaPeOQPytzrxZPyg=
github.com/xconnio/wampproto-go v0.0.0-20240920091217-fd8f83f21c54/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-go v0.0.0-20241021144224-de559a3b2e29 h1:l5gcIsKVq3mBOVaIm0EXQGhwCYjy1PtuEDCK3YYI2ro=
github.com/xconnio/wampproto-go v0.0.0-20241021144224-de559a3b2e29/go.mod h1:/b7EyR1X9EkOHPQBJGz1KvdjClo1GsalBGIzjQU5+i4=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240706133816-0ca5f0268ce9 h1:N0W6uTElFFj/nl88fAtCwUw0y0pdHbtn3QPQri/iGsw=
github.com/xconnio/wampproto-protobuf/go v0.0.0-20240706133816-0ca5f0268ce9/go.mod h1:k3t5aYBC+1ujppNAaIgu+Kn7oryRSwsP3o362HkAAho=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
Loading

0 comments on commit cd5d012

Please sign in to comment.