diff --git a/src/Macroscope/Test.hs b/src/Macroscope/Test.hs index 95044d4a8..835c7dd93 100644 --- a/src/Macroscope/Test.hs +++ b/src/Macroscope/Test.hs @@ -55,7 +55,7 @@ testCrawlingPoint = do Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes badStream) (currentOldestAge, _) <- getOldest - liftIO $ assertBool "Commit date is updated on failure" (currentOldestAge > oldestAge) + liftIO $ assertEqual "Commit date is not updated on failure" oldestAge currentOldestAge -- Check that the error got indexed errorResponse <- crawlerErrors client (CrawlerPB.ErrorsRequest (from indexName) "from:2020") diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index e19776a1a..1d668692f 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -76,9 +76,14 @@ data DocumentType | DTError CrawlerError deriving (Generic, ToJSON) -data ProcessError es - = CommitError Text - | AddError Text +-- | ProcessError are produced by the processStream. +data ProcessError + = -- | Monocle crawler commit API failed + CommitError Text + | -- | Monocle crawler add API failed + AddError Text + | -- | External API failed + StreamError LentilleError -- | 'processStream' read the stream of document and post to the monocle API processStream :: @@ -90,7 +95,7 @@ processStream :: -- | The stream of documents to read LentilleStream es DocumentType -> -- | The processing results - Eff es [Maybe (ProcessError es)] + Eff es [Maybe ProcessError] processStream logFunc postFunc = go (0 :: Word) [] [] where go count acc results stream = do @@ -105,12 +110,19 @@ processStream logFunc postFunc = go (0 :: Word) [] [] let doc = case edoc of Right x -> x Left err -> DTError $ toCrawlerError err + let addStreamError :: [Maybe ProcessError] -> [Maybe ProcessError] + addStreamError = case edoc of + Right _ -> id + -- This is likely an error we can't recover, so don't add stream error + Left (LentilleError _ (PartialErrors _)) -> id + -- Every other 'LentilleError' are fatal$ + Left err -> (Just (StreamError err) :) let newAcc = doc : acc if count == 499 then do res <- processBatch newAcc - go 0 [] (res : results) rest - else go (count + 1) newAcc results rest + go 0 [] (addStreamError (res : results)) rest + else go (count + 1) newAcc (addStreamError results) rest toCrawlerError (LentilleError ts err) = CrawlerError {..} where @@ -121,7 +133,7 @@ processStream logFunc postFunc = go (0 :: Word) [] [] PageInfoError e -> ("page-info", encodeJSON e) PartialErrors es -> ("partial", encodeJSON es) - processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es)) + processBatch :: [DocumentType] -> Eff es (Maybe ProcessError) processBatch [] = pure Nothing processBatch (reverse -> docs) = do logFunc (length docs) @@ -152,6 +164,7 @@ runStream apiKey indexName crawlerName documentStream = do forM_ errors \case AddError err -> logWarn "Could not add documents" ["err" .= err] CommitError err -> logWarn "Could not commit update date" ["err" .= err] + StreamError err -> logWarn "Stream produced a fatal error" ["err" .= err] -- | 'runStreamError' is the stream processor runStreamError :: @@ -163,7 +176,7 @@ runStreamError :: CrawlerName -> DocumentStream es -> Word32 -> - Eff es [ProcessError es] + Eff es [ProcessError] runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go where go = do