Skip to content

Commit

Permalink
Remove quesma/network (#1024)
Browse files Browse the repository at this point in the history
Related: #1017

Signed-off-by: Przemysław Hejman <[email protected]>
  • Loading branch information
mieciu authored Nov 21, 2024
1 parent 1424c9d commit 14c9d62
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 22 deletions.
5 changes: 2 additions & 3 deletions quesma/proxy/l4_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/logger"
"quesma/network"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/stats"
Expand All @@ -27,15 +26,15 @@ const (
)

type TcpProxy struct {
From network.Port
From util.Port
To string
inspect bool
inspectHttpServer *http.Server
ready chan struct{}
acceptingConnections atomic.Bool
}

func NewTcpProxy(From network.Port, To string, inspect bool) *TcpProxy {
func NewTcpProxy(From util.Port, To string, inspect bool) *TcpProxy {
return &TcpProxy{
From: From,
To: To,
Expand Down
16 changes: 8 additions & 8 deletions quesma/proxy/l4_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"log"
"net"
"net/http"
"quesma/network"
"quesma/stats"
"quesma/util"
"slices"
"strconv"
"testing"
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestTcpProxy_IngestAndProcess(t *testing.T) {
verifyStatistics(t, fromPort)
}

func verifyStatistics(t *testing.T, port network.Port) {
func verifyStatistics(t *testing.T, port util.Port) {
go func() {
_, err := http.Post(fmt.Sprintf("http://localhost:%d/logs/_doc", int(port)), "application/json", bytes.NewBuffer([]byte(exampleLog())))
if err != nil {
Expand All @@ -73,7 +73,7 @@ func verifyStatistics(t *testing.T, port network.Port) {
t.Fatal("Statistics not updated")
}

func verifyTCPProxy(t1 *testing.T, data string, fromPort network.Port, toPort network.Port) {
func verifyTCPProxy(t1 *testing.T, data string, fromPort util.Port, toPort util.Port) {
conn, err := net.Dial("tcp", ":"+strconv.Itoa(int(fromPort)))
if err != nil {
t1.Fatal("Error dialing to port:", err)
Expand Down Expand Up @@ -106,22 +106,22 @@ func verifyTCPProxy(t1 *testing.T, data string, fromPort network.Port, toPort ne
}
}

var allocatedPorts = make([]network.Port, 0)
var allocatedPorts = make([]util.Port, 0)

func findFreePort() network.Port {
func findFreePort() util.Port {
port := 11000

for {
if slices.Contains(allocatedPorts, network.Port(port)) {
if slices.Contains(allocatedPorts, util.Port(port)) {
port++
continue
}
listener, err := net.Listen("tcp", ":"+strconv.Itoa(port))
if err == nil {
_ = listener.Close()
allocatedPorts = append(allocatedPorts, network.Port(port))
allocatedPorts = append(allocatedPorts, util.Port(port))
log.Println("Allocated port:", port)
return network.Port(port)
return util.Port(port)
}
port++
}
Expand Down
3 changes: 1 addition & 2 deletions quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"log"
"os"
"quesma/elasticsearch/elasticsearch_field_types"
"quesma/network"
"quesma/util"
"strings"
)
Expand All @@ -38,7 +37,7 @@ type QuesmaConfiguration struct {
Elasticsearch ElasticsearchConfiguration
IndexConfig map[string]IndexConfiguration
Logging LoggingConfiguration
PublicTcpPort network.Port
PublicTcpPort util.Port
IngestStatistics bool
QuesmaInternalTelemetryUrl *Url
DisableAuth bool
Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"log"
"quesma/network"
"quesma/util"
"reflect"
"slices"
"strings"
Expand Down Expand Up @@ -72,8 +72,8 @@ type FrontendConnector struct {
}

type FrontendConnectorConfiguration struct {
ListenPort network.Port `koanf:"listenPort"`
DisableAuth bool `koanf:"disableAuth"`
ListenPort util.Port `koanf:"listenPort"`
DisableAuth bool `koanf:"disableAuth"`
}

type BackendConnector struct {
Expand Down Expand Up @@ -849,7 +849,7 @@ END:
return conf
}

func (c *QuesmaNewConfiguration) getPublicTcpPort() (network.Port, error) {
func (c *QuesmaNewConfiguration) getPublicTcpPort() (util.Port, error) {
// per validation, there's always at least one frontend connector,
// even if there's a second one, it has to listen on the same port
return c.FrontendConnectors[0].Config.ListenPort, nil
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/dual_write_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/logger"
"quesma/network"
"quesma/quesma/async_search_storage"
"quesma/quesma/config"
"quesma/quesma/mux"
"quesma/quesma/recovery"
"quesma/quesma/ui"
"quesma/schema"
"quesma/telemetry"
"quesma/util"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -58,7 +58,7 @@ type dualWriteHttpProxy struct {
elasticRouter *mux.PathRouter
indexManagement elasticsearch.IndexManagement
logManager *clickhouse.LogManager
publicPort network.Port
publicPort util.Port
asyncQueriesEvictor *async_search_storage.AsyncQueriesEvictor
queryRunner *QueryRunner
schemaRegistry schema.Registry
Expand Down
3 changes: 1 addition & 2 deletions quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"quesma/feature"
"quesma/ingest"
"quesma/logger"
"quesma/network"
"quesma/proxy"
"quesma/queryparser"
"quesma/quesma/config"
Expand All @@ -39,7 +38,7 @@ import (
type (
Quesma struct {
processor engine
publicTcpPort network.Port
publicTcpPort util.Port
quesmaManagementConsole *ui.QuesmaManagementConsole
config *config.QuesmaConfiguration
telemetryAgent telemetry.PhoneHomeAgent
Expand Down
3 changes: 2 additions & 1 deletion quesma/network/types.go → quesma/util/port.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package network

package util

import (
"fmt"
Expand Down

0 comments on commit 14c9d62

Please sign in to comment.