Skip to content

Commit

Permalink
crawler: add stream error to stop the stream
Browse files Browse the repository at this point in the history
  • Loading branch information
TristanCacqueray committed Jan 3, 2024
1 parent 4d2bf8a commit 23c4def
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/Macroscope/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ testCrawlingPoint = do
Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes badStream)

(currentOldestAge, _) <- getOldest
liftIO $ assertBool "Commit date is updated on failure" (currentOldestAge > oldestAge)
liftIO $ assertEqual "Commit date is not updated on failure" oldestAge currentOldestAge

-- Check that the error got indexed
errorResponse <- crawlerErrors client (CrawlerPB.ErrorsRequest (from indexName) "from:2020")
Expand Down
29 changes: 21 additions & 8 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ data DocumentType
| DTError CrawlerError
deriving (Generic, ToJSON)

data ProcessError es
= CommitError Text
| AddError Text
-- | ProcessError are produced by the processStream.
data ProcessError
= -- | Monocle crawler commit API failed
CommitError Text
| -- | Monocle crawler add API failed
AddError Text
| -- | External API failed
StreamError LentilleError

-- | 'processStream' read the stream of document and post to the monocle API
processStream ::
Expand All @@ -90,7 +95,7 @@ processStream ::
-- | The stream of documents to read
LentilleStream es DocumentType ->
-- | The processing results
Eff es [Maybe (ProcessError es)]
Eff es [Maybe ProcessError]
processStream logFunc postFunc = go (0 :: Word) [] []
where
go count acc results stream = do
Expand All @@ -105,12 +110,19 @@ processStream logFunc postFunc = go (0 :: Word) [] []
let doc = case edoc of
Right x -> x
Left err -> DTError $ toCrawlerError err
let addStreamError :: [Maybe ProcessError] -> [Maybe ProcessError]
addStreamError = case edoc of
Right _ -> id
-- This is likely an error we can't recover, so don't add stream error
Left (LentilleError _ (PartialErrors _)) -> id
-- Every other 'LentilleError' are fatal$
Left err -> (Just (StreamError err) :)
let newAcc = doc : acc
if count == 499
then do
res <- processBatch newAcc
go 0 [] (res : results) rest
else go (count + 1) newAcc results rest
go 0 [] (addStreamError (res : results)) rest
else go (count + 1) newAcc (addStreamError results) rest

toCrawlerError (LentilleError ts err) = CrawlerError {..}
where
Expand All @@ -121,7 +133,7 @@ processStream logFunc postFunc = go (0 :: Word) [] []
PageInfoError e -> ("page-info", encodeJSON e)
PartialErrors es -> ("partial", encodeJSON es)

processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es))
processBatch :: [DocumentType] -> Eff es (Maybe ProcessError)
processBatch [] = pure Nothing
processBatch (reverse -> docs) = do
logFunc (length docs)
Expand Down Expand Up @@ -152,6 +164,7 @@ runStream apiKey indexName crawlerName documentStream = do
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
StreamError err -> logWarn "Stream produced a fatal error" ["err" .= err]

-- | 'runStreamError' is the stream processor
runStreamError ::
Expand All @@ -163,7 +176,7 @@ runStreamError ::
CrawlerName ->
DocumentStream es ->
Word32 ->
Eff es [ProcessError es]
Eff es [ProcessError]
runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go
where
go = do
Expand Down

0 comments on commit 23c4def

Please sign in to comment.