Commit 13ed98e

Merge branch 'streamingfast:feature/parquet' into feature/parquet

fschoell authored Nov 11, 2024 · 2 parents f0a81d2 + 86f931a
Showing 25 changed files with 846 additions and 192 deletions.
README.md (80 changes: 39 additions & 41 deletions)
````diff
@@ -1,65 +1,63 @@
-# Substreams sink files
+# Substreams Sink Files
 
-## Description
+`substreams-sink-files` is a tool that lets developers pipe data extracted from a blockchain into various file-based formats like CSV, JSONL and Parquet.
 
-`substreams-sink-files` is a tool that allows developers to pipe data extracted from a blockchain into various types of local or Cloud files-based persistence solutions.
+## Usage
 
-## Prerequisites
+Install `substreams-sink-files` using the pre-built binary release [available in the official GitHub repository](https://github.com/streamingfast/substreams-sink-files/releases), or with brew via `brew install streamingfast/tap/substreams-sink-files`.
 
-- A Substreams module prepared for a files-sink
-- Cloud-based file storage mechanism (optional)
+The sink supports several Substreams output module types; here is a jump list of the formats supported today:
 
-## Installation
+- [Line-based CSV](#jsonl-csv-and-any-other-line-based-format)
+- [Line-based JSONL](#jsonl-csv-and-any-other-line-based-format)
+- [Arbitrary Protobuf to JSON]
+- Parquet
 
-Install `substreams-sink-files` by using the pre-built binary release [available in the official GitHub repository](https://github.com/streamingfast/substreams-sink-files/releases).
+### JSONL, CSV and any other line based format
 
-Extract `substreams-sink-files` into a folder and ensure this folder is referenced globally via your `PATH` environment variable.
+The sink supports the output type [sf.substreams.sink.files.v1.Lines](./proto/sf/substreams/sink/files/v1/files.proto), which can handle any line-based format; the Substreams module is responsible for transforming blocks into lines in the format of your choice. The [sf.substreams.sink.files.v1.Lines documentation](https://github.com/streamingfast/substreams-sink-files/blob/feature/parquet/proto/sf/substreams/sink/files/v1/files.proto#L13-L26) gives further details about the format.
 
-## Using the `substreams-sink-files` tool
+The [Substreams Ethereum Token Transfers example](https://github.com/streamingfast/substreams-eth-token-transfers/blob/develop/src/lib.rs#L31-L46) can be used as a reference; it showcases both the JSONL and CSV output formats:
 
-The `run` command is the primary way to work with the `substreams-sink-files` tool. The command for your project will resemble the following:
+> [!NOTE]
+> Change output module `jsonl_out` below to `csv_out` to test CSV output
 
 ```bash
-substreams-sink-files run \
-  mainnet.eth.streamingfast.io:443 \
+substreams-sink-files run mainnet.eth.streamingfast.io:443 \
   https://github.com/streamingfast/substreams-eth-token-transfers/releases/download/v0.4.0/substreams-eth-token-transfers-v0.4.0.spkg \
   jsonl_out \
  ./localdata/out \
-  --encoder=lines \
-  --file-working-dir="./localdata/working" \
-  --state-store=./localdata/working/state.yaml \
-  10_000_000:+100_000
+  10_000_000:+100_000 \
+  --file-block-count=10000
 ```
 
-> **Note** We use a custom range here `10_000_000:+100_000` because there is no ERC20/ERC721/ERC1155 until a long time in the chain.
-
-Output resembling the following will be printed to the terminal window for properly issued commands and a properly set up and configured Substreams module.
+This runs the Substreams over 100 000 blocks and, for each bundle of 10 000 blocks, produces a file containing the lines output by the module, resulting in these files on disk:
 
 ```bash
-2023-06-16T11:23:52.342-0400 INFO (substreams-sink-files) starting prometheus metrics server {"listen_addr": "localhost:9102"}
-2023-06-16T11:23:52.343-0400 INFO (substreams-sink-files) sink to files {"file_output_path": "./localdata/out", "file_working_dir": "./localdata/working", "encoder_type": "lines", "state_store": "./localdata/working/state.yaml", "blocks_per_file": 10000, "buffer_max_size": 67108864}
-2023-06-16T11:23:52.343-0400 INFO (substreams-sink-files) sinker from CLI {"endpoint": "mainnet.eth.streamingfast.io:443", "manifest_path": "https://github.com/streamingfast/substreams-eth-token-transfers/releases/download/v0.4.0/substreams-eth-token-transfers-v0.4.0.spkg", "output_module_name": "jsonl_out", "expected_module_type": "<Ignored>", "block_range": "10_000_000:+100_000"}
-2023-06-16T11:23:52.343-0400 INFO (substreams-sink-files) reading substreams manifest {"manifest_path": "https://github.com/streamingfast/substreams-eth-token-transfers/releases/download/v0.4.0/substreams-eth-token-transfers-v0.4.0.spkg"}
-2023-06-16T11:23:52.343-0400 INFO (substreams-sink-files) starting pprof server {"listen_addr": "localhost:6060"}
-2023-06-16T11:23:52.660-0400 INFO (substreams-sink-files) validating output module {"module_name": "jsonl_out"}
-2023-06-16T11:23:52.660-0400 INFO (substreams-sink-files) validating output module type {"module_name": "jsonl_out", "module_type": "proto:substreams.sink.files.v1.Lines"}
-2023-06-16T11:23:52.663-0400 INFO (substreams-sink-files) sinker configured {"mode": "Production", "module_count": 3, "output_module_name": "jsonl_out", "output_module_type": "proto:substreams.sink.files.v1.Lines", "output_module_hash": "0d94c2c7662fbe04923c43d9f8732e0858f7af37", "client_config": "mainnet.eth.streamingfast.io:443 (insecure: false, plaintext: false, JWT present: true)", "buffer": true, "block_range": "[10000000, 10100000)", "infinite_retry": false, "final_blocks_only": false, "liveness_checker": true}
-2023-06-16T11:23:52.666-0400 INFO (substreams-sink-files) ready, waiting for signal to quit
-2023-06-16T11:23:52.667-0400 INFO (substreams-sink-files) starting new file boundary {"boundary": "[10000000, 10010000)"}
-2023-06-16T11:23:52.684-0400 INFO (substreams-sink-files) boundary started {"boundary": "[10000000, 10010000)"}
-2023-06-16T11:23:52.684-0400 INFO (substreams-sink-files) starting file sink {"restarting_at": "#10009999 (601b697795b7435dcb3f661aeea877fae4e3b534044a5940497b0a04a8845621)"}
-2023-06-16T11:23:52.684-0400 INFO (substreams-sink-files) starting sinker {"stats_refresh_each": "15s", "restarting_at": "#10009999 (601b697795b7435dcb3f661aeea877fae4e3b534044a5940497b0a04a8845621)", "end_at": "#1374390772024"}
-2023-06-16T11:23:52.801-0400 INFO (substreams-sink-files) session initialized with remote endpoint {"trace_id": "8fd18de5b6acb648867f4b2828bee602"}
-2023-06-16T11:23:53.054-0400 INFO (substreams-sink-files) block_num is not in active boundary {"active_boundary": "[10000000, 10010000)", "boundaries_to_skip": 0, "block_num": 10010000}
-2023-06-16T11:23:53.054-0400 INFO (substreams-sink-files) stopping file boundary
-2023-06-16T11:23:53.054-0400 INFO (substreams-sink-files) all data from range is in memory, no need to flush
-2023-06-16T11:23:53.055-0400 INFO (substreams-sink-files) queuing boundary upload {"boundary": "[10000000, 10010000)"}
-2023-06-16T11:23:53.056-0400 INFO (substreams-sink-files) bundler stats {"file_count": 1, "boundary": "[10000000, 10010000)", "boundary_process_duration": "371.596208ms", "upload_duration": "0s", "data_process_duration": "0s", "avg_upload_dur": 0, "total_upload_dur": 0, "avg_boundary_process_dur": 0.371596208, "total_boundary_process_dur": 0.371596208, "avg_data_process_dur": 0, "total_data_process_dur": 0}
-2023-06-16T11:23:53.056-0400 INFO (substreams-sink-files) starting new file boundary {"boundary": "[10010000, 10020000)"}
-2023-06-16T11:23:53.058-0400 INFO (substreams-sink-files) boundary uploaded {"boundary": "[10000000, 10010000)", "output_path": "localdata/out/0010000000-0010010000.jsonl"}
-2023-06-16T11:23:53.060-0400 INFO (substreams-sink-files) boundary started {"boundary": "[10010000, 10020000)"}
+./localdata/out
+├── 0010000000-0010010000.jsonl
+├── 0010010000-0010020000.jsonl
+├── 0010020000-0010030000.jsonl
+├── 0010030000-0010040000.jsonl
+├── 0010040000-0010050000.jsonl
+├── 0010050000-0010060000.jsonl
+├── 0010060000-0010070000.jsonl
+├── 0010070000-0010080000.jsonl
+├── 0010080000-0010090000.jsonl
+└── 0010090000-0010100000.jsonl
 ```
 
+With an example of the file content:
+
+```json
+$ cat localdata/out/0010000000-0010010000.jsonl | head -n1
+{"schema":"erc20","trx_hash":"1f17943d5dd7053959f1dc092dfad60a7caa084224212b1adbecaf3137efdfdd","log_index":0,"from":"876eabf441b2ee5b5b0554fd502a8e0600950cfa","to":"566021352eb2f882538bf8d59e5d2ba741b9ec7a","quantity":"95073600000000000000","operator":"","token_id":""}
+```
+
 ## Documentation
 
 ### Cursors
 
 When you use Substreams, it sends back a block to a consumer using an opaque cursor. This cursor points to the exact location within the blockchain where the block is. In case your connection terminates or the process restarts, upon re-connection, Substreams sends back the cursor of the last written bundle in the request so that the stream of data can be resumed exactly where it left off and data integrity is maintained.
````
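To make the resume behavior concrete, here is a minimal sketch of the idea, not the sink's actual state-store implementation (the sink persists comparable state through its `--state-store` file, `state.yaml` in the earlier examples): save the cursor only after a bundle is durably written, and load it back on startup so the stream resumes from the last complete bundle.

```go
package main

import (
	"fmt"
	"os"
)

// saveCursor persists the opaque cursor once a bundle has been fully
// written; a crash before this point simply re-processes that bundle.
func saveCursor(path, cursor string) error {
	// Write-then-rename keeps the previous state intact if we crash mid-write.
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, []byte(cursor), 0o644); err != nil {
		return fmt.Errorf("writing cursor: %w", err)
	}
	return os.Rename(tmp, path)
}

// loadCursor returns an empty cursor when no state exists yet, in which
// case the stream starts at the beginning of the requested range.
func loadCursor(path string) (string, error) {
	data, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return "", nil
	}
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	const statePath = "cursor.state" // hypothetical state file

	cursor, err := loadCursor(statePath)
	if err != nil {
		panic(err)
	}
	fmt.Printf("resuming with cursor %q\n", cursor)

	// ... stream blocks, write bundle 0010000000-0010010000.jsonl ...

	if err := saveCursor(statePath, "opaque-cursor-from-last-bundle"); err != nil {
		panic(err)
	}
}
```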
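The `sf.substreams.sink.files.v1.Lines` type backing the line-based formats above is simply a repeated list of pre-rendered lines. Here is a minimal sketch of building and decoding such a payload; it assumes the generated Go binding exposes a repeated string field `Lines`, consistent with how `encoder/lines.go` below iterates `lines.Lines`:

```go
package main

import (
	"fmt"

	pbsinkfiles "github.com/streamingfast/substreams-sink-files/pb/sf/substreams/sink/files/v1"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Each entry is one fully-formed output line, here JSONL with
	// hypothetical values; the module, not the sink, decides the format.
	payload := &pbsinkfiles.Lines{Lines: []string{
		`{"schema":"erc20","from":"0xaaa","to":"0xbbb","quantity":"1"}`,
		`{"schema":"erc20","from":"0xccc","to":"0xddd","quantity":"2"}`,
	}}

	data, err := proto.Marshal(payload)
	if err != nil {
		panic(fmt.Errorf("marshal lines: %w", err))
	}

	// The sink receives bytes like these as the map module's output and
	// unmarshals them back into Lines before writing each line to a bundle.
	decoded := &pbsinkfiles.Lines{}
	if err := proto.Unmarshal(data, decoded); err != nil {
		panic(fmt.Errorf("unmarshal lines: %w", err))
	}

	for _, line := range decoded.Lines {
		fmt.Println(line)
	}
}
```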
bundler/writer/parquet_writer.go (6 changes: 5 additions & 1 deletion)
```diff
@@ -210,10 +210,14 @@ func (p *ParquetWriter) EncodeMapModule(output *pbsubstreamsrpc.MapModuleOutput)
 			panic(fmt.Errorf("rows buffer must be initialized on StartBoundary, code is wrong"))
 		}
 
-		_, err = rowsBuffer.WriteRows(rows)
+		n, err := rowsBuffer.WriteRows(rows)
 		if err != nil {
 			return fmt.Errorf("writing rows to buffer: %w", err)
 		}
+
+		if n != len(rows) {
+			return fmt.Errorf("expected to write %d rows, but wrote %d", len(rows), n)
+		}
 	}
 
 	return nil
```
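The change above guards against short writes: a batch `WriteRows` call can report success while consuming fewer rows than requested. Below is a sketch of the same pattern in isolation, using a hypothetical `RowWriter` interface rather than the parquet library's actual types:

```go
package main

import "fmt"

// Row is a placeholder for whatever row representation the writer consumes.
type Row struct{ Values []any }

// RowWriter is a hypothetical stand-in for any batch-writing API that
// returns how many items it actually consumed, like a parquet row buffer.
type RowWriter interface {
	WriteRows(rows []Row) (int, error)
}

// writeAll refuses to silently drop data: an error-free short write is
// still treated as a failure, which is exactly what the change above does.
func writeAll(w RowWriter, rows []Row) error {
	n, err := w.WriteRows(rows)
	if err != nil {
		return fmt.Errorf("writing rows to buffer: %w", err)
	}
	if n != len(rows) {
		return fmt.Errorf("expected to write %d rows, but wrote %d", len(rows), n)
	}
	return nil
}

// sliceWriter is a toy implementation used only to make the sketch runnable.
type sliceWriter struct{}

func (sliceWriter) WriteRows(rows []Row) (int, error) { return len(rows), nil }

func main() {
	fmt.Println(writeAll(sliceWriter{}, []Row{{}, {}})) // <nil>
}
```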
cmd/substreams-sink-files/run.go (12 changes: 10 additions & 2 deletions)
```diff
@@ -11,6 +11,7 @@ import (
 	"github.com/jhump/protoreflect/desc"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
+	"github.com/streamingfast/cli"
 	. "github.com/streamingfast/cli"
 	"github.com/streamingfast/cli/sflags"
 	"github.com/streamingfast/derr"
@@ -100,6 +101,15 @@ func syncRunE(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf("new sinker: %w", err)
 	}
 
+	if blockRange := sinker.BlockRange(); blockRange.EndBlock() != nil {
+		size, err := sinker.BlockRange().Size()
+		if err != nil {
+			panic(fmt.Errorf("size should error only on open ended range, which we should have caught earlier: %w", err))
+		}
+
+		cli.Ensure(size >= blocksPerFile, "You requested %d blocks per file but your block range spans only %d blocks, this would produce 0 file, refusing to start", blocksPerFile, size)
+	}
+
 	fileOutputStore, err := dstore.NewStore(fileOutputPath, "", "", false)
 	if err != nil {
 		return fmt.Errorf("new store %q: %w", fileOutputPath, err)
@@ -132,8 +142,6 @@ func syncRunE(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf("output module message descriptor: %w", err)
 	}
 
-	// tables := protox.ParquetFindTablesInMessageDescriptor(msgDesc)
-
 	parquetWriter, err := writer.NewParquetWriter(msgDesc)
 	if err != nil {
 		return fmt.Errorf("new parquet writer: %w", err)
```
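The new `cli.Ensure` guard makes the bundling arithmetic explicit: the README's range `10_000_000:+100_000` spans 100 000 blocks and, at 10 000 blocks per file, yields 10 bundles, while a bounded range smaller than a single bundle would finish without ever writing a file. A standalone sketch of that check, with hypothetical values:

```go
package main

import "fmt"

// ensureProducesFiles mirrors the guard added above: a bounded block range
// smaller than one bundle would complete without producing any file, so the
// sink refuses to start instead of appearing to succeed while writing nothing.
func ensureProducesFiles(rangeSize, blocksPerFile uint64) error {
	if rangeSize < blocksPerFile {
		return fmt.Errorf(
			"you requested %d blocks per file but your block range spans only %d blocks, this would produce 0 file",
			blocksPerFile, rangeSize)
	}
	return nil
}

func main() {
	// README example: 10_000_000:+100_000 with --file-block-count=10000.
	fmt.Println(ensureProducesFiles(100_000, 10_000)) // <nil>: yields 10 bundles

	// A range smaller than one bundle is refused up front.
	fmt.Println(ensureProducesFiles(5_000, 10_000)) // error
}
```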
encoder/lines.go (17 changes: 11 additions & 6 deletions)
```diff
@@ -2,13 +2,12 @@ package encoder
 
 import (
 	"fmt"
-	"reflect"
 	"unsafe"
 
-	"github.com/golang/protobuf/proto"
 	"github.com/streamingfast/substreams-sink-files/bundler/writer"
 	pbsinkfiles "github.com/streamingfast/substreams-sink-files/pb/sf/substreams/sink/files/v1"
 	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
+	"google.golang.org/protobuf/proto"
 )
 
 type LinesEncoder struct {
@@ -20,10 +19,10 @@ func NewLineEncoder() *LinesEncoder {
 
 func (l *LinesEncoder) EncodeTo(output *pbsubstreamsrpc.MapModuleOutput, writer writer.Writer) error {
 	// FIXME: Improve by using a customized probably decoder, maybe Vitess, or we could
-	// even create our own which should be quiter simpler and could even reduce allocations
+	// even create our own which should be quite a bit simpler and could even reduce allocations
 	lines := &pbsinkfiles.Lines{}
 	if err := proto.Unmarshal(output.GetMapOutput().Value, lines); err != nil {
-		return fmt.Errorf("failed to unmarhsal lines: %w", err)
+		return fmt.Errorf("failed to unmarshal lines: %w", err)
 	}
 
 	for _, line := range lines.Lines {
@@ -36,7 +35,13 @@ func (l *LinesEncoder) EncodeTo(output *pbsubstreamsrpc.MapModuleOutput, writer
 
 // unsafeGetBytes get the `[]byte` value out of a string without an allocation that `[]byte(s)` does.
 //
-// See https://stackoverflow.com/a/68195226/697930 and the post in general for background
+// See https://stackoverflow.com/a/74658905/697930 and the post in general for background
 func unsafeGetBytes(s string) []byte {
-	return unsafe.Slice((*byte)(unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)), len(s))
+	// unsafe.StringData is unspecified for empty strings, so we handle that case first
+	if len(s) == 0 {
+		return nil
+	}
+
+	stringDataPtr := unsafe.StringData(s)
+	return unsafe.Slice(stringDataPtr, len(s))
}
```
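The rewrite of `unsafeGetBytes` swaps the long-deprecated `reflect.StringHeader` trick for the `unsafe.StringData`/`unsafe.Slice` API introduced in Go 1.20. The usual caveat carries over: the returned slice aliases the string's backing memory and must be treated as read-only. A small sketch of the same conversion and its intended usage:

```go
package main

import (
	"fmt"
	"unsafe"
)

// unsafeGetBytes mirrors the function above: a zero-copy string-to-bytes
// conversion. The result aliases the string's memory and MUST be treated
// as read-only; mutating it is undefined behavior.
func unsafeGetBytes(s string) []byte {
	if len(s) == 0 {
		// unsafe.StringData is unspecified for empty strings.
		return nil
	}
	return unsafe.Slice(unsafe.StringData(s), len(s))
}

func main() {
	line := `{"schema":"erc20"}`

	// Read-only uses, like handing the bytes to a writer, are fine;
	// []byte(line) would copy, and avoiding that allocation matters when
	// every output line goes through this path.
	buf := unsafeGetBytes(line)
	fmt.Println(len(buf), string(buf))
}
```

In the encoder above the bytes only flow onward to a writer, which keeps the read-only constraint satisfied in practice.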
go.mod (1 change: 1 addition & 0 deletions)
```diff
@@ -40,6 +40,7 @@ require (
 	github.com/goccy/go-json v0.10.3 // indirect
 	github.com/google/flatbuffers v24.3.25+incompatible // indirect
 	github.com/google/s2a-go v0.1.7 // indirect
+	github.com/holiman/uint256 v1.3.1 // indirect
 	github.com/huandu/go-sqlbuilder v1.27.3 // indirect
 	github.com/huandu/xstrings v1.4.0 // indirect
 	github.com/iancoleman/strcase v0.3.0 // indirect
```
go.sum (2 changes: 2 additions & 0 deletions)
```diff
@@ -241,6 +241,8 @@
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
 github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
+github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs=
+github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
 github.com/huandu/go-assert v1.1.6 h1:oaAfYxq9KNDi9qswn/6aE0EydfxSa+tWZC1KabNitYs=
 github.com/huandu/go-assert v1.1.6/go.mod h1:JuIfbmYG9ykwvuxoJ3V8TB5QP+3+ajIA54Y44TmkMxs=
 github.com/huandu/go-sqlbuilder v1.27.3 h1:cNVF9vQP4i7rTk6XXJIEeMbGkZbxfjcITeJzobJK44k=
```