Skip to content

Commit

Permalink
crawler: emit crawler error when processing stream
Browse files Browse the repository at this point in the history
  • Loading branch information
TristanCacqueray committed Dec 20, 2023
1 parent 5125006 commit cf41224
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 21 deletions.
6 changes: 6 additions & 0 deletions src/Lentille.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import Monocle.Protob.Change (
Change_ChangeState (Change_ChangeStateClosed, Change_ChangeStateMerged),
Ident (..),
)
import Monocle.Protob.Crawler (CrawlerError (..))
import Network.HTTP.Client qualified as HTTP
import Proto3.Suite (Enumerated (Enumerated))
import Streaming.Prelude qualified as S
Expand Down Expand Up @@ -96,6 +97,11 @@ data LentilleError

instance Exception LentilleError

-- | Turn a 'LentilleError' into a 'CrawlerError'.
--
-- The first 'CrawlerError' argument is a fixed label naming the kind of
-- failure; the second is the error payload passed through 'encode' and then
-- 'decodeUtf8' to obtain text.
instance From LentilleError CrawlerError where
  from (DecodeError xs) = CrawlerError "decode error" (decodeUtf8 (encode xs))
  from (GraphError x) = CrawlerError "graph error" (decodeUtf8 (encode x))

-- | A stream running in @Eff es@ whose elements are each either a
-- 'LentilleError' ('Left') or a value of type @a@ ('Right'); the stream's
-- final return value is @()@.
type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) ()

-------------------------------------------------------------------------------
Expand Down
25 changes: 4 additions & 21 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ streamName = \case
TaskDatas _ -> "TaskDatas"
UserChanges _ -> "UserChanges"

-- | Tell whether a 'DocumentStream' was built with the 'TaskDatas'
-- constructor. Every other constructor yields 'False'.
isTDStream :: DocumentStream m -> Bool
isTDStream (TaskDatas _) = True
isTDStream _ = False

-------------------------------------------------------------------------------
-- Adapter between protobuf api and crawler stream
-------------------------------------------------------------------------------
Expand All @@ -84,7 +79,6 @@ data DocumentType
-- | Errors that 'runStream' handles once a document stream has been processed.
--
-- * 'CommitError' carries a message; 'runStream' logs it as
--   "Could not commit update date".
-- * 'AddError' carries a message; 'runStream' logs it as
--   "Could not add documents".
-- * 'StreamError' pairs the 'LentilleError' yielded by the stream with the
--   part of the stream that had not been consumed when the error was met.
data ProcessError es
= CommitError Text
| AddError Text
| StreamError (LentilleError, LentilleStream es DocumentType)

-- | 'processStream' reads the stream of documents and posts them to the Monocle API
processStream ::
Expand All @@ -106,12 +100,11 @@ processStream logFunc postFunc = go (0 :: Word) [] []
-- The end of the stream
res <- processBatch acc
pure $ reverse (res : results)
Right (Left err, rest) -> do
-- An error occurred in the stream, abort now
let res = Just (StreamError (err, rest))
pure $ reverse (res : results)
Right (Right doc, rest) -> do
Right (edoc, rest) -> do
-- We got a new document
let doc = case edoc of
Right x -> x
Left err -> DTError $ from err
let newAcc = doc : acc
if count == 499
then do
Expand Down Expand Up @@ -150,16 +143,6 @@ runStream apiKey indexName crawlerName documentStream = do
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
StreamError (err, rest) -> do
logWarn "Error occured when consuming the document stream" ["err" .= err]
S.toList_ rest >>= \case
[] -> pure ()
items -> logWarn "Left over documents found after error" ["items" .= items]

-- TODO: explain why TDStream doesn't support offset
unless (isTDStream documentStream) do
-- Try the next entity by incrementing the offset
go startTime (offset + 1)

-- | 'runStreamError' is the stream processor
runStreamError ::
Expand Down

0 comments on commit cf41224

Please sign in to comment.