Skip to content

Commit

Permalink
refactor: sync latest postgres parser(keploy#1720)
Browse files Browse the repository at this point in the history
* refactor: sync parsers acc to latest refactor changes

Signed-off-by: Sarthak160 <[email protected]>

* chore: remove comments

Signed-off-by: Sarthak160 <[email protected]>

* chore: remove linter errors

Signed-off-by: Sarthak160 <[email protected]>

* chore: fix linters

Signed-off-by: charankamarapu <[email protected]>

* chore: fix linters

Signed-off-by: charankamarapu <[email protected]>

* chore: fix linters

Signed-off-by: charankamarapu <[email protected]>

* fix: channel and conn closing

Signed-off-by: charankamarapu <[email protected]>

* chore: fix linters

Signed-off-by: charankamarapu <[email protected]>

---------

Signed-off-by: Sarthak160 <[email protected]>
Signed-off-by: charankamarapu <[email protected]>
Co-authored-by: charankamarapu <[email protected]>
  • Loading branch information
Sarthak160 and charankamarapu authored Mar 23, 2024
1 parent e359091 commit c64ab91
Show file tree
Hide file tree
Showing 19 changed files with 839 additions and 97 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module go.keploy.io/server/v2

go 1.21.0

replace github.com/jackc/pgproto3/v2 => github.com/keploy/pgproto3/v2 v2.0.2
replace github.com/jackc/pgproto3/v2 => github.com/keploy/pgproto3/v2 v2.0.5

require (
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/keploy/pgproto3/v2 v2.0.2 h1:exp+WlBBWucEmiYsjXezGrhzShdyHWkvQoIXzdj7Vj8=
github.com/keploy/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/keploy/pgproto3/v2 v2.0.5 h1:8spdNKZ+nOnHVxiimDsqulBRN6viPXPghkA7xppnzJ8=
github.com/keploy/pgproto3/v2 v2.0.5/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (g *Generic) MatchType(_ context.Context, _ []byte) bool {
}

func (g *Generic) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (g *Grpc) MatchType(_ context.Context, reqBuf []byte) bool {
}

func (g *Grpc) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := g.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *HTTP) MatchType(_ context.Context, buf []byte) bool {
}

func (h *HTTP) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := h.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := h.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

h.logger.Debug("Recording the outgoing http call in record mode")

Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *Mongo) MatchType(_ context.Context, buffer []byte) bool {
}

func (m *Mongo) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))
reqBuf, err := util.ReadInitialBuf(ctx, logger, src)
if err != nil {
utils.LogError(logger, err, "failed to read the initial mongo message")
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/proxy/integrations/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *MySQL) MatchType(_ context.Context, _ []byte) bool {
}

