Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max links #142

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-leb128"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/fluent/qp"
Expand All @@ -30,25 +32,9 @@ import (
"k8s.io/klog/v2"
)

const (
	// nulRootCarHeader is the complete serialized CAR header: a one-byte varint
	// length prefix followed by a 25-byte DAG-CBOR map holding a single
	// nul-identity root CID and the version number 1.
	nulRootCarHeader = "\x19" + // 25 bytes of CBOR follow (length encoded as varint)
		"\xA2" + // map with 2 keys
		"\x65roots" + // text-key with length 5: "roots"
		"\x81" + // 1 element array
		"\xD8\x2A" + // tag 42
		"\x45" + // bytes with length 5
		"\x00\x01\x55\x00\x00" + // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links
		"\x67version" + // text-key with length 7: "version"
		"\x01" // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576
)
// CBOR_SHA256_DUMMY_CID is a placeholder root CID. It is written into the header
// of every newly created CAR file and is swapped for the real payload CID with
// carv2.ReplaceRootsInFile once that file has been closed.
// NOTE(review): presumably a full-size CID is used (rather than a short identity
// CID) so the header keeps the same length when the root is replaced — confirm.
var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau")

// maxLinks is the threshold on the number of block links collected for the
// current subset; once it is exceeded a new CAR file is started.
// NOTE(review): 432000 is presumably the number of slots per epoch — confirm.
const maxLinks = 432000 / 18 // 18 subsets

type subsetInfo struct {
fileName string
Expand Down Expand Up @@ -171,13 +157,19 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

carFiles = append(carFiles, carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize})
cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
carFiles = append(carFiles, cf)

err = closeFile(bufferedWriter, currentFile)
if err != nil {
return fmt.Errorf("failed to close file: %w", err)
}

err = carv2.ReplaceRootsInFile(cf.name, []cid.Cid{cf.payloadCid})
if err != nil {
return fmt.Errorf("failed to replace root: %w", err)
}

cp.Reset()
}

Expand All @@ -192,13 +184,20 @@ func newCmd_SplitCar() *cli.Command {
writer = io.MultiWriter(bufferedWriter, cp)

// Write the header
_, err = io.WriteString(writer, nulRootCarHeader)
if err != nil {
hdr := car.CarHeader{
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
if err := car.WriteHeader(&hdr, writer); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

hs, err := car.HeaderSize(&hdr)
if err != nil {
return fmt.Errorf("failed to get header size: %w", err)
}
// Set the currentFileSize to the size of the header
currentFileSize = int64(len(nulRootCarHeader))
currentFileSize = int64(hs)
currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1}
return nil
}
Expand Down Expand Up @@ -243,7 +242,7 @@ func newCmd_SplitCar() *cli.Command {
dagSize += owm.RawSectionSize()
}

if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize {
if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks {
err := createNewFile()
if err != nil {
return fmt.Errorf("failed to create a new file: %w", err)
Expand Down Expand Up @@ -316,7 +315,18 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

carFiles = append(carFiles, carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize})
cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
carFiles = append(carFiles, cf)

err = closeFile(bufferedWriter, currentFile)
if err != nil {
return fmt.Errorf("failed to close file: %w", err)
}

err = carv2.ReplaceRootsInFile(cf.name, []cid.Cid{cf.payloadCid})
if err != nil {
return fmt.Errorf("failed to replace root: %w", err)
}

f, err := os.Create(meta)
defer f.Close()
Expand All @@ -340,7 +350,8 @@ func newCmd_SplitCar() *cli.Command {
})
}

return closeFile(bufferedWriter, currentFile)
return nil

},
}
}
Expand Down
21 changes: 20 additions & 1 deletion cmd-merge-cars.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,26 @@ import (
"github.com/urfave/cli/v2"
)

