From 85701ebd0362e71b424af5a177282637ac42a820 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 8 Aug 2024 08:34:41 +0100 Subject: [PATCH 1/8] add max links --- cmd-car-split.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index c2833412..059372b8 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -48,6 +48,8 @@ const ( "\x67" + "version" + // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 "\x01" + + maxLinks = 20 ) type subsetInfo struct { @@ -243,7 +245,7 @@ func newCmd_SplitCar() *cli.Command { dagSize += owm.RawSectionSize() } - if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { + if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks { err := createNewFile() if err != nil { return fmt.Errorf("failed to create a new file: %w", err) From ded9b24a6d92a286f18bda84cdf7ca7c9bb5fa31 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 8 Aug 2024 10:39:17 +0100 Subject: [PATCH 2/8] max links --- cmd-car-split.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 059372b8..cb3c4405 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -49,7 +49,7 @@ const ( // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 "\x01" - maxLinks = 20 + maxLinks = 432000 / 18 // 18 subsets ) type subsetInfo struct { From 41089cf523dfc38c6308e6b43a3f4b3006f2de97 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Wed, 14 Aug 2024 23:21:50 +0200 Subject: [PATCH 3/8] Update metrics (#147) * Remove /health and /metrics req logging; closes #127 * Move metrics to metrics package * Prometheus for index and car lookups; closes #126 * Cleanup metrics; closes #128 --- cmd-rpc.go | 11 ++-- http-range.go | 14 +++++ metrics.go | 147 --------------------------------------------- metrics/metrics.go | 90 +++++++++++++++++++++++++++ multiepoch.go | 85 ++++++++++++++++++++++---- 5 files changed, 184 insertions(+), 163 deletions(-) delete mode 100644 metrics.go create mode 100644 metrics/metrics.go diff --git a/cmd-rpc.go b/cmd-rpc.go index 56488562..36fa8129 100644 --- a/cmd-rpc.go +++ b/cmd-rpc.go @@ -15,6 +15,7 @@ import ( "github.com/allegro/bigcache/v3" "github.com/fsnotify/fsnotify" hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache" + "github.com/rpcpool/yellowstone-faithful/metrics" splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" "github.com/ryanuber/go-glob" "github.com/urfave/cli/v2" @@ -202,13 +203,13 @@ func newCmd_rpc() *cli.Command { return nil }() if err != nil { - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(0) klog.Error(err) numFailed.Add(1) // NOTE: DO NOT return the error here, as we want to continue loading other epochs return nil } - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epochNum)).Set(1) numSucceeded.Add(1) return nil }) @@ -275,7 +276,7 @@ func newCmd_rpc() *cli.Command { return } klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) } case fsnotify.Create: { @@ -298,7 +299,7 @@ func newCmd_rpc() *cli.Command { return } klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epoch.Epoch())).Set(1) } case fsnotify.Remove: { @@ -310,7 +311,7 @@ func newCmd_rpc() *cli.Command { klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) } klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) - metrics_epochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0) + metrics.EpochsAvailable.WithLabelValues(fmt.Sprintf("%d", epNumber)).Set(0) } case fsnotify.Rename: klog.V(3).Infof("File %q was renamed; do nothing", event.Name) diff --git a/http-range.go b/http-range.go index 2b948b8d..8eebcf2b 100644 --- a/http-range.go +++ b/http-range.go @@ -2,9 +2,11 @@ package main import ( "io" + "path/filepath" "strings" "time" + "github.com/rpcpool/yellowstone-faithful/metrics" "k8s.io/klog/v2" ) @@ -43,6 +45,15 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { // if has suffix .index, then it's an index file if strings.HasSuffix(r.name, ".index") { prefix = icon + azureBG("[READ-INDEX]") + // get the index name, which is the part before the .index suffix, after the last . + indexName := strings.TrimSuffix(r.name, ".index") + // split the index name by . and get the last part + byDot := strings.Split(indexName, ".") + if len(byDot) > 0 { + indexName = byDot[len(byDot)-1] + } + // TODO: distinguish between remote and local index reads + metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds())) } // if has suffix .car, then it's a car file if strings.HasSuffix(r.name, ".car") || r.isSplitCar { @@ -51,6 +62,9 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { } else { prefix = icon + purpleBG("[READ-CAR]") } + carName := filepath.Base(r.name) + // TODO: distinguish between remote and local index reads + metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds())) } klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took) } diff --git a/metrics.go b/metrics.go deleted file mode 100644 index 2194023c..00000000 --- a/metrics.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "runtime/debug" - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -// - RPC requests by method (counter) -// - Epochs available epoch_available{epoch="200"} = 1 -// - status_code -// - miner ids -// - source type (ipfs/bitwarden/s3/etc) -// - response time histogram - -func init() { - prometheus.MustRegister(metrics_RpcRequestByMethod) - prometheus.MustRegister(metrics_epochsAvailable) - prometheus.MustRegister(metrics_statusCode) - prometheus.MustRegister(metrics_methodToCode) - prometheus.MustRegister(metrics_methodToSuccessOrFailure) - prometheus.MustRegister(metrics_methodToNumProxied) - prometheus.MustRegister(metrics_responseTimeHistogram) -} - -var metrics_RpcRequestByMethod = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "rpc_requests_by_method", - Help: "RPC requests by method", - }, - []string{"method"}, -) - -var metrics_epochsAvailable = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "epoch_available", - Help: "Epochs available", - }, - []string{"epoch"}, -) - -var metrics_statusCode = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "status_code", - Help: "Status code", - }, - []string{"code"}, -) - -var metrics_methodToCode = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_code", - Help: "Method to code", - }, - []string{"method", "code"}, -) - -var metrics_methodToSuccessOrFailure = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_success_or_failure", - Help: "Method to success or failure", - }, - []string{"method", "status"}, -) - -var metrics_methodToNumProxied = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "method_to_num_proxied", - Help: "Method to num proxied", - }, - []string{"method"}, -) - -var metrics_responseTimeHistogram = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "response_time_histogram", - Help: "Response time histogram", - }, - []string{"method"}, -) - -// - Version information of this binary -var metrics_version = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "version", - Help: "Version information of this binary", - }, - []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, -) - -func init() { - // Add an entry to the metric with the version information. - labeledValues := map[string]string{ - "started_at": StartedAt.Format(time.RFC3339), - "tag": GitTag, - "commit": GitCommit, - "compiler": "", - "goarch": "", - "goos": "", - "goamd64": "", - "vcs": "", - "vcs_revision": "", - "vcs_time": "", - "vcs_modified": "", - } - if info, ok := debug.ReadBuildInfo(); ok { - for _, setting := range info.Settings { - if isAnyOf(setting.Key, - "-compiler", - "GOARCH", - "GOOS", - "GOAMD64", - "vcs", - "vcs.revision", - "vcs.time", - "vcs.modified", - ) { - switch setting.Key { - case "-compiler": - labeledValues["compiler"] = setting.Value - case "GOARCH": - labeledValues["goarch"] = setting.Value - case "GOOS": - labeledValues["goos"] = setting.Value - case "GOAMD64": - labeledValues["goamd64"] = setting.Value - case "vcs": - labeledValues["vcs"] = setting.Value - case "vcs.revision": - labeledValues["vcs_revision"] = setting.Value - case "vcs.time": - labeledValues["vcs_time"] = setting.Value - case "vcs.modified": - labeledValues["vcs_modified"] = setting.Value - } - } - } - } - metrics_version.With(labeledValues).Set(1) -} - -var StartedAt = time.Now() - -func GetUptime() time.Duration { - return time.Since(StartedAt) -} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..c0e6a293 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,90 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var RpcRequestByMethod = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "rpc_requests_by_method", + Help: "RPC requests by method", + }, + []string{"method"}, +) + +var EpochsAvailable = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "epoch_available", + Help: "Epochs available", + }, + []string{"epoch"}, +) + +var StatusCode = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "status_code", + Help: "Status code", + }, + []string{"code"}, +) + +var MethodToCode = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_code", + Help: "Method to code", + }, + []string{"method", "code"}, +) + +var MethodToSuccessOrFailure = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_success_or_failure", + Help: "Method to success or failure", + }, + []string{"method", "status"}, +) + +var MethodToNumProxied = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "method_to_num_proxied", + Help: "Method to num proxied", + }, + []string{"method"}, +) + +// - Version information of this binary +var Version = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "version", + Help: "Version information of this binary", + }, + []string{"started_at", "tag", "commit", "compiler", "goarch", "goos", "goamd64", "vcs", "vcs_revision", "vcs_time", "vcs_modified"}, +) + +var IndexLookupHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "index_lookup_latency_histogram", + Help: "Index lookup latency", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"index_type"}, +) + +var CarLookupHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "car_lookup_latency_histogram", + Help: "Car lookup latency", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"car"}, +) + +var RpcResponseLatencyHistogram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rpc_response_latency_histogram", + Help: "RPC response latency histogram", + Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10), + }, + []string{"rpc_method"}, +) diff --git a/multiepoch.go b/multiepoch.go index 1a3a27a7..344a92da 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "runtime/debug" "sort" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-reuseport" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/rpcpool/yellowstone-faithful/metrics" old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc" "github.com/sourcegraph/jsonrpc2" "github.com/valyala/fasthttp" @@ -276,21 +278,25 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } klog.Infof("Will proxy unhandled RPC methods to %q", addr) } + metricsHandler := fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) return func(reqCtx *fasthttp.RequestCtx) { startedAt := time.Now() reqID := randomRequestID() var method string = "" defer func() { - klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), time.Since(startedAt)) - metrics_statusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() - metrics_responseTimeHistogram.WithLabelValues(sanitizeMethod(method)).Observe(time.Since(startedAt).Seconds()) + if method == "/metrics" || method == "/health" { + return + } + took := time.Since(startedAt) + klog.V(2).Infof("[%s] request %q took %s", reqID, sanitizeMethod(method), took) + metrics.StatusCode.WithLabelValues(fmt.Sprint(reqCtx.Response.StatusCode())).Inc() + metrics.RpcResponseLatencyHistogram.WithLabelValues(sanitizeMethod(method)).Observe(took.Seconds()) }() { // handle the /metrics endpoint if string(reqCtx.Path()) == "/metrics" { method = "/metrics" - handler := fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()) - handler(reqCtx) + metricsHandler(reqCtx) return } { @@ -350,9 +356,9 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx return } method = rpcRequest.Method - metrics_RpcRequestByMethod.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.RpcRequestByMethod.WithLabelValues(sanitizeMethod(method)).Inc() defer func() { - metrics_methodToCode.WithLabelValues(sanitizeMethod(method), fmt.Sprint(reqCtx.Response.StatusCode())).Inc() + metrics.MethodToCode.WithLabelValues(sanitizeMethod(method), fmt.Sprint(reqCtx.Response.StatusCode())).Inc() }() klog.V(2).Infof("[%s] method=%q", reqID, sanitizeMethod(method)) @@ -370,7 +376,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx body, reqID, ) - metrics_methodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.MethodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() return } @@ -403,7 +409,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx klog.Errorf("[%s] failed to handle %q: %v", reqID, sanitizeMethod(method), err) } if errorResp != nil { - metrics_methodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "failure").Inc() + metrics.MethodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "failure").Inc() if proxy != nil && lsConf.ProxyConfig.ProxyFailedRequests { klog.Warningf("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) // proxy the request to the target @@ -416,7 +422,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx body, reqID, ) - metrics_methodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() + metrics.MethodToNumProxied.WithLabelValues(sanitizeMethod(method)).Inc() return } else { if errors.Is(err, ErrNotFound) { @@ -436,7 +442,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } return } - metrics_methodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "success").Inc() + metrics.MethodToSuccessOrFailure.WithLabelValues(sanitizeMethod(method), "success").Inc() } } @@ -539,3 +545,60 @@ func (ser *MultiEpoch) handleRequest(ctx context.Context, conn *requestContext, }, fmt.Errorf("method not found") } } + +func init() { + // Add an entry to the metric with the version information. + labeledValues := map[string]string{ + "started_at": StartedAt.Format(time.RFC3339), + "tag": GitTag, + "commit": GitCommit, + "compiler": "", + "goarch": "", + "goos": "", + "goamd64": "", + "vcs": "", + "vcs_revision": "", + "vcs_time": "", + "vcs_modified": "", + } + if info, ok := debug.ReadBuildInfo(); ok { + for _, setting := range info.Settings { + if isAnyOf(setting.Key, + "-compiler", + "GOARCH", + "GOOS", + "GOAMD64", + "vcs", + "vcs.revision", + "vcs.time", + "vcs.modified", + ) { + switch setting.Key { + case "-compiler": + labeledValues["compiler"] = setting.Value + case "GOARCH": + labeledValues["goarch"] = setting.Value + case "GOOS": + labeledValues["goos"] = setting.Value + case "GOAMD64": + labeledValues["goamd64"] = setting.Value + case "vcs": + labeledValues["vcs"] = setting.Value + case "vcs.revision": + labeledValues["vcs_revision"] = setting.Value + case "vcs.time": + labeledValues["vcs_time"] = setting.Value + case "vcs.modified": + labeledValues["vcs_modified"] = setting.Value + } + } + } + } + metrics.Version.With(labeledValues).Set(1) +} + +var StartedAt = time.Now() + +func GetUptime() time.Duration { + return time.Since(StartedAt) +} From ab34b149b73a124f1ea7927204d46c5d9ede8252 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 15 Aug 2024 16:19:46 +0200 Subject: [PATCH 4/8] Fix metrics to not tie them to loglevel (#149) * Remove /health and /metrics req logging; closes #127 * Move metrics to metrics package * Prometheus for index and car lookups; closes #126 * Cleanup metrics; closes #128 * Report latency histograms regardless of log level Previously the latency histograms were tied to verbosity level which meant that they were not published unless using v=5. This makes the metrics always visible. --- http-range.go | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/http-range.go b/http-range.go index 8eebcf2b..4033455e 100644 --- a/http-range.go +++ b/http-range.go @@ -32,6 +32,30 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { startedAt := time.Now() defer func() { took := time.Since(startedAt) + var isIndex, isCar bool + + // Metrics want to always report + // if has suffix .index, then it's an index file + if strings.HasSuffix(r.name, ".index") { + isIndex = true + // get the index name, which is the part before the .index suffix, after the last . + indexName := strings.TrimSuffix(r.name, ".index") + // split the index name by . and get the last part + byDot := strings.Split(indexName, ".") + if len(byDot) > 0 { + indexName = byDot[len(byDot)-1] + } + // TODO: distinguish between remote and local index reads + metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds())) + } + // if has suffix .car, then it's a car file + if strings.HasSuffix(r.name, ".car") || r.isSplitCar { + isCar = true + carName := filepath.Base(r.name) + // TODO: distinguish between remote and local index reads + metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds())) + } + if klog.V(5).Enabled() { var icon string if r.isRemote { @@ -41,31 +65,19 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { // add disk icon icon = "💾 " } + prefix := icon + "[READ-UNKNOWN]" - // if has suffix .index, then it's an index file - if strings.HasSuffix(r.name, ".index") { + if isIndex { prefix = icon + azureBG("[READ-INDEX]") - // get the index name, which is the part before the .index suffix, after the last . - indexName := strings.TrimSuffix(r.name, ".index") - // split the index name by . and get the last part - byDot := strings.Split(indexName, ".") - if len(byDot) > 0 { - indexName = byDot[len(byDot)-1] - } - // TODO: distinguish between remote and local index reads - metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds())) - } - // if has suffix .car, then it's a car file - if strings.HasSuffix(r.name, ".car") || r.isSplitCar { + } else if isCar { + if r.isSplitCar { prefix = icon + azureBG("[READ-SPLIT-CAR]") } else { prefix = icon + purpleBG("[READ-CAR]") } - carName := filepath.Base(r.name) - // TODO: distinguish between remote and local index reads - metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds())) } + klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took) } }() From 513161db2d0837d7842e56b9d5bb6689060de052 Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Fri, 16 Aug 2024 15:39:03 +0100 Subject: [PATCH 5/8] set root in header (#148) * set root in header * finish --- cmd-car-split.go | 53 ++++++++++++++++++++++++++--------------------- cmd-merge-cars.go | 21 ++++++++++++++++++- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index c2833412..30c16cf8 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -16,6 +16,8 @@ import ( commp "github.com/filecoin-project/go-fil-commp-hashhash" "github.com/filecoin-project/go-leb128" "github.com/ipfs/go-cid" + "github.com/ipld/go-car" + carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/fluent/qp" @@ -30,25 +32,7 @@ import ( "k8s.io/klog/v2" ) -const ( - nulRootCarHeader = "\x19" + // 25 bytes of CBOR (encoded as varint :cryingbear: ) - // map with 2 keys - "\xA2" + - // text-key with length 5 - "\x65" + "roots" + - // 1 element array - "\x81" + - // tag 42 - "\xD8\x2A" + - // bytes with length 5 - "\x45" + - // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links - "\x00\x01\x55\x00\x00" + - // text-key with length 7 - "\x67" + "version" + - // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 - "\x01" -) +var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau") type subsetInfo struct { fileName string @@ -171,13 +155,19 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to calculate commitment to cid: %w", err) } - carFiles = append(carFiles, carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}) + cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize} + carFiles = append(carFiles, cf) err = closeFile(bufferedWriter, currentFile) if err != nil { return fmt.Errorf("failed to close file: %w", err) } + err = carv2.ReplaceRootsInFile(cf.name, []cid.Cid{cf.payloadCid}) + if err != nil { + return fmt.Errorf("failed to replace root: %w", err) + } + cp.Reset() } @@ -192,8 +182,11 @@ func newCmd_SplitCar() *cli.Command { writer = io.MultiWriter(bufferedWriter, cp) // Write the header - _, err = io.WriteString(writer, nulRootCarHeader) - if err != nil { + hdr := car.CarHeader{ + Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder + Version: 1, + } + if err := car.WriteHeader(&hdr, writer); err != nil { return fmt.Errorf("failed to write header: %w", err) } @@ -316,7 +309,18 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to calculate commitment to cid: %w", err) } - carFiles = append(carFiles, carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}) + cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize} + carFiles = append(carFiles, cf) + + err = closeFile(bufferedWriter, currentFile) + if err != nil { + return fmt.Errorf("failed to close file: %w", err) + } + + err = carv2.ReplaceRootsInFile(cf.name, []cid.Cid{cf.payloadCid}) + if err != nil { + return fmt.Errorf("failed to replace root: %w", err) + } f, err := os.Create(meta) defer f.Close() @@ -340,7 +344,8 @@ func newCmd_SplitCar() *cli.Command { }) } - return closeFile(bufferedWriter, currentFile) + return nil + }, } } diff --git a/cmd-merge-cars.go b/cmd-merge-cars.go index 9abbf181..6cf3b2fc 100644 --- a/cmd-merge-cars.go +++ b/cmd-merge-cars.go @@ -10,7 +10,26 @@ import ( "github.com/urfave/cli/v2" ) -const varintSize = 10 +const ( + varintSize = 10 + nulRootCarHeader = "\x19" + // 25 bytes of CBOR (encoded as varint :cryingbear: ) + // map with 2 keys + "\xA2" + + // text-key with length 5 + "\x65" + "roots" + + // 1 element array + "\x81" + + // tag 42 + "\xD8\x2A" + + // bytes with length 5 + "\x45" + + // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links + "\x00\x01\x55\x00\x00" + + // text-key with length 7 + "\x67" + "version" + + // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 + "\x01" +) func newCmd_MergeCars() *cli.Command { var outputFile string From b38daaedc54f59382d5bbe53725f43e12836acd5 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 8 Aug 2024 08:34:41 +0100 Subject: [PATCH 6/8] add max links --- cmd-car-split.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 30c16cf8..4dae762b 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -190,8 +190,12 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to write header: %w", err) } + hs, err := car.HeaderSize(&hdr) + if err != nil { + return fmt.Errorf("failed to get header size: %w", err) + } // Set the currentFileSize to the size of the header - currentFileSize = int64(len(nulRootCarHeader)) + currentFileSize = int64(hs) currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1} return nil } @@ -236,7 +240,7 @@ func newCmd_SplitCar() *cli.Command { dagSize += owm.RawSectionSize() } - if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize { + if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks { err := createNewFile() if err != nil { return fmt.Errorf("failed to create a new file: %w", err) From 9de7a45dd1ed441e8e8d10f197f9a2f2007c198f Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 8 Aug 2024 10:39:17 +0100 Subject: [PATCH 7/8] max links --- cmd-car-split.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd-car-split.go b/cmd-car-split.go index 4dae762b..f9c2c7d7 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -34,6 +34,8 @@ import ( var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau") +const maxLinks = 432000 / 18 // 18 subsets + type subsetInfo struct { fileName string firstSlot int From eb4461620d3237f8e6d729b8e5cdc92338f0681e Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 2 Sep 2024 15:56:59 +0100 Subject: [PATCH 8/8] merge --- cmd-car-split.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/cmd-car-split.go b/cmd-car-split.go index 3c203c7a..f9c2c7d7 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -32,33 +32,9 @@ import ( "k8s.io/klog/v2" ) -<<<<<<< HEAD var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau") const maxLinks = 432000 / 18 // 18 subsets -======= -const ( - nulRootCarHeader = "\x19" + // 25 bytes of CBOR (encoded as varint :cryingbear: ) - // map with 2 keys - "\xA2" + - // text-key with length 5 - "\x65" + "roots" + - // 1 element array - "\x81" + - // tag 42 - "\xD8\x2A" + - // bytes with length 5 - "\x45" + - // nul-identity-cid prefixed with \x00 as required in DAG-CBOR: https://github.com/ipld/specs/blob/master/block-layer/codecs/dag-cbor.md#links - "\x00\x01\x55\x00\x00" + - // text-key with length 7 - "\x67" + "version" + - // 1, we call this v0 due to the nul-identity CID being an open question: https://github.com/ipld/go-car/issues/26#issuecomment-604299576 - "\x01" - - maxLinks = 432000 / 18 // 18 subsets -) ->>>>>>> ded9b24a6d92a286f18bda84cdf7ca7c9bb5fa31 type subsetInfo struct { fileName string