From 5cad9656634302b6c24da36b83e7e09403cea450 Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Mon, 1 Apr 2024 19:35:36 -0700 Subject: [PATCH] Port fixes from no-graphsync branch (#2583) - Port fixes from no-graphsync branch - Do not process ad chain if not done with previous for provider - Improved log messages - remove global vars - update libipni --- .github/workflows/ecr-publisher.yml | 1 + command/daemon.go | 4 + go.mod | 15 +- go.sum | 38 ++- internal/ingest/hamt_ingest_test.go | 8 +- internal/ingest/ingest.go | 349 ++++++++-------------- internal/ingest/ingest_test.go | 107 +++---- internal/ingest/invalid_mh_ingest_test.go | 8 +- internal/ingest/seg_sync_test.go | 8 +- internal/metrics/server.go | 6 - internal/registry/registry.go | 18 ++ 11 files changed, 238 insertions(+), 324 deletions(-) diff --git a/.github/workflows/ecr-publisher.yml b/.github/workflows/ecr-publisher.yml index fcf4bb316..6145f6b64 100644 --- a/.github/workflows/ecr-publisher.yml +++ b/.github/workflows/ecr-publisher.yml @@ -14,6 +14,7 @@ on: - 'doc/**' branches: - main + - port-fixes-from-no-gs jobs: publisher: diff --git a/command/daemon.go b/command/daemon.go index 35a6a6490..cc40c1e26 100644 --- a/command/daemon.go +++ b/command/daemon.go @@ -576,6 +576,10 @@ func setLoggingConfig(cfgLogging config.Logging) error { for loggerName, level := range cfgLogging.Loggers { err = logging.SetLogLevel(loggerName, level) if err != nil { + if errors.Is(err, logging.ErrNoSuchLogger) { + log.Warnf("Ignoring configuration for nonexistent logger: %s", loggerName) + continue + } return err } } diff --git a/go.mod b/go.mod index 8c98a6083..e79a08d1c 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.51 github.com/aws/aws-sdk-go-v2/service/s3 v1.30.2 github.com/aws/smithy-go v1.13.5 - github.com/cockroachdb/pebble v0.0.0-20240229012220-7531ef4d20c9 + github.com/cockroachdb/pebble v0.0.0-20240328175803-1dc54dfdc519 github.com/filecoin-project/go-dagaggregator-unixfs v0.3.0 github.com/gammazero/channelqueue v0.2.1 github.com/gammazero/deque v0.2.1 @@ -24,12 +24,12 @@ require ( github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 github.com/ipld/go-ipld-prime v0.21.0 github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd - github.com/ipni/go-indexer-core v0.8.9 - github.com/ipni/go-libipni v0.5.14 - github.com/libp2p/go-libp2p v0.33.0 + github.com/ipni/go-indexer-core v0.8.10 + github.com/ipni/go-libipni v0.5.16 + github.com/libp2p/go-libp2p v0.33.2 github.com/libp2p/go-msgio v0.3.0 github.com/mitchellh/go-homedir v1.1.0 - github.com/multiformats/go-multiaddr v0.12.2 + github.com/multiformats/go-multiaddr v0.12.3 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-varint v0.0.7 @@ -46,7 +46,7 @@ require ( require ( github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect - github.com/DataDog/zstd v1.4.5 // indirect + github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e // indirect github.com/Microsoft/go-winio v0.5.2 // indirect github.com/aws/aws-sdk-go v1.44.312 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect @@ -69,6 +69,7 @@ require ( github.com/cockroachdb/errors v1.11.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608 // indirect 
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect @@ -183,7 +184,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect github.com/quic-go/qpack v0.4.0 // indirect - github.com/quic-go/quic-go v0.41.0 // indirect + github.com/quic-go/quic-go v0.42.0 // indirect github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect diff --git a/go.sum b/go.sum index f77051bf7..808644780 100644 --- a/go.sum +++ b/go.sum @@ -46,13 +46,15 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= -github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -154,10 +156,12 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= -github.com/cockroachdb/pebble v0.0.0-20240229012220-7531ef4d20c9 h1:3062fLKtbfRkHIO7ZWIaScsZ75L4/h6VMWyBeb9P9s4= -github.com/cockroachdb/pebble v0.0.0-20240229012220-7531ef4d20c9/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= +github.com/cockroachdb/pebble v0.0.0-20240328175803-1dc54dfdc519 h1:9iZwhY4yZXVOMwRP8FG5M5/OaUiHbILctAk434mS9BU= +github.com/cockroachdb/pebble v0.0.0-20240328175803-1dc54dfdc519/go.mod h1:gm/vT3lwZUKyB3iTDgWIZfC0hu0gLr+VcXr/tZeTdEU= github.com/cockroachdb/redact v1.1.5 
h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608 h1:GXGrLNSC5+LGrwVgtczB6JCITxB9WGaLv7XilPkBDvc= +github.com/cockroachdb/swiss v0.0.0-20240303172742-c161743eb608/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= @@ -255,6 +259,8 @@ github.com/gammazero/targz v0.0.3 h1:vlODyfU9QdRgD1AgdEgLQ7lhg+rc4eDQVoSoMoC1ruc github.com/gammazero/targz v0.0.3/go.mod h1:tSwRar0Db0zQLPoQPpSfzk86s1ijPdySS4n+jHsbEXo= github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0= github.com/getsentry/sentry-go v0.18.0/go.mod h1:Kgon4Mby+FJ7ZWHFUAZgVaIa8sxHtnRJRLTXZr51aKQ= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= @@ -578,10 +584,10 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:wZ8hH8UxeryOs4kJEJaiui/s00hDSbE37OKsL47g+Sw= github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd h1:qdjo1CRvAQhOMoyYjPnbdZ5rYFFmqztweQ9KAsuWpO0= github.com/ipld/go-ipld-prime/storage/dsadapter v0.0.0-20230102063945-1a409dc236dd/go.mod h1:9DD/GM0JNPoisgR09F62kbBi7kHa4eDIea4XshXYOVc= -github.com/ipni/go-indexer-core v0.8.9 h1:SCYzRuJYwfD/dnebNH/nLdGoRGf6swsLVU2vkn+GEIk= -github.com/ipni/go-indexer-core v0.8.9/go.mod h1:QipCtL+doHX/J3EU/xb1frYaMgBuPLy1Y9D9XBNbtvA= -github.com/ipni/go-libipni v0.5.14 h1:crJK8WRWajCv5K2kS7jXAhL5KnC4rYqdA8KFI2JCHCY= -github.com/ipni/go-libipni v0.5.14/go.mod h1:dL/2YiaL7pVUmtwoa017M9i4bFqYTvehsZNASU22UXE= +github.com/ipni/go-indexer-core v0.8.10 h1:/MlB3wZW6f/rbrf40/AIHLKtyzZ/PwfpP1kvY18Lq9Q= +github.com/ipni/go-indexer-core v0.8.10/go.mod h1:tbAlzeCNXZmVlwmGXYc+xc/a70eyRe6YFYRCECyZbqQ= +github.com/ipni/go-libipni v0.5.16 h1:icM/MLlXHVnAilyT5N6b0jYveWkimir55OZFIFehDXg= +github.com/ipni/go-libipni v0.5.16/go.mod h1:BMbHRRcCvvIBI311bnE4XifksV2iA8QOCfST4u+8lbs= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= @@ -671,8 +677,8 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o= github.com/libp2p/go-libp2p v0.13.0/go.mod h1:pM0beYdACRfHO1WcJlp65WXyG2A6NqYM+t2DTVAJxMo= github.com/libp2p/go-libp2p v0.14.0/go.mod h1:dsQrWLAoIn+GkHPN/U+yypizkHiB9tnv79Os+kSgQ4Q= -github.com/libp2p/go-libp2p v0.33.0 
h1:yTPSr8sJRbfeEYXyeN8VPVSlTlFjtMUwGDRniwaf/xQ= -github.com/libp2p/go-libp2p v0.33.0/go.mod h1:RIJFRQVUBKy82dnW7J5f1homqqv6NcsDJAl3e7CRGfE= +github.com/libp2p/go-libp2p v0.33.2 h1:vCdwnFxoGOXMKmaGHlDSnL4bM3fQeW8pgIa9DECnb40= +github.com/libp2p/go-libp2p v0.33.2/go.mod h1:zTeppLuCvUIkT118pFVzA8xzP/p2dJYOMApCkFh0Yww= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8= @@ -921,8 +927,8 @@ github.com/multiformats/go-multiaddr v0.2.1/go.mod h1:s/Apk6IyxfvMjDafnhJgJ3/46z github.com/multiformats/go-multiaddr v0.2.2/go.mod h1:NtfXiOtHvghW9KojvtySjH5y0u0xW5UouOmQQrn6a3Y= github.com/multiformats/go-multiaddr v0.3.0/go.mod h1:dF9kph9wfJ+3VLAaeBqo9Of8x4fJxp6ggJGteB8HQTI= github.com/multiformats/go-multiaddr v0.3.1/go.mod h1:uPbspcUPd5AfaP6ql3ujFY+QWzmBD8uLLL4bXW0XfGc= -github.com/multiformats/go-multiaddr v0.12.2 h1:9G9sTY/wCYajKa9lyfWPmpZAwe6oV+Wb1zcmMS1HG24= -github.com/multiformats/go-multiaddr v0.12.2/go.mod h1:GKyaTYjZRdcUhyOetrxTk9z0cW+jA/YrnqTOvKgi44M= +github.com/multiformats/go-multiaddr v0.12.3 h1:hVBXvPRcKG0w80VinQ23P5t7czWgg65BmIvQKjDydU8= +github.com/multiformats/go-multiaddr v0.12.3/go.mod h1:sBXrNzucqkFJhvKOiwwLyqamGa/P5EIXNPLovyhQCII= github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= github.com/multiformats/go-multiaddr-dns v0.2.0/go.mod h1:TJ5pr5bBO7Y1B18djPuRsVkduhQH2YqYSbxWJzYGdK0= @@ -1064,8 +1070,8 @@ github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= -github.com/quic-go/quic-go v0.41.0 h1:aD8MmHfgqTURWNJy48IYFg2OnxwHT3JL7ahGs73lb4k= -github.com/quic-go/quic-go v0.41.0/go.mod h1:qCkNjqczPEvgsOnxZ0eCD14lv+B2LHlFAB++CNOh9hA= +github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= +github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= github.com/quic-go/webtransport-go v0.6.0 h1:CvNsKqc4W2HljHJnoT+rMmbRJybShZ0YPFDD3NxaZLY= github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1abRaosM2yGOyiikEc= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= @@ -1495,8 +1501,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/ingest/hamt_ingest_test.go b/internal/ingest/hamt_ingest_test.go index 8212aac61..4492974f1 100644 --- a/internal/ingest/hamt_ingest_test.go +++ b/internal/ingest/hamt_ingest_test.go @@ -45,16 +45,16 @@ func TestIngester_IngestsMixedEntriesTypeSuccessfully(t *testing.T) { require.Equal(t, headAdCid, gotHeadAd, "Expected latest synced cid to match head of ad chain") // Assert all indices are processed eventually - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { return checkAllIndexed(subject.indexer, pubInfo.ID, mhs) == nil - }, testRetryInterval, testRetryTimeout, "Expected all multihashes to have been indexed eventually") + }, testRetryTimeout, testRetryInterval, "Expected all multihashes to have been indexed eventually") // Assert All ads are processed eventually - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { latestSync, err := subject.GetLatestSync(pubInfo.ID) require.NoError(t, err) return latestSync.Equals(headAdCid) - }, testRetryInterval, testRetryTimeout, "Expected all ads from publisher to have been indexed eventually") + }, testRetryTimeout, testRetryInterval, "Expected all ads from publisher to have been indexed eventually") // Assert multihash indices correspond to the single expected provider. for _, mh := range mhs { diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 9745facc4..de83f5bac 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -47,14 +47,6 @@ const ( metricsUpdateInterval = time.Minute ) -// Metrics -var ( - totalNonRmAds atomic.Int64 - totalRmAds atomic.Int64 - workersActive atomic.Int32 - workersQueued atomic.Int32 -) - type adProcessedEvent struct { publisher peer.ID // Head of the chain being processed. @@ -65,28 +57,12 @@ type adProcessedEvent struct { err error } -// pendingAnnounce captures an announcement received from a provider that await processing. -type pendingAnnounce struct { - addrInfo peer.AddrInfo - nextCid cid.Cid -} - type adInfo struct { cid cid.Cid resync bool skip bool } -type workerAssignment struct { - // none represents a nil assignment. Used because a nil in atomic.Value - // cannot be stored. - none bool - addresses []string - adInfos []adInfo - publisher peer.ID - provider peer.ID -} - // Ingester is a type that uses dagsync for the ingestion protocol. // // ## Advertisement Ingestion Constraints @@ -133,32 +109,29 @@ type Ingester struct { overwriteMirrorOnResync bool - // A map of providers currently being processed. A worker holds the lock of - // a provider while ingesting ads for that provider. - providersBeingProcessed map[peer.ID]chan struct{} - providersBeingProcessedMu sync.Mutex - providerWorkAssignment map[peer.ID]*atomic.Value + // A map of providers currently being processed. Used to detect if multiple + // publishers are supplying ads for the same provide at the same time. + providersBusy map[peer.ID]struct{} + providersBusyMu sync.Mutex // Used to stop watching for sync finished events from dagsync. cancelOnSyncFinished context.CancelFunc // Channels that workers read from. 
syncFinishedEvents <-chan dagsync.SyncFinished - workReady chan peer.ID + syncInProgressMu sync.Mutex + syncInProgress map[peer.ID]*dagsync.SyncFinished // Context and cancel function used to cancel all workers. cancelWorkers context.CancelFunc workersCtx context.Context // Worker pool resizing. + nextWorkerNum int stopWorker chan struct{} waitForWorkers sync.WaitGroup workerPoolSize int - // providersPendingAnnounce maps the provider ID to the latest announcement - // received from the provider that is waiting to be processed. - providersPendingAnnounce sync.Map - // Multihash minimum length minKeyLen int @@ -169,6 +142,11 @@ type Ingester struct { ingestRates *rate.Map skip500EntsErr atomic.Bool + + // metrics + totalNonRmAds atomic.Int64 + totalRmAds atomic.Int64 + workersActive atomic.Int32 } // NewIngester creates a new Ingester that uses a dagsync Subscriber to handle @@ -192,11 +170,10 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re closePendingSyncs: make(chan struct{}), overwriteMirrorOnResync: cfg.OverwriteMirrorOnResync, - providersBeingProcessed: make(map[peer.ID]chan struct{}), - providerWorkAssignment: make(map[peer.ID]*atomic.Value), + providersBusy: make(map[peer.ID]struct{}), stopWorker: make(chan struct{}), - workReady: make(chan peer.ID, 1), + syncInProgress: make(map[peer.ID]*dagsync.SyncFinished), minKeyLen: cfg.MinimumKeyLength, @@ -508,36 +485,8 @@ func (ing *Ingester) Sync(ctx context.Context, peerInfo peer.AddrInfo, depth int // Announce sends an announce message to directly to dagsync, instead of // through pubsub. func (ing *Ingester) Announce(ctx context.Context, nextCid cid.Cid, pubAddrInfo peer.AddrInfo) error { - log := log.With("peer", pubAddrInfo.ID, "cid", nextCid, "addrs", pubAddrInfo.Addrs) - - // If the publisher is not the same as the provider, then this will not - // wait for the provider to be done processing the ad chain it is working - // on. - ing.providersBeingProcessedMu.Lock() - pc, ok := ing.providersBeingProcessed[pubAddrInfo.ID] - ing.providersBeingProcessedMu.Unlock() - if !ok { - return ing.sub.Announce(ctx, nextCid, pubAddrInfo) - } - - // The publisher in the announce message has the same ID as a known - // provider, so defer handling the announce if that provider is busy. - select { - case pc <- struct{}{}: - log.Info("Handling direct announce request") - err := ing.sub.Announce(ctx, nextCid, pubAddrInfo) - <-pc - return err - case <-ctx.Done(): - return ctx.Err() - default: - ing.providersPendingAnnounce.Store(pubAddrInfo.ID, pendingAnnounce{ - addrInfo: pubAddrInfo, - nextCid: nextCid, - }) - log.Info("Deferred handling direct announce request") - return nil - } + log.Infow("Handling direct announce request", "peer", pubAddrInfo.ID, "cid", nextCid, "addrs", pubAddrInfo.Addrs) + return ing.sub.Announce(ctx, nextCid, pubAddrInfo) } // markAdUnprocessed takes an advertisement CID and marks it as unprocessed. @@ -651,7 +600,7 @@ func (ing *Ingester) onAdProcessed(peerID peer.ID) (<-chan adProcessedEvent, con defer ing.outEventsMutex.Unlock() pubEventsChans, ok := ing.outEventsChans[peerID] if !ok { - log.Warnw("Advertisement processed notification already cancelled", "peer", peerID) + log.Warnw("Advertisement processed notification already canceled", "peer", peerID) return } @@ -869,8 +818,9 @@ func (ing *Ingester) RunWorkers(n int) { for n > ing.workerPoolSize { // Start worker. 
ing.waitForWorkers.Add(1) - go ing.ingestWorker(ing.workersCtx, ing.syncFinishedEvents) + go ing.ingestWorker(ing.workersCtx, ing.syncFinishedEvents, ing.nextWorkerNum) ing.workerPoolSize++ + ing.nextWorkerNum++ } for n < ing.workerPoolSize { // Stop worker. @@ -883,57 +833,93 @@ func (ing *Ingester) RunWorkers(n int) { // advertisement chain has been synced. Provider work assignments are created // from raw advertisement chains, and processed by ingestWorker. Work // assignments are processed preferentially over new advertisement chains. -func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan dagsync.SyncFinished) { - log.Debug("started ingest worker") +func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan dagsync.SyncFinished, wkrNum int) { + log := log.With("worker", wkrNum) + + log.Info("started ingest worker") defer ing.waitForWorkers.Done() for { - // Wait for work only. Work assignments take priority over new - // advertisement chains. + log.Debug("ingest worker waiting for event") select { - case provider := <-ing.workReady: - ing.handleWorkReady(ctx, provider) + case event, ok := <-syncFinishedEvents: + if !ok { + log.Info("ingest worker exiting, sync finished events closed") + return + } + + pubID := event.PeerID + + if event.Err != nil { + provID, ok := ing.reg.ProviderByPublisher(pubID) + if ok { + log.Debug("Setting last error for provider", "provider", provID, "publisher", pubID) + ing.reg.SetLastError(provID, event.Err) + } + continue + } + + log.Debugw("ingest worker processing raw ad chain", "publisher", pubID) + + if ing.putNextSyncFin(event) { + // Ad chain is already being processed. + log.Debugw("Ad chain already being processed, queued ad chain event", "publisher", pubID) + continue + } + + stats.Record(ctx, metrics.AdIngestActive.M(int64(ing.workersActive.Add(1)))) + + for syncFin := ing.getNextSyncFin(pubID); syncFin != nil; syncFin = ing.getNextSyncFin(pubID) { + ing.processRawAdChain(ctx, *syncFin, wkrNum) + } + + stats.Record(ctx, metrics.AdIngestActive.M(int64(ing.workersActive.Add(-1)))) case <-ctx.Done(): log.Info("ingest worker canceled") return case <-ing.stopWorker: - log.Debug("stopped ingest worker") + log.Info("ingest worker stopped") return - default: - // No work assignments, so also check for new advertisement chains. - select { - case provider := <-ing.workReady: - ing.handleWorkReady(ctx, provider) - case event, ok := <-syncFinishedEvents: - if !ok { - log.Info("ingest worker exiting, sync finished events closed") - return - } - if event.Err != nil { - ing.reg.SetLastError(event.PeerID, event.Err) - continue - } - ing.processRawAdChain(ctx, event) - case <-ctx.Done(): - log.Info("ingest worker canceled") - return - case <-ing.stopWorker: - log.Debug("stopped ingest worker") - return - } } } } +func (ing *Ingester) putNextSyncFin(event dagsync.SyncFinished) bool { + pubID := event.PeerID + + ing.syncInProgressMu.Lock() + defer ing.syncInProgressMu.Unlock() + + _, inProgress := ing.syncInProgress[pubID] + ing.syncInProgress[pubID] = &event + + return inProgress +} + +func (ing *Ingester) getNextSyncFin(pubID peer.ID) *dagsync.SyncFinished { + ing.syncInProgressMu.Lock() + defer ing.syncInProgressMu.Unlock() + + syncFin := ing.syncInProgress[pubID] + if syncFin == nil { + delete(ing.syncInProgress, pubID) + } else { + ing.syncInProgress[pubID] = nil + } + + return syncFin +} + // processRawAdChain processes a raw advertisement chain from a publisher to // create provider work assignments. 
When not workers are working on a work // assignment for a provider, then workers are given the next work assignment // for that provider. -func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync.SyncFinished) { +func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync.SyncFinished, wkrNum int) { if syncFinished.Count == 0 { // Attempted sync, but already up to data. Nothing to do. return } + publisher := syncFinished.PeerID log := log.With("publisher", publisher) log.Infow("Advertisement chain synced", "length", syncFinished.Count) @@ -968,13 +954,14 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync if processed { // This ad has been processed so all earlier ads already have been // processed. + log.Infow("Remainder of ad chain already processed", "cid", c) break } ad, err := ing.loadAd(c) if err != nil { stats.Record(context.Background(), metrics.AdLoadError.M(1)) - log.Errorw("Failed to load advertisement CID, skipping", "cid", c, "err", err) + log.Errorw("Failed to load advertisement CID, skipping all remaining", "cid", c, "err", err) break } if ad.PreviousID != nil { @@ -986,7 +973,7 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync providerID, err := peer.Decode(ad.Provider) if err != nil { - log.Errorf("Failed to get provider from ad CID: %s skipping", err) + log.Errorw("Failed to get provider from ad CID, skipping", "cid", c, "err", err) continue } // If this is the first ad for this provider, then save the provider @@ -994,6 +981,7 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync _, ok := provAddrs[providerID] if !ok && len(ad.Addresses) != 0 { provAddrs[providerID] = ad.Addresses + log.Debugw("New provider seen in ad stack", "provider", providerID) } ai := adInfo{ @@ -1011,134 +999,76 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync } adsGroupedByProvider[providerID] = append(adsGroupedByProvider[providerID], ai) + if totalAds%1000 == 0 { + log.Debugf("Added %d ads to stack", totalAds) + } } nonRmCount := totalAds - rmCount + log.Debugw("Created ad stack", "providers", len(adsGroupedByProvider), "ads", totalAds, "rmCount", rmCount) + stats.Record(ctx, - metrics.RemoveAdCount.M(totalRmAds.Add(rmCount)), - metrics.NonRemoveAdCount.M(totalNonRmAds.Add(nonRmCount))) + metrics.RemoveAdCount.M(ing.totalRmAds.Add(rmCount)), + metrics.NonRemoveAdCount.M(ing.totalNonRmAds.Add(nonRmCount))) // 2. For each provider put the ad stack to the worker msg channel. Each ad // stack contains ads for a single provider, from a single publisher. for providerID, adInfos := range adsGroupedByProvider { - ing.providersBeingProcessedMu.Lock() - if _, ok := ing.providersBeingProcessed[providerID]; !ok { - ing.providersBeingProcessed[providerID] = make(chan struct{}, 1) - } - wa, ok := ing.providerWorkAssignment[providerID] - if !ok { - wa = &atomic.Value{} - ing.providerWorkAssignment[providerID] = wa - } - ing.providersBeingProcessedMu.Unlock() - - oldAssignment := wa.Swap(workerAssignment{ - adInfos: adInfos, - addresses: provAddrs[providerID], - publisher: publisher, - provider: providerID, - }) - if oldAssignment == nil || oldAssignment.(workerAssignment).none { - // No previous run scheduled a worker to handle this provider, so - // schedule one. 
- ing.reg.Saw(providerID, false) - - go func(provID peer.ID) { - ing.providersBeingProcessedMu.Lock() - provBusy := ing.providersBeingProcessed[provID] - ing.providersBeingProcessedMu.Unlock() - - stats.Record(ctx, - metrics.AdIngestQueued.M(int64(workersQueued.Add(1)))) - // Wait until the no workers are doing work for the provider - // before notifying that another work assignment is available. - select { - case provBusy <- struct{}{}: - case <-ctx.Done(): - return - } - select { - case ing.workReady <- provID: - case <-ctx.Done(): - return - } - }(providerID) + ing.providersBusyMu.Lock() + if _, ok := ing.providersBusy[providerID]; ok { + ing.providersBusyMu.Unlock() + log.Errorw("Other worker already ingesting for same provider. Provider ad chain may by published at multiple locations.", "provider", providerID) + return } - // If oldAssignment has adInfos, it is not necessary to merge the old - // and new assignments because the new assignment will already have all - // the adInfos that the old assignment does. If the old assignment was - // not processed yet, then the sync that created the new assignment - // would have traversed the same chain as the old. In other words, any - // existing old assignment is always a subset of a new assignment. - } -} + ing.providersBusy[providerID] = struct{}{} + ing.providersBusyMu.Unlock() -func (ing *Ingester) handleWorkReady(ctx context.Context, provider peer.ID) { - ing.providersBeingProcessedMu.Lock() - provBusy := ing.providersBeingProcessed[provider] - // Pull out the assignment for this provider, which was populated by processAdChain. - wa := ing.providerWorkAssignment[provider] - ing.providersBeingProcessedMu.Unlock() + ing.reg.Saw(providerID, false) + ing.ingestWorkerLogic(ctx, providerID, publisher, provAddrs[providerID], adInfos, wkrNum) - stats.Record(context.Background(), - metrics.AdIngestQueued.M(int64(workersQueued.Add(-1))), - metrics.AdIngestActive.M(int64(workersActive.Add(1)))) - - assignmentInterface := wa.Swap(workerAssignment{none: true}) - if assignmentInterface != nil { - ing.ingestWorkerLogic(ctx, provider, assignmentInterface.(workerAssignment)) + ing.providersBusyMu.Lock() + delete(ing.providersBusy, providerID) + ing.providersBusyMu.Unlock() } - - ing.handlePendingAnnounce(ctx, provider) - - // Signal that the worker is done with the provider. - <-provBusy - - stats.Record(ctx, metrics.AdIngestActive.M(int64(workersActive.Add(-1)))) } -func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, assignment workerAssignment) { - if assignment.none { - // Nothing to do. - return - } - frozen := ing.reg.Frozen() - - log := log.With("publisher", assignment.publisher) +func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher peer.ID, addresses []string, adInfos []adInfo, wkrNum int) { + log := log.With("publisher", publisher, "worker", wkrNum) // Log provider ID if not the same as publisher ID. 
- if provider != assignment.publisher { + if provider != publisher { log = log.With("provider", provider) } headProvider := peer.AddrInfo{ ID: provider, - Addrs: stringsToMultiaddrs(assignment.addresses), + Addrs: stringsToMultiaddrs(addresses), } - headAdCid := assignment.adInfos[0].cid + headAdCid := adInfos[0].cid - if ing.mirror.canWrite() && !assignment.adInfos[0].resync { + if ing.mirror.canWrite() && !adInfos[0].resync { _, err := ing.mirror.writeHead(ctx, headAdCid, provider) if err != nil { log.Errorw("Cannot write publisher head", "err", err) } } + frozen := ing.reg.Frozen() skip500EntsErr := ing.skip500EntsErr.Load() - total := len(assignment.adInfos) + total := len(adInfos) log.Infow("Running worker on ad stack", "headAdCid", headAdCid, "numAdsToProcess", total) var count int - for i := len(assignment.adInfos) - 1; i >= 0; i-- { + for i := len(adInfos) - 1; i >= 0; i-- { // Note that iteration proceeds backwards here. Earliest to newest. - ai := assignment.adInfos[i] - assignment.adInfos[i] = adInfo{} // Clear the adInfo to free memory. + ai := adInfos[i] + adInfos[i] = adInfo{} // Clear the adInfo to free memory. count++ if ctx.Err() != nil { log.Infow("Ingest worker canceled while processing ads", "err", ctx.Err()) ing.inEvents <- adProcessedEvent{ - publisher: assignment.publisher, + publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, err: ctx.Err(), @@ -1161,7 +1091,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as "adCid", ai.cid, "progress", fmt.Sprintf("%d of %d", count, total)) - if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, false); markErr != nil { + if markErr := ing.markAdProcessed(publisher, ai.cid, frozen, false); markErr != nil { log.Errorw("Failed to mark ad as processed", "err", markErr) } // Do not write removed ads to mirror. They are not read during @@ -1169,7 +1099,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as // Distribute the atProcessedEvent notices to waiting Sync calls. ing.inEvents <- adProcessedEvent{ - publisher: assignment.publisher, + publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, } @@ -1182,7 +1112,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as "progress", fmt.Sprintf("%d of %d", count, total), "lag", lag) - hasEnts, fromMirror, err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider) + hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider) if err != nil { var adIngestErr adIngestError if errors.As(err, &adIngestErr) { @@ -1220,7 +1150,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as // Tell anyone waiting that the sync finished for this head because // of error. TODO(mm) would be better to propagate the error. 
ing.inEvents <- adProcessedEvent{ - publisher: assignment.publisher, + publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, err: err, @@ -1235,7 +1165,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as ing.reg.SetLastError(provider, nil) putMirror := hasEnts && ing.mirror.canWrite() - if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, putMirror); markErr != nil { + if markErr := ing.markAdProcessed(publisher, ai.cid, frozen, putMirror); markErr != nil { log.Errorw("Failed to mark ad as processed", "err", markErr) } @@ -1267,36 +1197,9 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as // Distribute the atProcessedEvent notices to waiting Sync calls. ing.inEvents <- adProcessedEvent{ - publisher: assignment.publisher, + publisher: publisher, headAdCid: headAdCid, adCid: ai.cid, } } } - -func (ing *Ingester) handlePendingAnnounce(ctx context.Context, pubID peer.ID) { - if ctx.Err() != nil { - return - } - log := log.With("publisher", pubID) - // Process pending announce request if any. - // Note that the pending announce is deleted regardless of whether it was successfully - // processed or not. Because, the cause of failure may be non-recoverable e.g. address - // change and not removing it will block processing of future pending announces. - v, found := ing.providersPendingAnnounce.LoadAndDelete(pubID) - if !found { - return - } - pa, ok := v.(pendingAnnounce) - if !ok { - log.Errorw("Cannot handle pending announce; unexpected type", "got", v) - return - } - log = log.With("cid", pa.nextCid, "addrinfo", pa.addrInfo) - err := ing.sub.Announce(ctx, pa.nextCid, pa.addrInfo) - if err != nil { - log.Errorw("Failed to handle pending announce", "err", err) - return - } - log.Info("Successfully handled pending announce") -} diff --git a/internal/ingest/ingest_test.go b/internal/ingest/ingest_test.go index 512f0151c..655346dd8 100644 --- a/internal/ingest/ingest_test.go +++ b/internal/ingest/ingest_test.go @@ -46,8 +46,8 @@ import ( ) const ( - testRetryInterval = 2 * time.Second - testRetryTimeout = 15 * time.Second + testRetryInterval = 100 * time.Millisecond + testRetryTimeout = 5 * time.Second testEntriesChunkCount = 3 testEntriesChunkSize = 15 @@ -752,11 +752,11 @@ func TestRmWithNoEntries(t *testing.T) { _, err = te.ingester.Sync(ctx, peerInfo, 0, false) require.NoError(t, err) var lcid cid.Cid - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { lcid, err = te.ingester.GetLatestSync(te.pubHost.ID()) require.NoError(t, err) return chainHead.(cidlink.Link).Cid == lcid - }, testRetryInterval, testRetryTimeout, "Expected %s but got %s", chainHead, lcid) + }, testRetryTimeout, testRetryInterval, "Expected %s but got %s", chainHead, lcid) allMhs := typehelpers.AllMultihashesFromAdChain(t, prevAd, te.publisherLinkSys) @@ -806,11 +806,11 @@ func TestSync(t *testing.T) { lcid := lnk.(cidlink.Link).Cid require.Equal(t, lcid, c1) // Check that latest sync recorded in datastore - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { lcid, err = i.GetLatestSync(pubHost.ID()) require.NoError(t, err) return c1.Equals(lcid) - }, testRetryInterval, testRetryTimeout, "Expected %s but got %s", c1, lcid) + }, testRetryTimeout, testRetryInterval, "Expected %s but got %s", c1, lcid) // Checking providerID, since that was what was put in the advertisement, not pubhost.ID() requireIndexedEventually(t, i.indexer, providerID, mhs) @@ -1409,16 +1409,16 @@ func 
TestMultiplePublishers(t *testing.T) { requireIndexedEventually(t, i.indexer, pubHost2.ID(), mhs) // Assert that the latest processed ad cid eventually matches the expected cid. - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { gotLatestSync, err := i.GetLatestSync(pubHost1.ID()) require.NoError(t, err) return headAd1Cid.Equals(gotLatestSync) - }, testRetryInterval, testRetryTimeout, "Expected latest processed ad cid to be headAd1 for publisher 1.") - requireTrueEventually(t, func() bool { + }, testRetryTimeout, testRetryInterval, "Expected latest processed ad cid to be headAd1 for publisher 1.") + require.Eventually(t, func() bool { gotLatestSync, err := i.GetLatestSync(pubHost2.ID()) require.NoError(t, err) return headAd2Cid.Equals(gotLatestSync) - }, testRetryInterval, testRetryTimeout, "Expected latest processed ad cid to be headAd2 for publisher 2.") + }, testRetryTimeout, testRetryInterval, "Expected latest processed ad cid to be headAd2 for publisher 2.") // Assert that getting the latest synced from dagsync publisher matches the // latest processed. @@ -1455,7 +1455,7 @@ func TestAnnounceIsDeferredWhenProcessingAd(t *testing.T) { ID: te.publisher.ID(), Addrs: te.publisher.Addrs(), } - // Instantiate a sync + wait := make(chan cid.Cid, 1) go func() { syncCid, err := te.ingester.Sync(context.Background(), peerInfo, 0, false) @@ -1468,39 +1468,38 @@ func TestAnnounceIsDeferredWhenProcessingAd(t *testing.T) { // Asset that the head ad multihash is not indexed. requireNotIndexed(t, te.ingester.indexer, te.pubHost.ID(), mhs[3:]) + require.Equal(t, 1, int(te.ingester.workersActive.Load())) + // Announce an ad CID and assert that call to announce is deferred since // we have blocked the processing. ad2Cid := ads[2].(cidlink.Link).Cid + // Blocked in sync handler. err := te.ingester.Announce(context.Background(), ad2Cid, pubAddrInfo) require.NoError(t, err) - gotPendingAnnounce, found := te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) - require.True(t, found) - require.Equal(t, pendingAnnounce{ - addrInfo: pubAddrInfo, - nextCid: ad2Cid, - }, gotPendingAnnounce) + // Verify sync has not completed. + require.Eventually(t, func() bool { + te.ingester.syncInProgressMu.Lock() + _, found := te.ingester.syncInProgress[pubAddrInfo.ID] + te.ingester.syncInProgressMu.Unlock() + return found + }, testRetryTimeout, testRetryInterval) - // Announce another CID and assert the pending announce is updated to the latest announced CID. ad3Cid := ads[3].(cidlink.Link).Cid + // Blocked waiting for previous async ad chain sync. err = te.ingester.Announce(context.Background(), ad3Cid, pubAddrInfo) require.NoError(t, err) - gotPendingAnnounce, found = te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) - require.True(t, found) - require.Equal(t, pendingAnnounce{ - addrInfo: pubAddrInfo, - nextCid: ad3Cid, - }, gotPendingAnnounce) // Unblock the processing and assert that everything is indexed. <-hitBlockedRead require.Equal(t, headCid, <-wait) requireIndexedEventually(t, te.ingester.indexer, te.pubHost.ID(), mhs) - // Assert that there is no pending announce. 
- requireTrueEventually(t, func() bool { - _, found := te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) + require.Eventually(t, func() bool { + te.ingester.syncInProgressMu.Lock() + _, found := te.ingester.syncInProgress[te.pubHost.ID()] + te.ingester.syncInProgressMu.Unlock() return !found - }, testRetryInterval, testRetryTimeout, "Expected the pending announce to have been processed") + }, testRetryTimeout, testRetryInterval, "Expected the pending announce to have been processed") } func TestAnnounceIsNotDeferredOnNoInProgressIngest(t *testing.T) { @@ -1519,13 +1518,21 @@ func TestAnnounceIsNotDeferredOnNoInProgressIngest(t *testing.T) { // Announce the head ad CID. err := te.ingester.Announce(context.Background(), headCid, pubAddrInfo) require.NoError(t, err) - // Assert that there is no pending announce. - _, found := te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) - require.False(t, found) // Assert that all multihashes in ad chain are indexed eventually, since Announce triggers // a background sync and should eventually process the ads. requireIndexedEventually(t, te.ingester.indexer, te.pubHost.ID(), mhs) + + require.Eventually(t, func() bool { + te.ingester.syncInProgressMu.Lock() + _, found := te.ingester.syncInProgress[te.pubHost.ID()] + te.ingester.syncInProgressMu.Unlock() + return !found + }, testRetryTimeout, testRetryInterval) + + require.Eventually(t, func() bool { + return te.ingester.workersActive.Load() == 0 + }, testRetryTimeout, testRetryInterval) } func TestAnnounceArrivedJustBeforeEntriesProcessingStartsDoesNotDeadlock(t *testing.T) { @@ -1559,7 +1566,9 @@ func TestAnnounceArrivedJustBeforeEntriesProcessingStartsDoesNotDeadlock(t *test // Assert that there is no announce pending processing since no explicit announce was made to // storetheindex ingester. - _, found := te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) + te.ingester.syncInProgressMu.Lock() + _, found := te.ingester.syncInProgress[te.pubHost.ID()] + te.ingester.syncInProgressMu.Unlock() require.False(t, found) // This sleep is required to make sure that the messages arrive to the blocking channel in the intended order @@ -1568,15 +1577,12 @@ func TestAnnounceArrivedJustBeforeEntriesProcessingStartsDoesNotDeadlock(t *test // Block head ad which should block explicit Announce call made to the ingester. blockedReads.add(headCid) - // Make an explicit announcement of head ad and assert that it was handled immediately since - // there is no in-progress entries processing yet; remember ad sync triggered by - // publisher.UpdateRoot is still blocked. - // Note that the background handling of the announce should get blocked since headCid is also - // in the block list. + // Make an explicit announcement of head ad while ad sync triggered by + // publisher.UpdateRoot is still blocked. Note that the background handling + // of the announce should get blocked since headCid is also in the block + // list. err = te.ingester.Announce(context.Background(), headCid, pubAddrInfo) require.NoError(t, err) - _, found = te.ingester.providersPendingAnnounce.Load(te.pubHost.ID()) - require.False(t, found) // Unblock the sync triggered by publisher.UpdateRoot which should: // 1. cause the ad chain C->B->A to be downloaded. 
@@ -2018,28 +2024,9 @@ func checkAllIndexed(ix indexer.Interface, p peer.ID, mhs []multihash.Multihash) func requireIndexedEventually(t *testing.T, ix indexer.Interface, p peer.ID, mhs []multihash.Multihash) { t.Helper() - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { return checkAllIndexed(ix, p, mhs) == nil - }, testRetryInterval, testRetryTimeout, "Expected all multihashes from %s to have been indexed eventually", p.String()) -} - -func requireTrueEventually(t *testing.T, attempt func() bool, interval time.Duration, timeout time.Duration, msgAndArgs ...interface{}) { - t.Helper() - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - if attempt() { - return - } - select { - case <-ctx.Done(): - require.FailNow(t, "timed out awaiting eventual success", msgAndArgs...) - return - case <-ticker.C: - } - } + }, testRetryTimeout, testRetryInterval, "Expected all multihashes from %s to have been indexed eventually", p.String()) } type testEnv struct { diff --git a/internal/ingest/invalid_mh_ingest_test.go b/internal/ingest/invalid_mh_ingest_test.go index 448680000..b04c141aa 100644 --- a/internal/ingest/invalid_mh_ingest_test.go +++ b/internal/ingest/invalid_mh_ingest_test.go @@ -42,15 +42,15 @@ func TestInvalidMultihashesAreNotIngested(t *testing.T) { require.Equal(t, headAdCid, gotHeadAd, "Expected latest synced cid to match head of ad chain") - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { return checkAllIndexed(subject.indexer, pubInfo.ID, validMhs) == nil - }, testRetryInterval, testRetryTimeout, "Expected only valid multihashes to be indexed") + }, testRetryTimeout, testRetryInterval, "Expected only valid multihashes to be indexed") - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { latestSync, err := subject.GetLatestSync(pubInfo.ID) require.NoError(t, err) return latestSync.Equals(headAdCid) - }, testRetryInterval, testRetryTimeout, "Expected all ads from publisher to have been indexed") + }, testRetryTimeout, testRetryInterval, "Expected all ads from publisher to have been indexed") // Assert valid multihash indices correspond to the expected provider. 
for _, mh := range validMhs { diff --git a/internal/ingest/seg_sync_test.go b/internal/ingest/seg_sync_test.go index e99a7a00e..109daac3c 100644 --- a/internal/ingest/seg_sync_test.go +++ b/internal/ingest/seg_sync_test.go @@ -53,13 +53,13 @@ func TestAdsSyncedViaSegmentsAreProcessed(t *testing.T) { require.NoError(t, err) require.Equal(t, headAdCid, gotHeadAd, "Expected latest synced cid to match head of ad chain") - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { return checkAllIndexed(subject.indexer, pubInfo.ID, mhs) == nil - }, testRetryInterval, testRetryTimeout, "Expected all ads from publisher to have been indexed.") + }, testRetryTimeout, testRetryInterval, "Expected all ads from publisher to have been indexed.") - requireTrueEventually(t, func() bool { + require.Eventually(t, func() bool { latestSync, err := subject.GetLatestSync(pubInfo.ID) require.NoError(t, err) return latestSync.Equals(headAdCid) - }, testRetryInterval, testRetryTimeout, "Expected all ads from publisher to have been indexed.") + }, testRetryTimeout, testRetryInterval, "Expected all ads from publisher to have been indexed.") } diff --git a/internal/metrics/server.go b/internal/metrics/server.go index 7a0d38812..51d9334d7 100644 --- a/internal/metrics/server.go +++ b/internal/metrics/server.go @@ -25,7 +25,6 @@ var ( IngestChange = stats.Int64("ingest/change", "Number of syncAdEntries started", stats.UnitDimensionless) AdIngestLatency = stats.Float64("ingest/adsynclatency", "latency of syncAdEntries completed successfully", stats.UnitDimensionless) AdIngestErrorCount = stats.Int64("ingest/adingestError", "Number of errors encountered while processing an ad", stats.UnitDimensionless) - AdIngestQueued = stats.Int64("ingest/adingestqueued", "Number of queued advertisements", stats.UnitDimensionless) AdIngestActive = stats.Int64("ingest/adactive", "Active ingest workers", stats.UnitDimensionless) AdIngestSuccessCount = stats.Int64("ingest/adingestSuccess", "Number of successful ad ingest", stats.UnitDimensionless) AdIngestSkippedCount = stats.Int64("ingest/adingestSkipped", "Number of ads skipped during ingest", stats.UnitDimensionless) @@ -66,10 +65,6 @@ var ( Aggregation: view.Count(), TagKeys: []tag.Key{ErrKind}, } - adIngestQueued = &view.View{ - Measure: AdIngestQueued, - Aggregation: view.Count(), - } adIngestActive = &view.View{ Measure: AdIngestActive, Aggregation: view.Distribution(0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024), @@ -112,7 +107,6 @@ func Start(views []*view.View) http.Handler { entriesSyncLatencyView, adIngestLatencyView, adIngestError, - adIngestQueued, adIngestActive, adIngestSkipped, adIngestSuccess, diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 12cd353f2..72133da97 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -1054,6 +1054,24 @@ func (r *Registry) RemoveProvider(ctx context.Context, providerID peer.ID) error return nil } +func (r *Registry) ProviderByPublisher(pubID peer.ID) (peer.ID, bool) { + r.provMutex.Lock() + defer r.provMutex.Unlock() + + pinfo, ok := r.providers[pubID] + if ok && pinfo.Publisher == pubID { + return pubID, true + } + + for provID, pinfo := range r.providers { + if pinfo.Publisher == pubID { + return provID, true + } + } + + return "", false +} + func (r *Registry) SetLastError(providerID peer.ID, err error) { var now time.Time if err != nil {
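
The daemon.go hunk near the top of this patch makes per-logger level configuration tolerant of loggers that have not been registered yet: logging.ErrNoSuchLogger is logged as a warning and skipped rather than aborting startup. Below is a minimal sketch of that pattern, assuming a hypothetical applyLoggerLevels helper and subsystem name; only SetLogLevel and ErrNoSuchLogger come from go-log itself.

package config

import (
    "errors"

    logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("example/config") // hypothetical subsystem name

// applyLoggerLevels applies per-logger levels from configuration, skipping
// entries that name loggers which do not exist instead of failing startup.
func applyLoggerLevels(levels map[string]string) error {
    for name, level := range levels {
        err := logging.SetLogLevel(name, level)
        if err != nil {
            if errors.Is(err, logging.ErrNoSuchLogger) {
                log.Warnf("Ignoring configuration for nonexistent logger: %s", name)
                continue
            }
            return err
        }
    }
    return nil
}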
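
RunWorkers in ingest.go resizes the worker pool by launching numbered goroutines to grow it and by sending on an unbuffered stop channel to shrink it. The sketch below shows the shape of that mechanism with illustrative names (pool, Resize, work are not the indexer's types); as in the patch, a shrink request waits until an idle worker picks up the stop signal.

package workerpool

import "sync"

// pool resizes a set of workers: growing starts new numbered goroutines,
// shrinking sends on an unbuffered channel that an idle worker receives from.
type pool struct {
    mu      sync.Mutex
    size    int
    nextNum int
    stop    chan struct{}
    wg      sync.WaitGroup
    work    <-chan func()
}

func newPool(work <-chan func()) *pool {
    return &pool{stop: make(chan struct{}), work: work}
}

// Resize grows or shrinks the pool to n workers.
func (p *pool) Resize(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for n > p.size {
        p.wg.Add(1)
        go p.worker(p.nextNum)
        p.nextNum++
        p.size++
    }
    for n < p.size {
        // Blocks until some worker is idle enough to receive the stop signal.
        p.stop <- struct{}{}
        p.size--
    }
}

// worker runs queued functions until it is stopped. The worker number is only
// used for log attribution in the real code.
func (p *pool) worker(num int) {
    defer p.wg.Done()
    for {
        select {
        case fn, ok := <-p.work:
            if !ok {
                return
            }
            fn()
        case <-p.stop:
            return
        }
    }
}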
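
The core fix, "do not process ad chain if not done with previous for provider", hinges on the syncInProgress map and the putNextSyncFin/getNextSyncFin pair: the first event for a publisher makes the receiving worker the owner, later events for the same publisher replace the single queued entry instead of starting parallel processing, and the owner loops until nothing is left. Here is a generic sketch of that coalescing pattern with hypothetical names (coalescer, put, next).

package coalesce

import "sync"

// coalescer keeps at most one pending item per key while a worker owns that
// key. Later items for the same key replace the queued one rather than
// accumulating.
type coalescer[K comparable, V any] struct {
    mu      sync.Mutex
    pending map[K]*V
}

func newCoalescer[K comparable, V any]() *coalescer[K, V] {
    return &coalescer[K, V]{pending: make(map[K]*V)}
}

// put stores v as the next item for key. It reports true if another worker
// already owns key, in which case the caller should not start processing.
func (c *coalescer[K, V]) put(key K, v V) bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    _, busy := c.pending[key]
    c.pending[key] = &v
    return busy
}

// next returns the queued item for key, or nil when nothing is left, at which
// point the key is released so a future put can claim ownership again.
func (c *coalescer[K, V]) next(key K) *V {
    c.mu.Lock()
    defer c.mu.Unlock()
    v := c.pending[key]
    if v == nil {
        delete(c.pending, key)
    } else {
        c.pending[key] = nil
    }
    return v
}

The owning worker would drain with: for v := c.next(key); v != nil; v = c.next(key) { process(*v) }, which mirrors the loop in ingestWorker.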
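
ingest.go also guards against two workers ingesting for the same provider at once, which can happen when a provider's ad chain is announced by more than one publisher: the providersBusy set is claimed under a mutex before ingestWorkerLogic runs and released afterward. A stripped-down version of that guard follows; busySet and its method names are illustrative.

package ingest

import "sync"

// busySet marks providers that a worker is currently ingesting for, so a
// second worker seeing the same provider from another publisher backs off.
type busySet struct {
    mu   sync.Mutex
    busy map[string]struct{}
}

func newBusySet() *busySet {
    return &busySet{busy: make(map[string]struct{})}
}

// TryAcquire reports whether the caller now owns the provider.
func (b *busySet) TryAcquire(provider string) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    if _, ok := b.busy[provider]; ok {
        return false
    }
    b.busy[provider] = struct{}{}
    return true
}

// Release frees the provider for other workers.
func (b *busySet) Release(provider string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    delete(b.busy, provider)
}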
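
The test changes drop the local requireTrueEventually helper in favor of testify's require.Eventually. The argument order differs: require.Eventually takes the overall timeout (waitFor) before the polling interval (tick), which is why every call site swaps testRetryInterval and testRetryTimeout. A small, self-contained example of the call shape, with a trivial background goroutine standing in for the condition being awaited:

package ingest_test

import (
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

const (
    testRetryInterval = 100 * time.Millisecond
    testRetryTimeout  = 5 * time.Second
)

// TestEventuallyOrder shows require.Eventually's argument order: condition,
// then timeout (waitFor), then polling interval (tick).
func TestEventuallyOrder(t *testing.T) {
    done := make(chan struct{})
    go func() {
        time.Sleep(200 * time.Millisecond)
        close(done)
    }()
    require.Eventually(t, func() bool {
        select {
        case <-done:
            return true
        default:
            return false
        }
    }, testRetryTimeout, testRetryInterval, "expected background work to finish")
}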
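
Finally, registry.go gains ProviderByPublisher so the ingest worker can attribute a failed sync to the right provider when the event only carries the publisher ID. The sketch below restates that lookup against a pared-down provider record; providerInfo here holds only the Publisher field, whereas the real registry record carries much more.

package registry

import (
    "sync"

    "github.com/libp2p/go-libp2p/core/peer"
)

// providerInfo is a minimal stand-in for the registry's provider record.
type providerInfo struct {
    Publisher peer.ID
}

type registry struct {
    mu        sync.Mutex
    providers map[peer.ID]*providerInfo
}

// ProviderByPublisher returns the provider whose ads are published by pubID.
// The fast path covers providers that publish their own advertisements; the
// slow path scans all registered providers for a matching publisher.
func (r *registry) ProviderByPublisher(pubID peer.ID) (peer.ID, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()

    if pinfo, ok := r.providers[pubID]; ok && pinfo.Publisher == pubID {
        return pubID, true
    }
    for provID, pinfo := range r.providers {
        if pinfo.Publisher == pubID {
            return provID, true
        }
    }
    return "", false
}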