diff --git a/config/config_test.go b/config/config_test.go index 48a0fdd286..079fa91fce 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -153,6 +153,10 @@ func Test_Defaults(t *testing.T) { path: "Sequencer.StreamServer.Version", expectedValue: uint8(0), }, + { + path: "Sequencer.StreamServer.WriteTimeout", + expectedValue: types.NewDuration(5 * time.Second), + }, { path: "Sequencer.StreamServer.Enabled", expectedValue: false, diff --git a/config/default.go b/config/default.go index 9a06ad3fef..8e99b35e69 100644 --- a/config/default.go +++ b/config/default.go @@ -206,6 +206,7 @@ StateConsistencyCheckInterval = "5s" Port = 0 Filename = "" Version = 0 + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/config/environments/local/local.node.config.toml b/config/environments/local/local.node.config.toml index 237ab37129..2f50f96311 100644 --- a/config/environments/local/local.node.config.toml +++ b/config/environments/local/local.node.config.toml @@ -113,6 +113,7 @@ StateConsistencyCheckInterval = "5s" [Sequencer.StreamServer] Port = 0 Filename = "" + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 858b9034fd..a12f577869 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -62,7 +62,9 @@
"300ms"
Interval is the interval of time to calculate sequencer metrics
"1m"
"300ms"
-
EnableLog is a flag to enable/disable metrics logs
Port to listen on
Filename of the binary data file
Version of the binary data file
ChainID is the chain ID
Enabled is a flag to enable/disable the data streamer
UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
XLayer config
PackBatchSpacialList is the list of addresses that will have a special gas price
GasPriceMultiple is the multiple of the gas price
InitGasPriceMultiple is the multiple of the gas price for init free gas tx
QueryPendingTxsLimit is used to limit amount txs from the db
WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1
"1m"
+
EnableLog is a flag to enable/disable metrics logs
Port to listen on
Filename of the binary data file
Version of the binary data file
ChainID is the chain ID
Enabled is a flag to enable/disable the data streamer
UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
WriteTimeout is the TCP write timeout when sending data to a datastream client
"1m"
+
"300ms"
+
XLayer config
PackBatchSpacialList is the list of addresses that will have a special gas price
GasPriceMultiple is the multiple of the gas price
InitGasPriceMultiple is the multiple of the gas price for init free gas tx
QueryPendingTxsLimit is used to limit amount txs from the db
WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1
"1m"
"300ms"
LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent
"1m"
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index 4ece9c91a1..dc55845efc 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -3473,6 +3473,7 @@ EnableLog=true
| - [Enabled](#Sequencer_StreamServer_Enabled ) | No | boolean | No | - | Enabled is a flag to enable/disable the data streamer |
| - [Log](#Sequencer_StreamServer_Log ) | No | object | No | - | Log is the log configuration |
| - [UpgradeEtrogBatchNumber](#Sequencer_StreamServer_UpgradeEtrogBatchNumber ) | No | integer | No | - | UpgradeEtrogBatchNumber is the batch number of the upgrade etrog |
+| - [WriteTimeout](#Sequencer_StreamServer_WriteTimeout ) | No | string | No | - | Duration |
#### 10.8.1. `Sequencer.StreamServer.Port`
@@ -3610,6 +3611,32 @@ Must be one of:
UpgradeEtrogBatchNumber=0
```
+#### 10.8.8. `Sequencer.StreamServer.WriteTimeout`
+
+**Title:** Duration
+
+**Type:** : `string`
+
+**Default:** `"5s"`
+
+**Description:** WriteTimeout is the TCP write timeout when sending data to a datastream client
+
+**Examples:**
+
+```json
+"1m"
+```
+
+```json
+"300ms"
+```
+
+**Example setting the default value** ("5s"):
+```
+[Sequencer.StreamServer]
+WriteTimeout="5s"
+```
+
### 10.9. `Sequencer.PackBatchSpacialList`
**Type:** : `array of string`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index 41582e91f3..7f220848ec 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -1402,6 +1402,16 @@
"type": "integer",
"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
"default": 0
+ },
+ "WriteTimeout": {
+ "type": "string",
+ "title": "Duration",
+ "description": "WriteTimeout is the TCP write timeout when sending data to a datastream client",
+ "default": "5s",
+ "examples": [
+ "1m",
+ "300ms"
+ ]
}
},
"additionalProperties": false,
diff --git a/go.mod b/go.mod
index e465b3ebe9..27fd9fa514 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21
require (
- github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
+ github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.11
diff --git a/go.sum b/go.sum
index 4be01dab15..77a22ca737 100644
--- a/go.sum
+++ b/go.sum
@@ -43,8 +43,8 @@ github.com/0xPolygon/agglayer v0.0.1 h1:J6/DUo9rNUncDifquanouRCo2g7g069yvz0aFtu7
github.com/0xPolygon/agglayer v0.0.1/go.mod h1:UYp5O8THULoXVrUfzkRjVBzxHR5DxBdUN/Iq0EgxNxM=
github.com/0xPolygon/cdk-data-availability v0.0.5 h1://vg1oR/5tw2XfEIorpP+wIxLfNUmoKrdmX8YZvBKX4=
github.com/0xPolygon/cdk-data-availability v0.0.5/go.mod h1:aGwqHiJhL+mJbdepl3s58wsY18EuViDa9vZCpPuIYGw=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
diff --git a/sequencer/config.go b/sequencer/config.go
index 5f0857232c..35af7468f2 100644
--- a/sequencer/config.go
+++ b/sequencer/config.go
@@ -58,6 +58,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+ // WriteTimeout is the TCP write timeout when sending data to a datastream client
+ WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}
// FinalizerCfg contains the finalizer's configuration properties
diff --git a/sequencer/l2block.go b/sequencer/l2block.go
index 20945ea046..05aea9623b 100644
--- a/sequencer/l2block.go
+++ b/sequencer/l2block.go
@@ -245,7 +245,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= that usedZKCounters
return fmt.Errorf("error subtracting L2 block %d [%d] needed resources from the batch %d, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
- f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+ f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
}
l2Block.batch.finalHighReservedZKCounters = newHighZKCounters
@@ -253,7 +253,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
} else {
overflowLog := fmt.Sprintf("L2 block %d [%d] needed resources exceeds the remaining batch %d resources, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
- f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+ f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
f.LogEvent(ctx, event.Level_Warning, event.EventID_ReservedZKCountersOverflow, overflowLog, nil)
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index f4bbd5259d..41f1be536c 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -17,7 +17,7 @@ import (
)
const (
- datastreamChannelMultiplier = 2
+ datastreamChannelBufferSize = 50
)
// Sequencer represents a sequencer
@@ -61,9 +61,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
eventLog: eventLog,
}
- // TODO: Make configurable
- channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
- sequencer.dataToStream = make(chan interface{}, channelBufferSize)
+ sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)
return sequencer, nil
}
@@ -83,7 +81,7 @@ func (s *Sequencer) Start(ctx context.Context) {
// Start stream server if enabled
if s.cfg.StreamServer.Enabled {
- s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log)
+ s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, s.cfg.StreamServer.WriteTimeout.Duration, &s.cfg.StreamServer.Log)
if err != nil {
log.Fatalf("failed to create stream server, error: %v", err)
}
diff --git a/test/config/debug.node.config.toml b/test/config/debug.node.config.toml
index 6f036cefeb..8981773934 100644
--- a/test/config/debug.node.config.toml
+++ b/test/config/debug.node.config.toml
@@ -113,6 +113,7 @@ StateConsistencyCheckInterval = "5s"
Port = 6900
Filename = "/datastreamer/datastream.bin"
Version = 1
+ WriteTimeout = "5s"
Enabled = true
[SequenceSender]
diff --git a/test/config/test.node.config.toml b/test/config/test.node.config.toml
index c3bfca6070..6b5e69c8d4 100644
--- a/test/config/test.node.config.toml
+++ b/test/config/test.node.config.toml
@@ -138,6 +138,7 @@ StateConsistencyCheckInterval = "5s"
Filename = "/datastreamer/datastream.bin"
Version = 1
ChainID = 195
+ WriteTimeout = "5s"
Enabled = true
[SequenceSender]
diff --git a/tools/datastreamer/config/config.go b/tools/datastreamer/config/config.go
index 0acb225cf9..b6c841e591 100644
--- a/tools/datastreamer/config/config.go
+++ b/tools/datastreamer/config/config.go
@@ -7,6 +7,7 @@ import (
"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
+ "github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/mitchellh/mapstructure"
@@ -48,6 +49,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+ // WriteTimeout is the TCP write timeout when sending data to a datastream client
+ WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}
// Config is the configuration for the tool
diff --git a/tools/datastreamer/config/tool.config.toml b/tools/datastreamer/config/tool.config.toml
index c497f3362f..51976d8af8 100644
--- a/tools/datastreamer/config/tool.config.toml
+++ b/tools/datastreamer/config/tool.config.toml
@@ -7,6 +7,7 @@ Port = 6901
Filename = "datastream.bin"
Version = 1
ChainID = 1440
+WriteTimeout = "5s"
UpgradeEtrogBatchNumber = 0
[StateDB]
diff --git a/tools/datastreamer/main.go b/tools/datastreamer/main.go
index 975e4c7ecd..016d1834e5 100644
--- a/tools/datastreamer/main.go
+++ b/tools/datastreamer/main.go
@@ -159,7 +159,7 @@ func main() {
func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error) {
// Create a stream server
- streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, &c.Log)
+ streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, c.Offline.WriteTimeout.Duration, &c.Log)
if err != nil {
return nil, err
}