diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index e19776a1a..28491a388 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -76,9 +76,14 @@ data DocumentType | DTError CrawlerError deriving (Generic, ToJSON) -data ProcessError es - = CommitError Text - | AddError Text +-- | 'ProcessError' values are produced by 'processStream'. +data ProcessError + = -- | Monocle crawler commit API failed + CommitError Text + | -- | Monocle crawler add API failed + AddError Text + | -- | External API failed + StreamError LentilleError -- | 'processStream' read the stream of document and post to the monocle API processStream :: @@ -90,7 +95,7 @@ processStream :: -- | The stream of documents to read LentilleStream es DocumentType -> -- | The processing results - Eff es [Maybe (ProcessError es)] + Eff es [Maybe ProcessError] processStream logFunc postFunc = go (0 :: Word) [] [] where go count acc results stream = do @@ -105,12 +110,15 @@ processStream logFunc postFunc = go (0 :: Word) [] [] let doc = case edoc of Right x -> x Left err -> DTError $ toCrawlerError err + let maybeStopStream = case edoc of + Left err@(LentilleError _ (PageInfoError _)) -> (Just (StreamError err) :) + _ -> id let newAcc = doc : acc if count == 499 then do res <- processBatch newAcc - go 0 [] (res : results) rest - else go (count + 1) newAcc results rest + go 0 [] (maybeStopStream (res : results)) rest + else go (count + 1) newAcc (maybeStopStream results) rest toCrawlerError (LentilleError ts err) = CrawlerError {..} where @@ -121,7 +129,7 @@ processStream logFunc postFunc = go (0 :: Word) [] [] PageInfoError e -> ("page-info", encodeJSON e) PartialErrors es -> ("partial", encodeJSON es) - processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es)) + processBatch :: [DocumentType] -> Eff es (Maybe ProcessError) processBatch [] = pure Nothing processBatch (reverse -> docs) = do logFunc (length docs) @@ -152,6 +160,7 @@ runStream apiKey indexName crawlerName 
documentStream = do forM_ errors \case AddError err -> logWarn "Could not add documents" ["err" .= err] CommitError err -> logWarn "Could not commit update date" ["err" .= err] + StreamError err -> logWarn "Stream produced a fatal error" ["err" .= err] -- | 'runStreamError' is the stream processor runStreamError :: @@ -163,7 +172,7 @@ runStreamError :: CrawlerName -> DocumentStream es -> Word32 -> - Eff es [ProcessError es] + Eff es [ProcessError] runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go where go = do