Skip to content

Commit

Permalink
#17 add implementation of the TCP sender
Browse files Browse the repository at this point in the history
  • Loading branch information
thegodenage committed Apr 23, 2024
1 parent ceed722 commit 4d73a2c
Showing 1 changed file with 46 additions and 9 deletions.
55 changes: 46 additions & 9 deletions internal/proxy/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,44 @@ func (r *TCPReceiver) Run() error {
// of the incoming tcp connections to
// desired TCP location.
type TCPSender struct {
// pipes is a map of ipaddr string
// and channel of bytes used to send
// TCP connection to desired location
pipes map[string]any
remoteAddr string
listenAddr string

bytesChan chan []byte
}

var _ Sender = (*TCPSender)(nil)

// NewTCPSender creates new instance of the TCPSender, which is used
// to send bytes to other remote connection.
func NewTCPSender(listenAddr, remoteAddr string) *TCPSender {
return &TCPSender{
pipes: make(map[string]any),
remoteAddr: remoteAddr,
listenAddr: listenAddr,
bytesChan: make(chan []byte),
}
}

// Start starts the TCPSender which sends TCP bytes to
// desired host.
func (s *TCPSender) Start(ctx context.Context) error {
rAddr, err := net.ResolveTCPAddr("tcp", s.remoteAddr)
if err != nil {
return fmt.Errorf("resolving remote tcp addr: '%s': %w", s.remoteAddr, err)
}

go s.startRemoteSender(ctx, rAddr)
lAddr, err := net.ResolveTCPAddr("tcp", s.listenAddr)
if err != nil {
return fmt.Errorf("resolving listen tcp addr: '%s': %w", s.remoteAddr, err)
}

go s.startRemoteSender(ctx, rAddr, lAddr)

return nil
}

// Accept should be called in order to accepts new TCP bytes.
// Those bytes then are send to desired TCP location.
func (s *TCPSender) Accept(rw io.ReadWriter) error {
bytes, err := io.ReadAll(rw)
if err != nil {
Expand All @@ -95,9 +103,38 @@ func (s *TCPSender) Accept(rw io.ReadWriter) error {
return nil
}

func (s *TCPSender) startRemoteSender(ctx context.Context, remoteAddr *net.TCPAddr) {
net.DialTCP("tcp")
for bytes := range s.bytesChan {
// startRemoteSender is used to start process of sending incoming bytes to the host.
// It should be called in another go routine in order to not block the main routine.
func (s *TCPSender) startRemoteSender(ctx context.Context, listenAddr, remoteAddr *net.TCPAddr) {
conn, err := net.DialTCP("tcp", listenAddr, remoteAddr)
if err != nil {
fmt.Printf("error dialing remote host: %s", err.Error())
}

defer func() {
if err := conn.Close(); err != nil {
fmt.Printf("connection close failed: %s", err.Error())
}
}()

for {
select {
case bytes, ok := <-s.bytesChan:
if !ok {
fmt.Printf("bytes channel has been closed, closing connection")
return
}

n, err := conn.Write(bytes)
if err != nil {
fmt.Printf("error writing bytes to connection: %s", err.Error())
continue
}

fmt.Printf("written: %d bytes to remote connection", n)
case <-ctx.Done():
fmt.Println("context done")
return
}
}
}

0 comments on commit 4d73a2c

Please sign in to comment.