func (m *MySQL) RecordOutgoing(ctx context.Context, src net.Conn, dst net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions) error {
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", util.GetNextID()), zap.Any("Destination ConnectionID", util.GetNextID()))
logger := m.logger.With(zap.Any("Client IP Address", src.RemoteAddr().String()), zap.Any("Client ConnectionID", ctx.Value(models.ClientConnectionIDKey).(string)), zap.Any("Destination ConnectionID", ctx.Value(models.DestConnectionIDKey).(string)))

err := encodeMySQL(ctx, logger, src, dst, mocks, opts)
if err != nil {
Expand Down
52 changes: 48 additions & 4 deletions pkg/core/proxy/integrations/postgres/v1/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"time"

"go.keploy.io/server/v2/pkg/core/proxy/integrations"
Expand All @@ -19,8 +20,10 @@ import (
func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn net.Conn, dstCfg *integrations.ConditionalDstCfg, mockDb integrations.MockMemDb, _ models.OutgoingOptions) error {
pgRequests := [][]byte{reqBuf}
errCh := make(chan error, 1)
defer close(errCh)

go func(errCh chan error, pgRequests [][]byte) {
// close should be called from the producer of the channel
defer close(errCh)
for {
// Since protocol packets have to be parsed for checking stream end,
// clientConnection have deadline for read to determine the end of stream.
Expand All @@ -34,12 +37,11 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
for {
buffer, err := pUtil.ReadBytes(ctx, logger, clientConn)
if err != nil {
if netErr, ok := err.(net.Error); !(ok && netErr.Timeout()) && err != nil {
if netErr, ok := err.(net.Error); !(ok && netErr.Timeout()) {
if err == io.EOF {
logger.Debug("EOF error received from client. Closing conn in postgres !!")
errCh <- err
}
//TODO: why debug log sarthak?
logger.Debug("failed to read the request message in proxy for postgres dependency")
errCh <- err
}
Expand All @@ -55,7 +57,6 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
logger.Debug("the postgres request buffer is empty")
continue
}

matched, pgResponses, err := matchingReadablePG(ctx, logger, pgRequests, mockDb)
if err != nil {
errCh <- fmt.Errorf("error while matching tcs mocks %v", err)
Expand Down Expand Up @@ -97,3 +98,46 @@ func decodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
return err
}
}

type QueryData struct {
PrepIdentifier string `json:"PrepIdentifier" yaml:"PrepIdentifier"`
Query string `json:"Query" yaml:"Query"`
}

type PrepMap map[string][]QueryData

type TestPrepMap map[string][]QueryData

func getRecordPrepStatement(allMocks []*models.Mock) PrepMap {
preparedstatement := make(PrepMap)
for _, v := range allMocks {
if v.Kind != "Postgres" {
continue
}
for _, req := range v.Spec.PostgresRequests {
var querydata []QueryData
psMap := make(map[string]string)
if len(req.PacketTypes) > 0 && req.PacketTypes[0] != "p" && req.Identfier != "StartupRequest" {
p := 0
for _, header := range req.PacketTypes {
if header == "P" {
if strings.Contains(req.Parses[p].Name, "S_") {
psMap[req.Parses[p].Query] = req.Parses[p].Name
querydata = append(querydata, QueryData{PrepIdentifier: req.Parses[p].Name,
Query: req.Parses[p].Query,
})

}
p++
}
}
}
// also append the query data for the prepared statement
if len(querydata) > 0 {
preparedstatement[v.ConnectionID] = append(preparedstatement[v.ConnectionID], querydata...)
}
}

}
return preparedstatement
}
32 changes: 18 additions & 14 deletions pkg/core/proxy/integrations/postgres/v1/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ import (
)

func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, _ models.OutgoingOptions) error {
//closing the destination conn
defer func(destConn net.Conn) {
err := destConn.Close()
if err != nil {
utils.LogError(logger, err, "failed to close the destination connection")
}
}(destConn)

logger.Debug("Inside the encodePostgresOutgoing function")
var pgRequests []models.Backend
Expand Down Expand Up @@ -79,28 +72,35 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie

clientBuffChan := make(chan []byte)
destBuffChan := make(chan []byte)
errChan := make(chan error)
defer close(clientBuffChan)
defer close(destBuffChan)
defer close(errChan)
errChan := make(chan error, 1)

//get the error group from the context
g := ctx.Value(models.ErrGroupKey).(*errgroup.Group)

// read requests from client
g.Go(func() error {
defer utils.Recover(logger)
defer close(clientBuffChan)
pUtil.ReadBuffConn(ctx, logger, clientConn, clientBuffChan, errChan)
return nil
})

// read responses from destination
g.Go(func() error {
defer utils.Recover(logger)
defer close(destBuffChan)
pUtil.ReadBuffConn(ctx, logger, destConn, destBuffChan, errChan)
return nil
})

go func() {
err := g.Wait()
if err != nil {
logger.Info("error group is returning an error", zap.Error(err))
}
close(errChan)
}()

prevChunkWasReq := false
logger.Debug("the iteration for the pg request starts", zap.Any("pgReqs", len(pgRequests)), zap.Any("pgResps", len(pgResponses)))

Expand All @@ -125,6 +125,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
ResTimestampMock: resTimestampMock,
Metadata: metadata,
},
ConnectionID: ctx.Value(models.ClientConnectionIDKey).(string),
}
return ctx.Err()
}
Expand Down Expand Up @@ -153,6 +154,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
ResTimestampMock: resTimestampMock,
Metadata: metadata,
},
ConnectionID: ctx.Value(models.ClientConnectionIDKey).(string),
}
pgRequests = []models.Backend{}
pgResponses = []models.Frontend{}
Expand Down Expand Up @@ -295,17 +297,19 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
}
if pg.FrontendWrapper.MsgType == 'C' {
pg.FrontendWrapper.CommandComplete = *msg.(*pgproto3.CommandComplete)
// empty the command tag
pg.FrontendWrapper.CommandComplete.CommandTag = []byte{}
pg.FrontendWrapper.CommandCompletes = append(pg.FrontendWrapper.CommandCompletes, pg.FrontendWrapper.CommandComplete)
}
if pg.FrontendWrapper.DataRow.RowValues != nil {
if pg.FrontendWrapper.MsgType == 'D' && pg.FrontendWrapper.DataRow.RowValues != nil {
// Create a new slice for each DataRow
valuesCopy := make([]string, len(pg.FrontendWrapper.DataRow.RowValues))
copy(valuesCopy, pg.FrontendWrapper.DataRow.RowValues)

row := pgproto3.DataRow{
RowValues: valuesCopy, // Use the copy of the values
Values: pg.FrontendWrapper.DataRow.Values,
}
// fmt.Println("row is ", row)
dataRows = append(dataRows, row)
}
}
Expand Down Expand Up @@ -362,7 +366,7 @@ func encodePostgres(ctx context.Context, logger *zap.Logger, reqBuf []byte, clie
if err != nil {
logger.Debug("failed to decode the response message in proxy for postgres dependency", zap.Error(err))
}
if (len(afterEncoded) != len(buffer) && pgMock.PacketTypes[0] != "R") || len(pgMock.DataRows) > 0 {
if len(afterEncoded) != len(buffer) && pgMock.PacketTypes[0] != "R" {
logger.Debug("the length of the encoded buffer is not equal to the length of the original buffer", zap.Any("after_encoded", len(afterEncoded)), zap.Any("buffer", len(buffer)))
pgMock.Payload = bufStr
}
Expand Down
Loading

0 comments on commit c64ab91

Please sign in to comment.