From cf412241226c3f25d9558be6154c80a283b35cb1 Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Wed, 20 Dec 2023 20:09:31 +0000 Subject: [PATCH] crawler: emit crawler error when processing stream --- src/Lentille.hs | 6 ++++++ src/Macroscope/Worker.hs | 25 ++++--------------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/src/Lentille.hs b/src/Lentille.hs index 9e4254a9e..86dbfdeac 100644 --- a/src/Lentille.hs +++ b/src/Lentille.hs @@ -45,6 +45,7 @@ import Monocle.Protob.Change ( Change_ChangeState (Change_ChangeStateClosed, Change_ChangeStateMerged), Ident (..), ) +import Monocle.Protob.Crawler (CrawlerError (..)) import Network.HTTP.Client qualified as HTTP import Proto3.Suite (Enumerated (Enumerated)) import Streaming.Prelude qualified as S @@ -96,6 +97,11 @@ data LentilleError instance Exception LentilleError +instance From LentilleError CrawlerError where + from = \case + DecodeError xs -> CrawlerError "decode error" (decodeUtf8 $ encode xs) + GraphError x -> CrawlerError "graph error" (decodeUtf8 $ encode x) + type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) () ------------------------------------------------------------------------------- diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index 351dfd96c..cb5fac62a 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -55,11 +55,6 @@ streamName = \case TaskDatas _ -> "TaskDatas" UserChanges _ -> "UserChanges" -isTDStream :: DocumentStream m -> Bool -isTDStream = \case - TaskDatas _ -> True - _anyOtherStream -> False - ------------------------------------------------------------------------------- -- Adapter between protobuf api and crawler stream ------------------------------------------------------------------------------- @@ -84,7 +79,6 @@ data DocumentType data ProcessError es = CommitError Text | AddError Text - | StreamError (LentilleError, LentilleStream es DocumentType) -- | 'processStream' read the stream of document and post to the monocle API processStream :: @@ -106,12 +100,11 @@ processStream logFunc postFunc = go (0 :: Word) [] [] -- The end of the stream res <- processBatch acc pure $ reverse (res : results) - Right (Left err, rest) -> do - -- An error occured in the stream, abort now - let res = Just (StreamError (err, rest)) - pure $ reverse (res : results) - Right (Right doc, rest) -> do + Right (edoc, rest) -> do -- We got a new document + let doc = case edoc of + Right x -> x + Left err -> DTError $ from err let newAcc = doc : acc if count == 499 then do @@ -150,16 +143,6 @@ runStream apiKey indexName crawlerName documentStream = do forM_ errors \case AddError err -> logWarn "Could not add documents" ["err" .= err] CommitError err -> logWarn "Could not commit update date" ["err" .= err] - StreamError (err, rest) -> do - logWarn "Error occured when consuming the document stream" ["err" .= err] - S.toList_ rest >>= \case - [] -> pure () - items -> logWarn "Left over documents found after error" ["items" .= items] - - -- TODO: explains why TDStream don't support offset? - unless (isTDStream documentStream) do - -- Try the next entity by incrementing the offset - go startTime (offset + 1) -- | 'runStreamError' is the stream processor runStreamError ::