const varintSize = 10
const (
	// varintSize is a fixed byte count used for varints.
	// NOTE(review): presumably the maximum encoded length of a uint64 varint
	// (cf. encoding/binary.MaxVarintLen64) — confirm against its callers.
	varintSize = 10

	// nulRootCarHeader is the complete serialized CAR header: a one-byte varint
	// length prefix followed by a 25-byte DAG-CBOR map holding a single
	// nul-identity root CID and the version number 1.
	nulRootCarHeader = "\x19" + // 25 bytes of CBOR follow (length encoded as varint)
		"\xA2" + // map with 2 keys
		"\x65roots" + // text-key with length 5: "roots"
		"\x81" + // 1 element array
		"\xD8\x2A" + // tag 42
		"\x45" + // bytes with length 5
		"\x00\x01\x55\x00\x00" + // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links
		"\x67version" + // text-key with length 7: "version"
		"\x01" // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576
)

func newCmd_MergeCars() *cli.Command {
var outputFile string
Expand Down
11 changes: 6 additions & 5 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/allegro/bigcache/v3"
"github.com/fsnotify/fsnotify"
hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache"
"github.com/rpcpool/yellowstone-faithful/metrics"
splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
"github.com/ryanuber/go-glob"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -202,13 +203,13 @@ func newCmd_rpc() *cli.Command {
return nil
}()
if err != nil {
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0)
klog.Error(err)
numFailed.Add(1)
// NOTE: DO NOT return the error here, as we want to continue loading other epochs
return nil
}
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1)
numSucceeded.Add(1)
return nil
})
Expand Down Expand Up @@ -275,7 +276,7 @@ func newCmd_rpc() *cli.Command {
return
}
klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
}
case fsnotify.Create:
{
Expand All @@ -298,7 +299,7 @@ func newCmd_rpc() *cli.Command {
return
}
klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1)
}
case fsnotify.Remove:
{
Expand All @@ -310,7 +311,7 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
}
klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt))
metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0)
metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0)
}
case fsnotify.Rename:
klog.V(3).Infof("File %q was renamed; do nothing", event.Name)
Expand Down
36 changes: 31 additions & 5 deletions http-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"io"
"path/filepath"
"strings"
"time"

"github.com/rpcpool/yellowstone-faithful/metrics"
"k8s.io/klog/v2"
)

Expand All @@ -30,6 +32,30 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
startedAt := time.Now()
defer func() {
took := time.Since(startedAt)
var isIndex, isCar bool

// Always report metrics, regardless of the log verbosity level.
// if has suffix .index, then it's an index file
if strings.HasSuffix(r.name, ".index") {
isIndex = true
// get the index name, which is the part before the .index suffix, after the last .
indexName := strings.TrimSuffix(r.name, ".index")
// split the index name by . and get the last part
byDot := strings.Split(indexName, ".")
if len(byDot) > 0 {
indexName = byDot[len(byDot)-1]
}
// TODO: distinguish between remote and local index reads
metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds()))
}
// if has suffix .car, then it's a car file
if strings.HasSuffix(r.name, ".car") || r.isSplitCar {
isCar = true
carName := filepath.Base(r.name)
// TODO: distinguish between remote and local car reads
metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds()))
}

if klog.V(5).Enabled() {
var icon string
if r.isRemote {
Expand All @@ -39,19 +65,19 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
// add disk icon
icon = "💾 "
}

prefix := icon + "[READ-UNKNOWN]"
// if has suffix .index, then it's an index file
if strings.HasSuffix(r.name, ".index") {
if isIndex {
prefix = icon + azureBG("[READ-INDEX]")
}
// if has suffix .car, then it's a car file
if strings.HasSuffix(r.name, ".car") || r.isSplitCar {
} else if isCar {

if r.isSplitCar {
prefix = icon + azureBG("[READ-SPLIT-CAR]")
} else {
prefix = icon + purpleBG("[READ-CAR]")
}
}

klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took)
}
}()
Expand Down
Loading
Loading