Skip to content

Commit

Permalink
crawler: add stream error to stop the stream
Browse files Browse the repository at this point in the history
  • Loading branch information
TristanCacqueray committed Jan 3, 2024
1 parent 4d2bf8a commit e266c62
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ data DocumentType
| DTError CrawlerError
deriving (Generic, ToJSON)

data ProcessError es
= CommitError Text
| AddError Text
-- | A 'ProcessError' is produced by 'processStream'.
data ProcessError
= -- | Monocle crawler commit API failed
CommitError Text
| -- | Monocle crawler add API failed
AddError Text
| -- | External API failed
StreamError LentilleError

-- | 'processStream' reads the stream of documents and posts them to the Monocle API
processStream ::
Expand All @@ -90,7 +95,7 @@ processStream ::
-- | The stream of documents to read
LentilleStream es DocumentType ->
-- | The processing results
Eff es [Maybe (ProcessError es)]
Eff es [Maybe ProcessError]
processStream logFunc postFunc = go (0 :: Word) [] []
where
go count acc results stream = do
Expand All @@ -105,12 +110,15 @@ processStream logFunc postFunc = go (0 :: Word) [] []
let doc = case edoc of
Right x -> x
Left err -> DTError $ toCrawlerError err
let maybeStopStream = case edoc of
Left err@(LentilleError _ (PageInfoError _)) -> (Just (StreamError err) :)
_ -> id
let newAcc = doc : acc
if count == 499
then do
res <- processBatch newAcc
go 0 [] (res : results) rest
else go (count + 1) newAcc results rest
go 0 [] (maybeStopStream (res : results)) rest
else go (count + 1) newAcc (maybeStopStream results) rest

toCrawlerError (LentilleError ts err) = CrawlerError {..}
where
Expand All @@ -121,7 +129,7 @@ processStream logFunc postFunc = go (0 :: Word) [] []
PageInfoError e -> ("page-info", encodeJSON e)
PartialErrors es -> ("partial", encodeJSON es)

processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es))
processBatch :: [DocumentType] -> Eff es (Maybe ProcessError)
processBatch [] = pure Nothing
processBatch (reverse -> docs) = do
logFunc (length docs)
Expand Down Expand Up @@ -152,6 +160,7 @@ runStream apiKey indexName crawlerName documentStream = do
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
StreamError err -> logWarn "Stream produced a fatal error" ["err" .= err]

-- | 'runStreamError' is the stream processor
runStreamError ::
Expand All @@ -163,7 +172,7 @@ runStreamError ::
CrawlerName ->
DocumentStream es ->
Word32 ->
Eff es [ProcessError es]
Eff es [ProcessError]
runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go
where
go = do
Expand Down

0 comments on commit e266c62

Please sign in to comment.