Skip to content

Commit

Permalink
Fix tfirst tracking for interleaved runs of same transaction
Browse files Browse the repository at this point in the history
Since the connection instance tracks the last seen RUN's SUCCESS'
tfirst, a subsequent run of the same transaction overwrites the
tfirst of partially consumed results of a previous run.

Having each stream track their own tfirst avoids this problem.

Note: this only happens to Bolt4, since it is not possible to pull
partial results with Bolt3 (only PULALL is available).
  • Loading branch information
fbiville committed Mar 14, 2023
1 parent 7a11b08 commit 62bae2d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
10 changes: 4 additions & 6 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type bolt3 struct {
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
pendingTx *internalTx3 // Stashed away when tx started explcitly
bookmark string // Last bookmark
birthDate time.Time
Expand Down Expand Up @@ -455,15 +454,14 @@ func (b *bolt3) run(cypher string, params map[string]interface{}, tx *internalTx
if b.err != nil {
return nil, b.err
}
b.tfirst = succ.tfirst
b.currStream = &stream{keys: succ.fields, tfirst: succ.tfirst}
// Change state to streaming
if b.state == bolt3_ready {
b.state = bolt3_streaming
} else {
b.state = bolt3_streamingtx
}

b.currStream = &stream{keys: succ.fields}
return b.currStream, nil
}

Expand Down Expand Up @@ -609,14 +607,14 @@ func (b *bolt3) receiveNext() (*db.Record, *db.Summary, error) {
b.bookmark = sum.Bookmark
}
}
b.currStream.sum = sum
b.currStream = nil
// Add some extras to the summary
sum.Agent = b.serverVersion
sum.Major = 3
sum.Minor = b.minor
sum.ServerName = b.serverName
sum.TFirst = b.tfirst
sum.TFirst = b.currStream.tfirst
b.currStream.sum = sum
b.currStream = nil
return nil, sum, nil
case *db.Neo4jError:
b.err = x
Expand Down
9 changes: 3 additions & 6 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ type bolt4 struct {
connId string
logId string
serverVersion string
tfirst int64 // Time that server started streaming
pendingTx internalTx4 // Stashed away when tx started explicitly
hasPendingTx bool
bookmark string // Last bookmark
Expand Down Expand Up @@ -615,17 +614,15 @@ func (b *bolt4) run(cypher string, params map[string]interface{}, fetchSize int,
// pull message as well, this will be cleaned up by Reset
return nil, b.err
}
// Extract the RUN response from success response
b.tfirst = succ.tfirst
// Create a stream representation, set it to current and track it
stream := &stream{keys: succ.fields, qid: succ.qid, fetchSize: fetchSize, tfirst: succ.tfirst}
// Change state to streaming
if b.state == bolt4_ready {
b.state = bolt4_streaming
} else {
b.state = bolt4_streamingtx
}

// Create a stream representation, set it to current and track it
stream := &stream{keys: succ.fields, qid: succ.qid, fetchSize: fetchSize}
b.streams.attach(stream)
// No need to check streams state, we know we are streaming

Expand Down Expand Up @@ -829,7 +826,7 @@ func (b *bolt4) receiveNext() (*db.Record, bool, *db.Summary) {
sum.Major = 4
sum.Minor = b.minor
sum.ServerName = b.serverName
sum.TFirst = b.tfirst
sum.TFirst = b.streams.curr.tfirst
if len(sum.Bookmark) > 0 {
b.bookmark = sum.Bookmark
}
Expand Down
33 changes: 33 additions & 0 deletions neo4j/internal/bolt/bolt4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,4 +1000,37 @@ func TestBolt4(ot *testing.T) {
assertBoltState(t, bolt4_failed, bolt)
AssertError(t, err)
})

ot.Run("tracks tfirst properly", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.accept(4)
srv.waitForTxBegin()
srv.sendSuccess(nil)
srv.waitForRun(nil)
srv.waitForPullN(1)
srv.send(msgSuccess, map[string]interface{}{"fields": []interface{}{"x"}, "t_first": int64(10)})
srv.send(msgRecord, []interface{}{"1"})
srv.send(msgSuccess, map[string]interface{}{"has_more": true})
srv.waitForRun(nil)
srv.waitForPullN(-1)
srv.send(msgSuccess, map[string]interface{}{"fields": []interface{}{"x"}, "t_first": int64(20)})
srv.send(msgRecord, []interface{}{"3"})
srv.send(msgRecord, []interface{}{"4"})
srv.send(msgSuccess, map[string]interface{}{"bookmark": "b2", "type": "r"})
srv.send(msgRecord, []interface{}{"2"})
srv.send(msgSuccess, map[string]interface{}{"bookmark": "b1", "type": "r"})
})
defer cleanup()
defer bolt.Close()

tx1, _ := bolt.TxBegin(db.TxConfig{Mode: db.ReadMode})
results1, _ := bolt.RunTx(tx1, db.Command{Cypher: "UNWIND [1,2] AS x RETURN x", FetchSize: 1})
results2, _ := bolt.RunTx(tx1, db.Command{Cypher: "UNWIND [3,4] AS x RETURN x", FetchSize: -1})
summary2, _ := bolt.Consume(results2)
summary1, _ := bolt.Consume(results1)

AssertIntEqual(t, int(summary1.TFirst), 10)
AssertIntEqual(t, int(summary2.TFirst), 20)
})

}
1 change: 1 addition & 0 deletions neo4j/internal/bolt/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type stream struct {
qid int64
fetchSize int
key int64
tfirst int64 // Time that server started streaming
}

// Acts on buffered data, first return value indicates if buffering
Expand Down

0 comments on commit 62bae2d

Please sign in to comment.