-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket.go
110 lines (86 loc) · 1.83 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package gomq
import (
"net/url"
"github.com/workspace-9/gomq/zmtp"
)
type Socket struct {
driver SocketDriver
mech zmtp.Mechanism
ctx *Context
}
func (s Socket) Connect(addr string) error {
url, err := url.Parse(addr)
if err != nil {
return err
}
tp, ok := s.ctx.getTransport(url.Scheme)
if !ok {
return ErrTransportNotFound
}
if err := s.driver.Connect(tp, url); err != nil {
return err
}
return nil
}
type transportNotFound struct{}
func (transportNotFound) Error() string {
return "Transport not found"
}
var ErrTransportNotFound transportNotFound
func (s Socket) Bind(addr string) error {
url, err := url.Parse(addr)
if err != nil {
return err
}
tp, ok := s.ctx.getTransport(url.Scheme)
if !ok {
return ErrTransportNotFound
}
if err := s.driver.Bind(tp, url); err != nil {
return err
}
return nil
}
func (s Socket) Send(data [][]byte) error {
messages := make([]zmtp.Message, len(data))
for idx, datum := range data {
messages[idx] = zmtp.Message{
More: idx != len(data)-1, Body: datum,
}
}
return s.driver.Send(messages)
}
func (s Socket) Recv() ([][]byte, error) {
messages, err := s.driver.Recv()
if err != nil {
return nil, err
}
data := make([][]byte, len(messages))
for idx, message := range messages {
data[idx] = message.Body
}
return data, nil
}
func (s Socket) Close() error {
return s.driver.Close()
}
func (s Socket) Disconnect(addr string) error {
url, err := url.Parse(addr)
if err != nil {
return err
}
return s.driver.Disconnect(url)
}
func (s Socket) Unbind(addr string) error {
url, err := url.Parse(addr)
if err != nil {
return err
}
return s.driver.Unbind(url)
}
func (s Socket) SetOption(option string, val any) error {
return s.mech.SetOption(option, val)
}
func (s Socket) SetServer(serv bool) error {
return s.SetOption(zmtp.OptionServer, serv)
}