Collect error in the index #1097

Merged 28 commits on Jan 3, 2024
Changes from 10 commits

Commits (28)
3d41e20
index: add error document to the index
TristanCacqueray Dec 20, 2023
0cd8e46
api: add error document type to the add doc endpoint
TristanCacqueray Dec 20, 2023
b389c62
crawler: emit crawler error when processing stream
TristanCacqueray Dec 20, 2023
f9357eb
api: add error indexing
TristanCacqueray Dec 20, 2023
1d808a0
index: add error created_at attribute
TristanCacqueray Dec 21, 2023
2475a4e
crawler: base64 encode json blob
TristanCacqueray Dec 21, 2023
5cac8e3
test: fix the macroscope failure test
TristanCacqueray Dec 21, 2023
303dae2
api: add crawler/errors endpoint to fetch errors
TristanCacqueray Dec 21, 2023
5e660b0
index: store the entity and timestamp in the errors_data structure
TristanCacqueray Dec 21, 2023
37656c9
test: verify the indexed error content
TristanCacqueray Dec 22, 2023
ee8a869
crawler: continue processing even when there are decoding errors
TristanCacqueray Dec 22, 2023
38119ec
chore: perform monocle-reformat-run
TristanCacqueray Dec 22, 2023
f179448
api: update dropTime to keep the current hour
TristanCacqueray Dec 22, 2023
4084bff
doc: add example to run a single test
TristanCacqueray Dec 22, 2023
9f040a7
web: add crawler api codegen
TristanCacqueray Dec 22, 2023
eec06d1
web: display crawler errors
TristanCacqueray Dec 22, 2023
4a71ad3
index: encode crawler error body by the api
TristanCacqueray Dec 23, 2023
0ea04b3
index: introduce new type for BinaryText
TristanCacqueray Dec 23, 2023
0f1df8a
index: bump version to apply new mapping
TristanCacqueray Dec 23, 2023
796d395
crawler: improve crawler error representation
TristanCacqueray Dec 24, 2023
9fb3c3b
crawler: introduce error variant for page-info
TristanCacqueray Dec 24, 2023
1731dee
crawler: preserve the original fetch error from morpheus client
TristanCacqueray Dec 24, 2023
32f140a
crawler: handle partial results
TristanCacqueray Dec 24, 2023
4d2bf8a
api: introduce CrawlerErrorList
TristanCacqueray Dec 27, 2023
23c4def
crawler: add stream error to stop the stream
TristanCacqueray Jan 3, 2024
9a60ca4
api: prevent error when submitting empty task data
TristanCacqueray Jan 3, 2024
2ba9fdd
doc: add profiling build instructions
TristanCacqueray Jan 3, 2024
8f03288
Rename PageInfoError to RateLimitInfoError
morucci Jan 3, 2024
579 changes: 577 additions & 2 deletions codegen/Monocle/Protob/Crawler.hs

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions doc/openapi.yaml

Some generated files are not rendered by default.

24 changes: 24 additions & 0 deletions schemas/monocle/protob/crawler.proto
@@ -28,6 +28,29 @@ enum EntityType {
ENTITY_TYPE_USER = 3;
}

+message CrawlerError {
+string message = 1;
+string body = 2;
+google.protobuf.Timestamp created_at = 3;
+Entity entity = 4;
+}
+
+message ErrorsRequest {
+string index = 1;
+string query = 2;
+}
+
+message ErrorsList {
+repeated CrawlerError errors = 1;
+}
+
+message ErrorsResponse {
+oneof result {
+ErrorsList success = 1;
+string error = 2;
+}
+}
+
message AddDocRequest {
string index = 1;
string crawler = 2;
@@ -45,6 +68,7 @@ message AddDocRequest {
repeated monocle_issue.Issue issues = 10;
// issue_events are added when Entity is project_issue_name
repeated monocle_issue.IssueEvent issue_events = 11;
+repeated CrawlerError errors = 12;
}

enum AddDocError {
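The new CrawlerError message is what the worker emits when a crawl step fails. As a sketch (not part of the patch), here is how it gets populated for a decode failure, mirroring the positional construction that appears in the src/Macroscope/Worker.hs diff below; DocumentType, DTError, and encodeBlob come from that module, and the field comments are our reading of the schema above:

```haskell
import Monocle.Protob.Crawler (CrawlerError (..))

-- A sketch only: build the error document the way Macroscope.Worker
-- does for a decode failure ("graph" is analogous).
-- 'from' converts domain values to their protobuf counterparts.
mkDecodeError :: UTCTime -> Entity -> [Text] -> DocumentType
mkDecodeError ts entity errs =
  DTError $
    CrawlerError
      "decode"             -- message: the kind of failure
      (encodeBlob errs)    -- body: base64-encoded JSON payload
      (Just $ from ts)     -- created_at: protobuf Timestamp
      (Just $ from entity) -- entity: what was being crawled
```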
8 changes: 8 additions & 0 deletions schemas/monocle/protob/http.proto
@@ -172,6 +172,14 @@ service Crawler {
body: "*"
};
}
+// Get crawler errors
+rpc Errors(monocle_crawler.ErrorsRequest)
+returns (monocle_crawler.ErrorsResponse) {
+option (google.api.http) = {
+post: "/api/2/crawler/errors"
+body: "*"
+};
+}
}

// The monocle HTTP API
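Clients can then pull the indexed errors back out through this endpoint. A minimal sketch using the generated Haskell client, as the updated Macroscope test does below (it assumes the call can run in IO; the index name and query string are illustrative):

```haskell
import Monocle.Client (MonocleClient)
import Monocle.Client.Api (crawlerErrors)
import Monocle.Protob.Crawler qualified as CrawlerPB

-- A sketch: fetch the indexed crawler errors over the new endpoint.
fetchErrors :: MonocleClient -> IO ()
fetchErrors client = do
  resp <- crawlerErrors client (CrawlerPB.ErrorsRequest "test-index" "from:2020")
  case resp of
    CrawlerPB.ErrorsResponse (Just (CrawlerPB.ErrorsResponseResultSuccess (CrawlerPB.ErrorsList errors))) ->
      print errors
    other -> putStrLn ("errors request failed: " <> show other)
```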
8 changes: 3 additions & 5 deletions src/Lentille.hs
@@ -90,12 +90,10 @@ data GraphQLError = GraphQLError
deriving (Show, Generic, ToJSON)

data LentilleError
-= DecodeError [Text]
-| GraphError GraphQLError
+= DecodeError UTCTime [Text]
+| GraphError UTCTime GraphQLError
deriving (Show, Generic, ToJSON)

-instance Exception LentilleError
-
type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) ()

-------------------------------------------------------------------------------
@@ -143,7 +141,7 @@ type Changes = (Change, [ChangeEvent])
-- We don't care about the rest so we replace it with ()
-- See: https://hackage.haskell.org/package/streaming-0.2.4.0/docs/Streaming-Prelude.html#v:break
--
--- >>> let stream = S.yield (Left (DecodeError ["oops"]))
+-- >>> let stream = S.yield (Left (DecodeError [utctime|2021-05-31 00:00:00|] ["oops"]))
-- >>> runEff $ S.length_ $ streamDropBefore [utctime|2021-05-31 00:00:00|] stream
-- 1
streamDropBefore :: UTCTime -> LentilleStream es Changes -> LentilleStream es Changes
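Both constructors now carry the failure time as their first field, so consumers can recover when an error happened without unpacking each payload. A small hypothetical accessor (not in the patch) to illustrate:

```haskell
{-# LANGUAGE LambdaCase #-}

-- Hypothetical helper: when did this error occur?
failedAt :: LentilleError -> UTCTime
failedAt = \case
  DecodeError ts _ -> ts
  GraphError ts _ -> ts
```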
13 changes: 9 additions & 4 deletions src/Lentille/GraphQL.hs
@@ -238,7 +238,9 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe
Nothing -> pure Nothing

case mErr of
-Just err -> S.yield (Left $ GraphError err)
+Just err -> do
+now <- lift mGetCurrentTime
+S.yield (Left $ GraphError now err)
Nothing -> go Nothing 0

go pageInfoM totalFetched = do
@@ -254,15 +256,18 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe

-- Handle the response
case respE of
-Left e ->
+Left e -> do
-- Yield the error and stop the stream
-S.yield (Left $ GraphError e)
+now <- lift mGetCurrentTime
+S.yield (Left $ GraphError now e)
Review comment from TristanCacqueray (author): I guess here we should try to extract the pageInfo from the error to make the process continue, otherwise I'm not sure the current implementation will be able to resume after the error.

Right (pageInfo, rateLimitM, decodingErrors, xs) -> do
-- Log crawling status
logStep pageInfo rateLimitM xs totalFetched

case decodingErrors of
-_ : _ -> S.yield (Left $ DecodeError decodingErrors)
+_ : _ -> do
+now <- lift mGetCurrentTime
+S.yield (Left $ DecodeError now decodingErrors)
[] -> do
-- Yield the results
S.each (Right <$> xs)
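The same two steps (read the clock, then yield the tagged error) now appear in each branch; they could be factored into a helper along these lines (hypothetical, built on the streaming combinators the module already uses):

```haskell
import Streaming (Of, Stream, lift)
import Streaming.Prelude qualified as S

-- Hypothetical helper: stamp an error with the current time as it is yielded.
yieldError ::
  Monad m =>
  -- | The clock action, e.g. mGetCurrentTime
  m UTCTime ->
  -- | The error constructor, e.g. \now -> GraphError now err
  (UTCTime -> LentilleError) ->
  Stream (Of (Either LentilleError a)) m ()
yieldError getNow mkErr = do
  now <- lift getNow
  S.yield (Left (mkErr now))
```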
21 changes: 16 additions & 5 deletions src/Macroscope/Test.hs
@@ -1,6 +1,7 @@
-- | Tests for the macroscope process
module Macroscope.Test where

+import Data.ByteString.Base64.Lazy qualified as B64
import Effectful.Env
import Effectful.Prometheus
import Effectful.Reader.Static qualified as E
@@ -17,6 +18,7 @@ import Monocle.Backend.Queries qualified as Q
import Monocle.Backend.Test (fakeChangePB, withTenantConfig)
import Monocle.Backend.Test qualified as BT (fakeChange, fakeDate, fakeDateAlt)
import Monocle.Client
+import Monocle.Client.Api (crawlerErrors)
import Monocle.Config qualified as Config
import Monocle.Effects
import Monocle.Entity (CrawlerName (..), Entity (Project))
@@ -53,9 +55,18 @@ testCrawlingPoint = do
Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes badStream)

(currentOldestAge, _) <- getOldest
-liftIO $ assertEqual "Commit date is not updated on failure" oldestAge currentOldestAge
+liftIO $ assertBool "Commit date is updated on failure" (currentOldestAge > oldestAge)

-Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes goodStream)
+-- Check that the error got indexed
+errorResponse <- crawlerErrors client (CrawlerPB.ErrorsRequest (from indexName) "from:2020")
+case errorResponse of
+CrawlerPB.ErrorsResponse (Just (CrawlerPB.ErrorsResponseResultSuccess (CrawlerPB.ErrorsList (toList -> [e])))) -> liftIO do
+e.crawlerErrorMessage @?= "decode"
+(B64.decode . encodeUtf8 $ e.crawlerErrorBody) @?= Right "[\"Oops\"]"
+(from <$> e.crawlerErrorEntity) @?= Just (Project "opendev/neutron")
+_ -> error $ "Expected one error, got: " <> show errorResponse
+
+Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes $ goodStream currentOldestAge)

(newOldestAge, _) <- getOldest
liftIO $ assertBool "Commit date updated" (newOldestAge > oldestAge)
@@ -64,12 +75,12 @@ testCrawlingPoint = do
badStream date name
| date == BT.fakeDateAlt && name == "opendev/neutron" = do
Streaming.yield $ Right (fakeChangePB, [])
-Streaming.yield $ Left (DecodeError ["Oops"])
+Streaming.yield $ Left (DecodeError BT.fakeDateAlt ["Oops"])
| otherwise = error "Bad crawling point"

-- A document stream that yield a change
-goodStream date name
-| date == BT.fakeDateAlt && name == "opendev/neutron" = do
+goodStream expected date name
+| date == expected && name == "opendev/neutron" = do
Streaming.yield $ Right (fakeChangePB, [])
| otherwise = error "Bad crawling point"

35 changes: 13 additions & 22 deletions src/Macroscope/Worker.hs
@@ -55,11 +55,6 @@ streamName = \case
TaskDatas _ -> "TaskDatas"
UserChanges _ -> "UserChanges"

-isTDStream :: DocumentStream m -> Bool
-isTDStream = \case
-TaskDatas _ -> True
-_anyOtherStream -> False
-
-------------------------------------------------------------------------------
-- Adapter between protobuf api and crawler stream
-------------------------------------------------------------------------------
@@ -78,16 +73,17 @@ data DocumentType
| DTChanges (Change, [ChangeEvent])
| DTTaskData TaskData
| DTIssues (Issue, [IssueEvent])
+| DTError CrawlerError
deriving (Generic, ToJSON)

data ProcessError es
= CommitError Text
| AddError Text
-| StreamError (LentilleError, LentilleStream es DocumentType)

-- | 'processStream' read the stream of document and post to the monocle API
processStream ::
forall es.
+Entity ->
-- | Funtion to log about the processing
(Int -> Eff es ()) ->
-- | Function to post on the Monocle API
@@ -96,7 +92,7 @@ processStream ::
LentilleStream es DocumentType ->
-- | The processing results
Eff es [Maybe (ProcessError es)]
-processStream logFunc postFunc = go (0 :: Word) [] []
+processStream entity logFunc postFunc = go (0 :: Word) [] []
where
go count acc results stream = do
eDocument <- S.next stream
@@ -105,12 +101,12 @@ processStream logFunc postFunc = go (0 :: Word) [] []
-- The end of the stream
res <- processBatch acc
pure $ reverse (res : results)
-Right (Left err, rest) -> do
--- An error occured in the stream, abort now
-let res = Just (StreamError (err, rest))
-pure $ reverse (res : results)
-Right (Right doc, rest) -> do
+Right (edoc, rest) -> do
-- We got a new document
+let doc = case edoc of
+Right x -> x
+Left (DecodeError ts err) -> DTError $ CrawlerError "decode" (encodeBlob err) (Just $ from ts) (Just $ from entity)
+Left (GraphError ts err) -> DTError $ CrawlerError "graph" (encodeBlob err) (Just $ from ts) (Just $ from entity)
let newAcc = doc : acc
if count == 499
then do
Expand Down Expand Up @@ -149,16 +145,6 @@ runStream apiKey indexName crawlerName documentStream = do
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
-StreamError (err, rest) -> do
-logWarn "Error occured when consuming the document stream" ["err" .= err]
-S.toList_ rest >>= \case
-[] -> pure ()
-items -> logWarn "Left over documents found after error" ["items" .= items]
-
--- TODO: explains why TDStream don't support offset?
-unless (isTDStream documentStream) do
--- Try the next entity by incrementing the offset
-go startTime (offset + 1)

-- | 'runStreamError' is the stream processor
runStreamError ::
@@ -195,6 +181,7 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
-- Run the document stream for that entity
postResult <-
processStream
+entity
(\c -> logInfo "Posting documents" ["count" .= c])
(httpRetry "api/commit/add" . mCrawlerAddDoc . mkRequest entity)
(getStream oldestAge entity)
@@ -246,6 +233,7 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
addDocRequestTaskDatas = V.fromList $ mapMaybe getTD xs
addDocRequestIssues = V.fromList $ mapMaybe getIssue xs
addDocRequestIssueEvents = V.fromList $ concat $ mapMaybe getIssueEvent xs
+addDocRequestErrors = V.fromList $ mapMaybe getError xs
in AddDocRequest {..}
where
getIssue = \case
@@ -266,6 +254,9 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
getTD = \case
DTTaskData td -> Just td
_ -> Nothing
+getError = \case
+DTError e -> Just e
+_ -> Nothing

-- 'commitTimestamp' post the commit date.
commitTimestamp entity = do
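encodeBlob itself is not shown in this diff. Given that the test above round-trips the body with B64.decode, a plausible definition is JSON encoding followed by base64 (an assumption about the patch's code, not a quotation of it, and it presumes a relude-style decodeUtf8 from the project prelude):

```haskell
import Data.Aeson qualified as Aeson
import Data.ByteString.Base64.Lazy qualified as B64

-- Assumed shape of encodeBlob: JSON-encode the error payload, then
-- base64-encode it so the body is safe to store as text in the index.
encodeBlob :: Aeson.ToJSON a => a -> LText
encodeBlob = decodeUtf8 . B64.encode . Aeson.encode
```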