From 0d2d80a095f89e507a067b158e6fd50e4f44a642 Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Tue, 26 Mar 2024 13:02:23 +0100 Subject: [PATCH] cleanups and fix example --- examples/gateway/graph/main.go | 4 +- gateway/backend.go | 45 +++++ gateway/backend_blocks.go | 50 +---- gateway/backend_graph.go | 133 +++++------- gateway/backend_graph_test.go | 302 ++++++++-------------------- gateway/backend_graph_traversal.go | 16 +- gateway/backend_graph_utils.go | 5 +- gateway/backend_graph_utils_test.go | 4 +- gateway/errors.go | 26 +++ gateway/remote_blocks_backend.go | 2 +- 10 files changed, 215 insertions(+), 372 deletions(-) diff --git a/examples/gateway/graph/main.go b/examples/gateway/graph/main.go index 9c2c04c3b..ec7a3582b 100644 --- a/examples/gateway/graph/main.go +++ b/examples/gateway/graph/main.go @@ -27,13 +27,13 @@ func main() { } defer (func() { _ = tp.Shutdown(ctx) })() - carFetcher, err := gateway.NewRemoteCarFetcher([]string{*gatewayUrlPtr}, nil) + carFetcher, err := gateway.NewRemoteCarFetcher([]string{*gatewayUrlPtr}) if err != nil { log.Fatal(err) } // Creates the gateway with the remote graph backend. - backend, err := gateway.NewGraphGatewayBackend(carFetcher) + backend, err := gateway.NewGraphBackend(carFetcher) if err != nil { log.Fatal(err) } diff --git a/gateway/backend.go b/gateway/backend.go index 280791c0d..c4db09798 100644 --- a/gateway/backend.go +++ b/gateway/backend.go @@ -10,11 +10,56 @@ import ( "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/namesys" "github.com/ipfs/boxo/path" + "github.com/ipfs/boxo/path/resolver" "github.com/ipfs/go-cid" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/routing" + "github.com/prometheus/client_golang/prometheus" ) +type backendOptions struct { + ns namesys.NameSystem + vs routing.ValueStore + r resolver.Resolver + promRegistry prometheus.Registerer +} + +// WithNameSystem sets the name system to use with the different backends. If not set +// it will use the default DNSLink resolver generated by [NewDNSResolver] along +// with any configured [routing.ValueStore]. +func WithNameSystem(ns namesys.NameSystem) BackendOption { + return func(opts *backendOptions) error { + opts.ns = ns + return nil + } +} + +// WithValueStore sets the [routing.ValueStore] to use with the different backends. +func WithValueStore(vs routing.ValueStore) BackendOption { + return func(opts *backendOptions) error { + opts.vs = vs + return nil + } +} + +// WithResolver sets the [resolver.Resolver] to use with the different backends. +func WithResolver(r resolver.Resolver) BackendOption { + return func(opts *backendOptions) error { + opts.r = r + return nil + } +} + +// WithPrometheusRegistry sets the registry to use with [GraphBackend]. +func WithPrometheusRegistry(reg prometheus.Registerer) BackendOption { + return func(opts *backendOptions) error { + opts.promRegistry = reg + return nil + } +} + +type BackendOption func(options *backendOptions) error + // baseBackend contains some common backend functionalities that are shared by // different backend implementations. type baseBackend struct { diff --git a/gateway/backend_blocks.go b/gateway/backend_blocks.go index 94a2551b9..8eafe06af 100644 --- a/gateway/backend_blocks.go +++ b/gateway/backend_blocks.go @@ -16,7 +16,6 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" ufile "github.com/ipfs/boxo/ipld/unixfs/file" uio "github.com/ipfs/boxo/ipld/unixfs/io" - "github.com/ipfs/boxo/namesys" "github.com/ipfs/boxo/path" "github.com/ipfs/boxo/path/resolver" blocks "github.com/ipfs/go-block-format" @@ -35,9 +34,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" - "github.com/libp2p/go-libp2p/core/routing" mc "github.com/multiformats/go-multicodec" - "github.com/prometheus/client_golang/prometheus" // Ensure basic codecs are registered. _ "github.com/ipld/go-ipld-prime/codec/cbor" @@ -57,51 +54,8 @@ type BlocksBackend struct { var _ IPFSBackend = (*BlocksBackend)(nil) -type blocksBackendOptions struct { - ns namesys.NameSystem - vs routing.ValueStore - r resolver.Resolver - promRegistry prometheus.Registerer -} - -// WithNameSystem sets the name system to use with the [BlocksBackend]. If not set -// it will use the default DNSLink resolver generated by [NewDNSResolver] along -// with any configured [routing.ValueStore]. -func WithNameSystem(ns namesys.NameSystem) BlocksBackendOption { - return func(opts *blocksBackendOptions) error { - opts.ns = ns - return nil - } -} - -// WithValueStore sets the [routing.ValueStore] to use with the [BlocksBackend]. -func WithValueStore(vs routing.ValueStore) BlocksBackendOption { - return func(opts *blocksBackendOptions) error { - opts.vs = vs - return nil - } -} - -// WithResolver sets the [resolver.Resolver] to use with the [BlocksBackend]. -func WithResolver(r resolver.Resolver) BlocksBackendOption { - return func(opts *blocksBackendOptions) error { - opts.r = r - return nil - } -} - -// WithPrometheusRegistry sets the registry to use for metrics collection. -func WithPrometheusRegistry(reg prometheus.Registerer) BlocksBackendOption { - return func(opts *blocksBackendOptions) error { - opts.promRegistry = reg - return nil - } -} - -type BlocksBackendOption func(options *blocksBackendOptions) error - -func NewBlocksBackend(blockService blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) { - var compiledOptions blocksBackendOptions +func NewBlocksBackend(blockService blockservice.BlockService, opts ...BackendOption) (*BlocksBackend, error) { + var compiledOptions backendOptions for _, o := range opts { if err := o(&compiledOptions); err != nil { return nil, err diff --git a/gateway/backend_graph.go b/gateway/backend_graph.go index b230bb20a..30da4b52e 100644 --- a/gateway/backend_graph.go +++ b/gateway/backend_graph.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/ipld/unixfs" "github.com/ipfs/boxo/path" - ipfspath "github.com/ipfs/boxo/path" "github.com/ipfs/boxo/path/resolver" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -39,45 +38,24 @@ import ( const GetBlockTimeout = time.Second * 60 -// type DataCallback = func(resource string, reader io.Reader) error -// TODO: Don't use a caboose type, perhaps ask them to use a type alias instead of a type -// type DataCallback = caboose.DataCallback type DataCallback func(resource string, reader io.Reader) error -// TODO: Don't use a caboose type -// type ErrPartialResponse = caboose.ErrPartialResponse - -// ErrPartialResponse can be returned from a DataCallback to indicate that some of the requested resource -// was successfully fetched, and that instead of retrying the full resource, that there are -// one or more more specific resources that should be fetched (via StillNeed) to complete the request. -type ErrPartialResponse struct { - error - StillNeed []string -} - -func (epr ErrPartialResponse) Error() string { - if epr.error != nil { - return fmt.Sprintf("partial response: %s", epr.error.Error()) - } - return "caboose received a partial response" -} - var ErrFetcherUnexpectedEOF = fmt.Errorf("failed to fetch IPLD data") type CarFetcher interface { Fetch(ctx context.Context, path string, cb DataCallback) error } -type GraphGateway struct { +type GraphBackend struct { baseBackend fetcher CarFetcher pc traversal.LinkTargetNodePrototypeChooser - metrics *GraphGatewayMetrics + metrics *GraphBackendMetrics } -type GraphGatewayMetrics struct { +type GraphBackendMetrics struct { contextAlreadyCancelledMetric prometheus.Counter carFetchAttemptMetric prometheus.Counter carBlocksFetchedMetric prometheus.Counter @@ -87,8 +65,8 @@ type GraphGatewayMetrics struct { bytesRangeSizeMetric prometheus.Histogram } -func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGateway, error) { - var compiledOptions blocksBackendOptions +func NewGraphBackend(f CarFetcher, opts ...BackendOption) (*GraphBackend, error) { + var compiledOptions backendOptions for _, o := range opts { if err := o(&compiledOptions); err != nil { return nil, err @@ -107,10 +85,10 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGa promReg = compiledOptions.promRegistry } - return &GraphGateway{ + return &GraphBackend{ baseBackend: baseBackend, fetcher: f, - metrics: registerGraphGatewayMetrics(promReg), + metrics: registerGraphBackendMetrics(promReg), pc: dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { return tlnkNd.LinkTargetNodePrototype(), nil @@ -120,7 +98,7 @@ func NewGraphGatewayBackend(f CarFetcher, opts ...BlocksBackendOption) (*GraphGa }, nil } -func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGatewayMetrics { +func registerGraphBackendMetrics(promReg prometheus.Registerer) *GraphBackendMetrics { // How many CAR Fetch attempts we had? Need this to calculate % of various graph request types. // We only count attempts here, because success/failure with/without retries are provided by caboose: // - ipfs_caboose_fetch_duration_car_success_count @@ -133,7 +111,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Name: "car_fetch_attempts", Help: "The number of times a CAR fetch was attempted by IPFSBackend.", }) - registerer.MustRegister(carFetchAttemptMetric) + promReg.MustRegister(carFetchAttemptMetric) contextAlreadyCancelledMetric := prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "ipfs", @@ -141,7 +119,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Name: "car_fetch_context_already_cancelled", Help: "The number of times context is already cancelled when a CAR fetch was attempted by IPFSBackend.", }) - registerer.MustRegister(contextAlreadyCancelledMetric) + promReg.MustRegister(contextAlreadyCancelledMetric) // How many blocks were read via CARs? // Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts. @@ -151,7 +129,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Name: "car_blocks_fetched", Help: "The number of blocks successfully read via CAR fetch.", }) - registerer.MustRegister(carBlocksFetchedMetric) + promReg.MustRegister(carBlocksFetchedMetric) carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ipfs", @@ -159,7 +137,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Name: "car_fetch_params", Help: "How many times specific CAR parameter was used during CAR data fetch.", }, []string{"dagScope", "entityRanges"}) // we use 'ranges' instead of 'bytes' here because we only count the number of ranges present - registerer.MustRegister(carParamsMetric) + promReg.MustRegister(carParamsMetric) bytesRangeStartMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "ipfs", @@ -168,7 +146,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Help: "Tracks where did the range request start.", Buckets: prometheus.ExponentialBuckets(1024, 2, 24), // 1024 bytes to 8 GiB }) - registerer.MustRegister(bytesRangeStartMetric) + promReg.MustRegister(bytesRangeStartMetric) bytesRangeSizeMetric := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "ipfs", @@ -177,9 +155,9 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway Help: "Tracks the size of range requests.", Buckets: prometheus.ExponentialBuckets(256*1024, 2, 10), // From 256KiB to 100MiB }) - registerer.MustRegister(bytesRangeSizeMetric) + promReg.MustRegister(bytesRangeSizeMetric) - return &GraphGatewayMetrics{ + return &GraphBackendMetrics{ contextAlreadyCancelledMetric, carFetchAttemptMetric, carBlocksFetchedMetric, @@ -189,7 +167,7 @@ func registerGraphGatewayMetrics(registerer prometheus.Registerer) *GraphGateway } } -func (api *GraphGateway) fetchCAR(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error { +func (api *GraphBackend) fetchCAR(ctx context.Context, path path.ImmutablePath, params CarParams, cb DataCallback) error { urlWithoutHost := contentPathToCarUrl(path, params).String() api.metrics.carFetchAttemptMetric.Inc() @@ -203,15 +181,15 @@ func (api *GraphGateway) fetchCAR(ctx context.Context, path path.ImmutablePath, if ipldError != nil { fetchErr = ipldError } else if fetchErr != nil { - fetchErr = GatewayError(fetchErr) + fetchErr = blockstoreErrToGatewayErr(fetchErr) } return fetchErr } // resolvePathWithRootsAndBlock takes a path and linksystem and returns the set of non-terminal cids, the terminal cid, the remainder, and the block corresponding to the terminal cid -func resolvePathWithRootsAndBlock(ctx context.Context, fpath ipfspath.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { - pathRootCids, terminalCid, remainder, terminalBlk, err := resolvePathToLastWithRoots(ctx, fpath, unixFSLsys) +func resolvePathWithRootsAndBlock(ctx context.Context, p path.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { + pathRootCids, terminalCid, remainder, terminalBlk, err := resolvePathToLastWithRoots(ctx, p, unixFSLsys) if err != nil { return nil, cid.Undef, nil, nil, err } @@ -236,18 +214,17 @@ func resolvePathWithRootsAndBlock(ctx context.Context, fpath ipfspath.ImmutableP // the remainder pathing, the last block loaded, and the last node loaded. // // Note: the block returned will be nil if the terminal element is a link or the path is just a CID -func resolvePathToLastWithRoots(ctx context.Context, fpath ipfspath.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { - c, p := fpath.RootCid(), fpath.Segments()[2:] - - if len(p) == 0 { - return nil, c, nil, nil, nil +func resolvePathToLastWithRoots(ctx context.Context, p path.ImmutablePath, unixFSLsys *ipld.LinkSystem) ([]cid.Cid, cid.Cid, []string, blocks.Block, error) { + root, segments := p.RootCid(), p.Segments()[2:] + if len(segments) == 0 { + return nil, root, nil, nil, nil } unixFSLsys.NodeReifier = unixfsnode.Reify defer func() { unixFSLsys.NodeReifier = nil }() var cids []cid.Cid - cids = append(cids, c) + cids = append(cids, root) pc := dagpb.AddSupportToChooser(func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { @@ -274,13 +251,13 @@ func resolvePathToLastWithRoots(ctx context.Context, fpath ipfspath.ImmutablePat return blk, nd, nil } - nextBlk, nextNd, err := loadNode(ctx, c) + nextBlk, nextNd, err := loadNode(ctx, root) if err != nil { return nil, cid.Undef, nil, nil, err } depth := 0 - for i, elem := range p { + for i, elem := range segments { nextNd, err = nextNd.LookupBySegment(ipld.ParsePathSegment(elem)) if err != nil { return nil, cid.Undef, nil, nil, err @@ -297,7 +274,7 @@ func resolvePathToLastWithRoots(ctx context.Context, fpath ipfspath.ImmutablePat } cids = append(cids, cidLnk.Cid) - if i < len(p)-1 { + if i < len(segments)-1 { nextBlk, nextNd, err = loadNode(ctx, cidLnk.Cid) if err != nil { return nil, cid.Undef, nil, nil, err @@ -311,13 +288,13 @@ func resolvePathToLastWithRoots(ctx context.Context, fpath ipfspath.ImmutablePat // if last node is not a link, just return it's cid, add path to remainder and return if nextNd.Kind() != ipld.Kind_Link { // return the cid and the remainder of the path - return cids[:len(cids)-1], cids[len(cids)-1], p[len(p)-depth:], nextBlk, nil + return cids[:len(cids)-1], cids[len(cids)-1], segments[len(segments)-depth:], nextBlk, nil } return cids[:len(cids)-1], cids[len(cids)-1], nil, nil, nil } -func contentMetadataFromRootsAndRemainder(p ipfspath.ImmutablePath, pathRoots []cid.Cid, remainder []string) (ContentPathMetadata, error) { +func contentMetadataFromRootsAndRemainder(p path.ImmutablePath, pathRoots []cid.Cid, remainder []string) (ContentPathMetadata, error) { md := ContentPathMetadata{ PathSegmentRoots: pathRoots, LastSegmentRemainder: remainder, @@ -328,7 +305,7 @@ func contentMetadataFromRootsAndRemainder(p ipfspath.ImmutablePath, pathRoots [] var errNotUnixFS = fmt.Errorf("data was not unixfs") -func (api *GraphGateway) Get(ctx context.Context, path path.ImmutablePath, byteRanges ...ByteRange) (ContentPathMetadata, *GetResponse, error) { +func (api *GraphBackend) Get(ctx context.Context, path path.ImmutablePath, byteRanges ...ByteRange) (ContentPathMetadata, *GetResponse, error) { rangeCount := len(byteRanges) api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": strconv.Itoa(rangeCount)}).Inc() @@ -704,7 +681,7 @@ func (it *backpressuredHAMTDirIterNoRecursion) Err() error { var _ AwaitCloser = (*backpressuredHAMTDirIterNoRecursion)(nil) -func (api *GraphGateway) GetAll(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.Node, error) { +func (api *GraphBackend) GetAll(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.Node, error) { api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() return fetchWithPartialRetries(ctx, path, CarParams{Scope: DagScopeAll}, loadTerminalUnixFSElementWithRecursiveDirectories, api.metrics, api.fetchCAR) } @@ -723,7 +700,7 @@ type nextReq struct { params CarParams } -func fetchWithPartialRetries[T any](ctx context.Context, path path.ImmutablePath, initialParams CarParams, resolveTerminalElementFn loadTerminalElement[T], metrics *GraphGatewayMetrics, fetchCAR fetchCarFn) (ContentPathMetadata, T, error) { +func fetchWithPartialRetries[T any](ctx context.Context, p path.ImmutablePath, initialParams CarParams, resolveTerminalElementFn loadTerminalElement[T], metrics *GraphBackendMetrics, fetchCAR fetchCarFn) (ContentPathMetadata, T, error) { var zeroReturnType T terminalPathElementCh := make(chan terminalPathType[T], 1) @@ -752,11 +729,9 @@ func fetchWithPartialRetries[T any](ctx context.Context, path path.ImmutablePath } } - // FIXME(HACDIAS): p := ipfspath.FromString(path.String()) - p := path params := initialParams - err := fetchCAR(cctx, path, params, func(resource string, reader io.Reader) error { + err := fetchCAR(cctx, p, params, func(resource string, reader io.Reader) error { gb, err := carToLinearBlockGetter(cctx, reader, metrics) if err != nil { return err @@ -828,14 +803,12 @@ func fetchWithPartialRetries[T any](ctx context.Context, path path.ImmutablePath return closeErr case req := <-sendRequest: // set path and params for next iteration - p = ipfspath.FromCid(req.c) - // FIXME(hacdias) - imPath := p + p = path.FromCid(req.c) if err != nil { return err } params = req.params - remainderUrl := contentPathToCarUrl(imPath, params).String() + remainderUrl := contentPathToCarUrl(p, params).String() return ErrPartialResponse{StillNeed: []string{remainderUrl}} case <-cctx.Done(): return cctx.Err() @@ -875,15 +848,13 @@ func fetchWithPartialRetries[T any](ctx context.Context, path path.ImmutablePath } } -func (api *GraphGateway) GetBlock(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, files.File, error) { +func (api *GraphBackend) GetBlock(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, files.File, error) { api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() - // FIXME(HACDIAS): p := ipfspath.FromString(path.String()) - p := path var md ContentPathMetadata var f files.File // TODO: if path is `/ipfs/cid`, we should use ?format=raw - err := api.fetchCAR(ctx, path, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) if err != nil { return err @@ -924,21 +895,18 @@ func (api *GraphGateway) GetBlock(ctx context.Context, path path.ImmutablePath) return md, f, nil } -func (api *GraphGateway) Head(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, *HeadResponse, error) { +func (api *GraphBackend) Head(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, *HeadResponse, error) { api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": "1"}).Inc() // TODO: we probably want to move this either to boxo, or at least to loadRequestIntoSharedBlockstoreAndBlocksGateway api.metrics.bytesRangeStartMetric.Observe(0) api.metrics.bytesRangeSizeMetric.Observe(3071) - // FIXME(HACDIAS): p := ipfspath.FromString(path.String()) - p := path - var md ContentPathMetadata var n *HeadResponse // TODO: fallback to dynamic fetches in case we haven't requested enough data rangeTo := int64(3071) - err := api.fetchCAR(ctx, path, CarParams{Scope: DagScopeEntity, Range: &DagByteRange{From: 0, To: &rangeTo}}, func(resource string, reader io.Reader) error { + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeEntity, Range: &DagByteRange{From: 0, To: &rangeTo}}, func(resource string, reader io.Reader) error { gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) if err != nil { return err @@ -1069,11 +1037,11 @@ func (api *GraphGateway) Head(ctx context.Context, path path.ImmutablePath) (Con return md, n, nil } -func (api *GraphGateway) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { +func (api *GraphBackend) ResolvePath(ctx context.Context, p path.ImmutablePath) (ContentPathMetadata, error) { api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() var md ContentPathMetadata - err := api.fetchCAR(ctx, path, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { + err := api.fetchCAR(ctx, p, CarParams{Scope: DagScopeBlock}, func(resource string, reader io.Reader) error { gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) if err != nil { return err @@ -1081,8 +1049,6 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, path path.ImmutablePat lsys := getLinksystem(gb) // First resolve the path since we always need to. - // FIXME(HACDIAS): p := ipfspath.FromString(path.String()) - p := path pathRoots, _, remainder, _, err := resolvePathToLastWithRoots(ctx, p, lsys) if err != nil { return err @@ -1100,18 +1066,16 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, path path.ImmutablePat return md, nil } -func (api *GraphGateway) GetCAR(ctx context.Context, path path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) { +func (api *GraphBackend) GetCAR(ctx context.Context, p path.ImmutablePath, params CarParams) (ContentPathMetadata, io.ReadCloser, error) { numRanges := "0" if params.Range != nil { numRanges = "1" } api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": string(params.Scope), "entityRanges": numRanges}).Inc() - rootCid, err := getRootCid(path) + rootCid, err := getRootCid(p) if err != nil { return ContentPathMetadata{}, nil, err } - // FIXME(HACDIAS): p := ipfspath.FromString(path.String()) - p := path switch params.Order { case DagOrderUnspecified, DagOrderUnknown, DagOrderDFS: @@ -1124,7 +1088,7 @@ func (api *GraphGateway) GetCAR(ctx context.Context, path path.ImmutablePath, pa numBlocksSent := 0 var cw storage.WritableCar var blockBuffer []blocks.Block - err = api.fetchCAR(ctx, path, params, func(resource string, reader io.Reader) error { + err = api.fetchCAR(ctx, p, params, func(resource string, reader io.Reader) error { numBlocksThisCall := 0 gb, err := carToLinearBlockGetter(ctx, reader, api.metrics) if err != nil { @@ -1190,9 +1154,8 @@ func (api *GraphGateway) GetCAR(ctx context.Context, path path.ImmutablePath, pa }() return ContentPathMetadata{ - // PathSegmentRoots: []cid.Cid{rootCid}, - PathSegmentRoots: nil, // FIXME(hacdias): originala bove - LastSegment: ipfspath.FromCid(rootCid), + PathSegmentRoots: []cid.Cid{rootCid}, + LastSegment: path.FromCid(rootCid), ContentType: "", }, r, nil } @@ -1212,11 +1175,11 @@ func getRootCid(imPath path.ImmutablePath) (cid.Cid, error) { return rootCid, nil } -func (api *GraphGateway) IsCached(ctx context.Context, path path.Path) bool { +func (api *GraphBackend) IsCached(ctx context.Context, path path.Path) bool { return false } -var _ IPFSBackend = (*GraphGateway)(nil) +var _ IPFSBackend = (*GraphBackend)(nil) func checkRetryableError(e *error, fn func() error) error { err := fn() diff --git a/gateway/backend_graph_test.go b/gateway/backend_graph_test.go index 8d69c3ca0..2ed0f237b 100644 --- a/gateway/backend_graph_test.go +++ b/gateway/backend_graph_test.go @@ -29,7 +29,7 @@ import ( //go:embed testdata/directory-with-multilayer-hamt-and-multiblock-files.car var dirWithMultiblockHAMTandFiles []byte -func TestTar(t *testing.T) { +func TestGraphBackendTar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -155,32 +155,21 @@ func TestTar(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) p := path.FromCid(cid.MustParse("bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi")) _, nd, err := backend.GetAll(ctx, p) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assertNextEntryNameEquals := func(t *testing.T, dirIter files.DirIterator, expectedName string) { t.Helper() - if !dirIter.Next() { - iterErr := dirIter.Err() - t.Fatalf("expected entry, but errored with %s", iterErr.Error()) - } - if expectedName != dirIter.Name() { - t.Fatalf("expected %s, got %s", expectedName, dirIter.Name()) - } + require.True(t, dirIter.Next(), dirIter.Err()) + require.Equal(t, expectedName, dirIter.Name()) } robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) assertFileEqual := func(t *testing.T, expectedCidString string, receivedFile files.File) { @@ -188,25 +177,15 @@ func TestTar(t *testing.T) { expected := cid.MustParse(expectedCidString) receivedFileData, err := io.ReadAll(receivedFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) nd, err := dsrv.Get(ctx, expected) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) expectedFile, err := unixfile.NewUnixfsFile(ctx, dsrv, nd) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) expectedFileData, err := io.ReadAll(expectedFile.(files.File)) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(expectedFileData, receivedFileData) { - t.Fatalf("expected %s, got %s", string(expectedFileData), string(receivedFileData)) - } + require.NoError(t, err) + require.True(t, bytes.Equal(expectedFileData, receivedFileData)) } rootDirIter := nd.(files.Directory).Entries() @@ -234,12 +213,10 @@ func TestTar(t *testing.T) { assertNextEntryNameEquals(t, hamtDirIter, "exampleA") assertFileEqual(t, "bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", hamtDirIter.Node().(files.File)) - if rootDirIter.Next() || basicDirIter.Next() || hamtDirIter.Next() { - t.Fatal("expected directories to be fully enumerated") - } + require.False(t, rootDirIter.Next() || basicDirIter.Next() || hamtDirIter.Next()) } -func TestTarAtEndOfPath(t *testing.T) { +func TestGraphBackendTarAtEndOfPath(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -345,40 +322,26 @@ func TestTarAtEndOfPath(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) p, err := path.Join(path.FromCid(cid.MustParse("bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi")), "hamtDir") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) imPath, err := path.NewImmutablePath(p) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, nd, err := backend.GetAll(ctx, imPath) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) assertNextEntryNameEquals := func(t *testing.T, dirIter files.DirIterator, expectedName string) { t.Helper() - if !dirIter.Next() { - t.Fatal("expected entry") - } - if expectedName != dirIter.Name() { - t.Fatalf("expected %s, got %s", expectedName, dirIter.Name()) - } + require.True(t, dirIter.Next()) + require.Equal(t, expectedName, dirIter.Name()) } robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) assertFileEqual := func(t *testing.T, expectedCidString string, receivedFile files.File) { @@ -386,25 +349,15 @@ func TestTarAtEndOfPath(t *testing.T) { expected := cid.MustParse(expectedCidString) receivedFileData, err := io.ReadAll(receivedFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) nd, err := dsrv.Get(ctx, expected) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) expectedFile, err := unixfile.NewUnixfsFile(ctx, dsrv, nd) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) expectedFileData, err := io.ReadAll(expectedFile.(files.File)) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(expectedFileData, receivedFileData) { - t.Fatalf("expected %s, got %s", string(expectedFileData), string(receivedFileData)) - } + require.NoError(t, err) + require.True(t, bytes.Equal(expectedFileData, receivedFileData)) } hamtDirIter := nd.(files.Directory).Entries() @@ -421,9 +374,7 @@ func TestTarAtEndOfPath(t *testing.T) { assertNextEntryNameEquals(t, hamtDirIter, "exampleA") assertFileEqual(t, "bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", hamtDirIter.Node().(files.File)) - if hamtDirIter.Next() { - t.Fatal("expected directories to be fully enumerated") - } + require.False(t, hamtDirIter.Next()) } func sendBlocks(ctx context.Context, carFixture []byte, writer io.Writer, cidStrList []string) error { @@ -451,7 +402,7 @@ func sendBlocks(ctx context.Context, carFixture []byte, writer io.Writer, cidStr return nil } -func TestGetFile(t *testing.T) { +func TestGraphBackendGetFile(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -530,50 +481,33 @@ func TestGetFile(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) trustedGatewayServer := httptest.NewServer(NewHandler(Config{DeserializedResponses: true}, backend)) defer trustedGatewayServer.Close() resp, err := http.Get(trustedGatewayServer.URL + "/ipfs/bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi/hamtDir/exampleA") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) data, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) fileRootNd, err := dsrv.Get(ctx, cid.MustParse("bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) uio, err := unixfile.NewUnixfsFile(ctx, dsrv, fileRootNd) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) f := uio.(files.File) expectedFileData, err := io.ReadAll(f) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, expectedFileData) { - t.Fatalf("expected %s, got %s", string(expectedFileData), string(data)) - } + require.NoError(t, err) + require.True(t, bytes.Equal(data, expectedFileData)) } -func TestGetFileRangeRequest(t *testing.T) { +func TestGraphBackendGetFileRangeRequest(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -648,64 +582,41 @@ func TestGetFileRangeRequest(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) trustedGatewayServer := httptest.NewServer(NewHandler(Config{DeserializedResponses: true}, backend)) defer trustedGatewayServer.Close() req, err := http.NewRequestWithContext(ctx, "GET", trustedGatewayServer.URL+"/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa", nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) startIndex := 256 endIndex := 750 req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", startIndex, endIndex)) resp, err := http.DefaultClient.Do(req) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) data, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) fileRootNd, err := dsrv.Get(ctx, cid.MustParse("bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) uio, err := unixfile.NewUnixfsFile(ctx, dsrv, fileRootNd) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) f := uio.(files.File) - if _, err := f.Seek(int64(startIndex), io.SeekStart); err != nil { - t.Fatal(err) - } + _, err = f.Seek(int64(startIndex), io.SeekStart) + require.NoError(t, err) expectedFileData, err := io.ReadAll(io.LimitReader(f, int64(endIndex)-int64(startIndex)+1)) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, expectedFileData) { - t.Fatalf("expected %s, got %s", string(expectedFileData), string(data)) - } - - if requestNum != 4 { - t.Fatalf("expected exactly 4 requests, got %d", requestNum) - } + require.NoError(t, err) + require.True(t, bytes.Equal(data, expectedFileData)) + require.Equal(t, 4, requestNum) } -func TestGetFileWithBadBlockReturned(t *testing.T) { +func TestGraphBackendGetFileWithBadBlockReturned(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -778,50 +689,33 @@ func TestGetFileWithBadBlockReturned(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) trustedGatewayServer := httptest.NewServer(NewHandler(Config{DeserializedResponses: true}, backend)) defer trustedGatewayServer.Close() resp, err := http.Get(trustedGatewayServer.URL + "/ipfs/bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) data, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) robs, err := carbs.NewReadOnly(bytes.NewReader(dirWithMultiblockHAMTandFiles), nil) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) dsrv := merkledag.NewDAGService(blockservice.New(robs, offline.Exchange(robs))) fileRootNd, err := dsrv.Get(ctx, cid.MustParse("bafybeigcisqd7m5nf3qmuvjdbakl5bdnh4ocrmacaqkpuh77qjvggmt2sa")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) uio, err := unixfile.NewUnixfsFile(ctx, dsrv, fileRootNd) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) f := uio.(files.File) expectedFileData, err := io.ReadAll(f) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(data, expectedFileData) { - t.Fatalf("expected %s, got %s", string(expectedFileData), string(data)) - } + require.NoError(t, err) + require.True(t, bytes.Equal(data, expectedFileData)) } -func TestGetHAMTDirectory(t *testing.T) { +func TestGraphBackendGetHAMTDirectory(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -898,23 +792,17 @@ func TestGetHAMTDirectory(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) trustedGatewayServer := httptest.NewServer(NewHandler(Config{DeserializedResponses: true}, backend)) defer trustedGatewayServer.Close() resp, err := http.Get(trustedGatewayServer.URL + "/ipfs/bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi/hamtDir/") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) data, err := io.ReadAll(resp.Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if strings.Count(string(data), ">exampleD-hamt-collide-exampleB-seed-364<") == 1 && strings.Count(string(data), ">exampleC-hamt-collide-exampleA-seed-52<") == 1 && @@ -925,7 +813,7 @@ func TestGetHAMTDirectory(t *testing.T) { t.Fatal("directory does not contain the expected links") } -func TestGetCAR(t *testing.T) { +func TestGraphBackendGetCAR(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1010,28 +898,20 @@ func TestGetCAR(t *testing.T) { bs, err := NewRemoteCarFetcher([]string{s.URL}) require.NoError(t, err) - backend, err := NewGraphGatewayBackend(&retryFetcher{inner: bs.(CarFetcher), allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + backend, err := NewGraphBackend(&retryFetcher{inner: bs, allowedRetries: 3, retriesRemaining: 3}) + require.NoError(t, err) p := path.FromCid(cid.MustParse("bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi")) var carReader io.Reader _, carReader, err = backend.GetCAR(ctx, p, CarParams{Scope: DagScopeAll}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) carBytes, err := io.ReadAll(carReader) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) carReader = bytes.NewReader(carBytes) blkReader, err := carv2.NewBlockReader(carReader) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) responseCarBlock := []string{ "bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi", // root dir @@ -1057,22 +937,16 @@ func TestGetCAR(t *testing.T) { for i := 0; i < len(responseCarBlock); i++ { expectedCid := cid.MustParse(responseCarBlock[i]) blk, err := blkReader.Next() - if err != nil { - t.Fatal(err) - } - if !blk.Cid().Equals(expectedCid) { - t.Fatalf("expected cid %s, got %s", expectedCid, blk.Cid()) - } + require.NoError(t, err) + require.True(t, blk.Cid().Equals(expectedCid)) } _, err = blkReader.Next() - if !errors.Is(err, io.EOF) { - t.Fatal("expected an EOF") - } + require.ErrorIs(t, err, io.EOF) } -func TestPassthroughErrors(t *testing.T) { +func TestGraphBackendPassthroughErrors(t *testing.T) { t.Run("PathTraversalError", func(t *testing.T) { - pathTraversalTest := func(t *testing.T, traversal func(ctx context.Context, p path.ImmutablePath, backend *GraphGateway) error) { + pathTraversalTest := func(t *testing.T, traversal func(ctx context.Context, p path.ImmutablePath, backend *GraphBackend) error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1116,30 +990,24 @@ func TestPassthroughErrors(t *testing.T) { require.NoError(t, err) p, err := path.NewPath("/ipfs/bafybeid3fd2xxdcd3dbj7trb433h2aqssn6xovjbwnkargjv7fuog4xjdi/hamtDir/exampleA") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) imPath, err := path.NewImmutablePath(p) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) bogusErr := NewErrorStatusCode(fmt.Errorf("this is a test error"), 418) clientRequestNum := 0 - backend, err := NewGraphGatewayBackend(&retryFetcher{ + backend, err := NewGraphBackend(&retryFetcher{ inner: &fetcherWrapper{fn: func(ctx context.Context, path string, cb DataCallback) error { clientRequestNum++ if clientRequestNum > 2 { return bogusErr } - return bs.(CarFetcher).Fetch(ctx, path, cb) + return bs.Fetch(ctx, path, cb) }}, allowedRetries: 3, retriesRemaining: 3}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) err = traversal(ctx, imPath, backend) parsedErr := &ErrorStatusCode{} @@ -1151,13 +1019,13 @@ func TestPassthroughErrors(t *testing.T) { t.Fatal("error did not pass through") } t.Run("Block", func(t *testing.T) { - pathTraversalTest(t, func(ctx context.Context, p path.ImmutablePath, backend *GraphGateway) error { + pathTraversalTest(t, func(ctx context.Context, p path.ImmutablePath, backend *GraphBackend) error { _, _, err := backend.GetBlock(ctx, p) return err }) }) t.Run("File", func(t *testing.T) { - pathTraversalTest(t, func(ctx context.Context, p path.ImmutablePath, backend *GraphGateway) error { + pathTraversalTest(t, func(ctx context.Context, p path.ImmutablePath, backend *GraphBackend) error { _, _, err := backend.Get(ctx, p) return err }) diff --git a/gateway/backend_graph_traversal.go b/gateway/backend_graph_traversal.go index ce47d2a92..2219d6803 100644 --- a/gateway/backend_graph_traversal.go +++ b/gateway/backend_graph_traversal.go @@ -23,21 +23,9 @@ import ( type getBlock func(ctx context.Context, cid cid.Cid) (blocks.Block, error) -// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the -// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming -// data was returned. -type ErrInvalidResponse struct { - Message string -} - -func (e ErrInvalidResponse) Error() string { - return e.Message -} - -// var ErrNilBlock = caboose.ErrInvalidResponse{Message: "received a nil block with no error"} var ErrNilBlock = ErrInvalidResponse{Message: "received a nil block with no error"} -func carToLinearBlockGetter(ctx context.Context, reader io.Reader, metrics *GraphGatewayMetrics) (getBlock, error) { +func carToLinearBlockGetter(ctx context.Context, reader io.Reader, metrics *GraphBackendMetrics) (getBlock, error) { cr, err := car.NewCarReaderWithOptions(reader, car.WithErrorOnEmptyRoots(false)) if err != nil { return nil, err @@ -104,7 +92,7 @@ func carToLinearBlockGetter(ctx context.Context, reader io.Reader, metrics *Grap if !ok || errors.Is(blkRead.err, io.EOF) { return nil, io.ErrUnexpectedEOF } - return nil, GatewayError(blkRead.err) + return nil, blockstoreErrToGatewayErr(blkRead.err) } if blkRead.block != nil { metrics.carBlocksFetchedMetric.Inc() diff --git a/gateway/backend_graph_utils.go b/gateway/backend_graph_utils.go index 3f86e40e6..1df74761d 100644 --- a/gateway/backend_graph_utils.go +++ b/gateway/backend_graph_utils.go @@ -45,9 +45,9 @@ func carParamsToString(params CarParams) string { return paramsBuilder.String() } -// GatewayError translates underlying blockstore error into one that gateway code will return as HTTP 502 or 504 +// blockstoreErrToGatewayErr translates underlying blockstore error into one that gateway code will return as HTTP 502 or 504 // it also makes sure Retry-After hint from remote blockstore will be passed to HTTP client, if present. -func GatewayError(err error) error { +func blockstoreErrToGatewayErr(err error) error { if errors.Is(err, &ErrorStatusCode{}) || errors.Is(err, &ErrorRetryAfter{}) { // already correct error @@ -56,7 +56,6 @@ func GatewayError(err error) error { // All timeouts should produce 504 Gateway Timeout if errors.Is(err, context.DeadlineExceeded) || - // errors.Is(err, caboose.ErrTimeout) || // Unfortunately this is not an exported type so we have to check for the content. strings.Contains(err.Error(), "Client.Timeout exceeded") { return fmt.Errorf("%w: %s", ErrGatewayTimeout, err.Error()) diff --git a/gateway/backend_graph_utils_test.go b/gateway/backend_graph_utils_test.go index 5b0ec3886..3ff7cae3d 100644 --- a/gateway/backend_graph_utils_test.go +++ b/gateway/backend_graph_utils_test.go @@ -84,13 +84,13 @@ func TestGatewayErrorRetryAfter(t *testing.T) { ) // Test unwrapped - convertedErr = GatewayError(originalErr) + convertedErr = blockstoreErrToGatewayErr(originalErr) ok := errors.As(convertedErr, &gatewayErr) assert.True(t, ok) assert.EqualValues(t, originalErr.retryAfter, gatewayErr.RetryAfter) // Test wrapped. - convertedErr = GatewayError(fmt.Errorf("wrapped error: %w", originalErr)) + convertedErr = blockstoreErrToGatewayErr(fmt.Errorf("wrapped error: %w", originalErr)) ok = errors.As(convertedErr, &gatewayErr) assert.True(t, ok) assert.EqualValues(t, originalErr.retryAfter, gatewayErr.RetryAfter) diff --git a/gateway/errors.go b/gateway/errors.go index 79cedcee0..5c5b52fa7 100644 --- a/gateway/errors.go +++ b/gateway/errors.go @@ -127,6 +127,32 @@ func (e *ErrorStatusCode) Unwrap() error { return e.Err } +// ErrInvalidResponse can be returned from a [DataCallback] to indicate that the data provided for the +// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming +// data was returned. +type ErrInvalidResponse struct { + Message string +} + +func (e ErrInvalidResponse) Error() string { + return e.Message +} + +// ErrPartialResponse can be returned from a [DataCallback] to indicate that some of the requested resource +// was successfully fetched, and that instead of retrying the full resource, that there are +// one or more more specific resources that should be fetched (via StillNeed) to complete the request. +type ErrPartialResponse struct { + error + StillNeed []string +} + +func (epr ErrPartialResponse) Error() string { + if epr.error != nil { + return fmt.Sprintf("partial response: %s", epr.error.Error()) + } + return "caboose received a partial response" +} + func webError(w http.ResponseWriter, r *http.Request, c *Config, err error, defaultCode int) { code := defaultCode diff --git a/gateway/remote_blocks_backend.go b/gateway/remote_blocks_backend.go index 5b96385d8..e020e0b1e 100644 --- a/gateway/remote_blocks_backend.go +++ b/gateway/remote_blocks_backend.go @@ -20,7 +20,7 @@ const getBlockTimeout = time.Second * 60 // If you want to create a more custom [BlocksBackend] with only remote IPNS // Record resolution, or only remote block fetching, we recommend using // [NewBlocksBackend] directly. -func NewRemoteBlocksBackend(gatewayURL []string, opts ...BlocksBackendOption) (*BlocksBackend, error) { +func NewRemoteBlocksBackend(gatewayURL []string, opts ...BackendOption) (*BlocksBackend, error) { blockStore, err := NewRemoteBlockstore(gatewayURL) if err != nil { return nil, err