Skip to content

Commit

Permalink
Refactor the processStream to use explicit error handling
Browse files Browse the repository at this point in the history
This change simplifies the stream handling by:
- Using an explicit return value instead of a throw exception,
- Handwritten process batch stream consumer.
  • Loading branch information
TristanCacqueray committed Sep 19, 2023
1 parent f65e640 commit abc2b15
Showing 1 changed file with 55 additions and 62 deletions.
117 changes: 55 additions & 62 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import Monocle.Protob.Crawler as CrawlerPB hiding (Entity)
import Monocle.Protob.Issue (Issue, IssueEvent)
import Monocle.Protob.Search (TaskData)
import Proto3.Suite (Enumerated (Enumerated))
import Streaming qualified as S
import Streaming.Prelude qualified as S

import Effectful qualified as E
import Effectful.Reader.Static qualified as E
import Monocle.Effects

Expand Down Expand Up @@ -76,35 +74,52 @@ data DocumentType
| DTIssues (Issue, [IssueEvent])
deriving (Generic, ToJSON)

data ProcessResult = AddOk | AddError Text deriving stock (Show)
data ProcessError es
= CommitError Text
| AddError Text
| StreamError (LentilleError, LentilleStream es DocumentType)

-- | A stream error contains the first Left, and the rest of the stream
type StreamError es = (LentilleError, LentilleStream es DocumentType)

-- | 'process' read the stream of document and post to the monocle API
process ::
-- | 'processStream' read the stream of document and post to the monocle API
processStream ::
forall es.
-- | Funtion to log about the processing
(Int -> Eff es ()) ->
-- | Function to post on the Monocle API
([DocumentType] -> Eff es AddDocResponse) ->
-- | The stream of documents to read
Stream (Of DocumentType) (Eff es) () ->
LentilleStream es DocumentType ->
-- | The processing results
Eff es [ProcessResult]
process logFunc postFunc =
S.toList_
. S.mapM processBatch
. S.mapped S.toList
. S.chunksOf 500
Eff es [Maybe (ProcessError es)]
processStream logFunc postFunc = go (0 :: Word) [] []
where
processBatch :: [DocumentType] -> Eff es ProcessResult
processBatch docs = do
go count acc results stream = do
eDocument <- S.next stream
case eDocument of
Left () -> do
-- The end of the stream
res <- processBatch acc
pure $ reverse (res : results)
Right (Left err, rest) -> do
-- An error occured in the stream, abort now
let res = Just (StreamError (err, rest))
pure $ reverse (res : results)
Right (Right doc, rest) -> do
-- We got a new document
let newAcc = doc : acc
if count == 499
then do
res <- processBatch newAcc
go 0 [] (res : results) rest
else go (count + 1) newAcc results rest

processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es))
processBatch [] = pure Nothing
processBatch (reverse -> docs) = do
logFunc (length docs)
resp <- postFunc docs
pure $ case resp of
AddDocResponse Nothing -> AddOk
AddDocResponse (Just err) -> AddError (show err)
AddDocResponse Nothing -> Nothing
AddDocResponse (Just err) -> Just (AddError (show err))

-- | 'runStream' is the main function used by macroscope
runStream ::
Expand All @@ -123,24 +138,23 @@ runStream apiKey indexName crawlerName documentStream = do
go :: UTCTime -> Word32 -> Eff es ()
go startTime offset =
unlessStopped do
res <-
runErrorNoCallStack do
runStreamError startTime apiKey indexName crawlerName documentStream offset
case res of
Right () -> pure ()
Left (x, xs) -> do
logWarn "Error occured when consuming the document stream" ["err" .= x]
S.toList_ xs >>= \case
errors <-
runStreamError startTime apiKey indexName crawlerName documentStream offset
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
StreamError (err, rest) -> do
logWarn "Error occured when consuming the document stream" ["err" .= err]
S.toList_ rest >>= \case
[] -> pure ()
rest -> logWarn "Left over documents found after error" ["items" .= rest]
items -> logWarn "Left over documents found after error" ["items" .= items]

-- TODO: explains why TDStream don't support offset?
unless (isTDStream documentStream) do
-- Try the next entity by incrementing the offset
go startTime (offset + 1)

-- | 'runStreamError' is the stream processor which throws an error to interupt the stream
-- when it contains a Left.
-- | 'runStreamError' is the stream processor
runStreamError ::
forall es.
(LoggerEffect :> es, Retry :> es, PrometheusEffect :> es, MonoClientEffect :> es) =>
Expand All @@ -150,7 +164,7 @@ runStreamError ::
CrawlerName ->
DocumentStream es ->
Word32 ->
Eff (Error (StreamError es) : es) ()
Eff es [ProcessError es]
runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go
where
go = do
Expand All @@ -159,39 +173,36 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
-- Query the monocle api for the oldest entity to be updated.
oldestEntityM <- getStreamOldestEntity indexName (from crawlerName) (streamEntity documentStream) offset
case oldestEntityM of
Nothing -> logInfo_ "Unable to find entity to update"
Nothing -> do
logInfo_ "Unable to find entity to update"
pure []
Just (oldestAge, entity)
| -- add a 1 second delta to avoid Hysteresis
addUTCTime 1 oldestAge >= startTime ->
addUTCTime 1 oldestAge >= startTime -> do
logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge]
pure []
| otherwise -> goStream oldestAge entity

goStream oldestAge entity = do
logInfo "Processing" ["entity" .= entity, "age" .= oldestAge]

-- Run the document stream for that entity
postResult <-
process
processStream
(\c -> logInfo "Posting documents" ["count" .= c])
(httpRetry "api/commit/add" . mCrawlerAddDoc . mkRequest entity)
(eitherStreamToError $ getStream oldestAge entity)
(getStream oldestAge entity)

case foldr collectPostFailure [] postResult of
xs@(_ : _) -> logWarn "Post documents failed" ["errors" .= xs]
case catMaybes postResult of
[] -> do
-- Post the commit date
res <- httpRetry "api/commit" $ commitTimestamp entity
case res of
Just (err :: Text) -> do
logWarn "Commit date failed" ["err" .= err]
Just err -> pure [CommitError err]
Nothing -> do
logInfo_ "Continuing on next entity"
go

collectPostFailure :: ProcessResult -> [Text] -> [Text]
collectPostFailure res acc = case res of
AddOk -> acc
AddError err -> err : acc
xs -> pure xs

-- Adapt the document stream to intermediate representation
getStream oldestAge entity = case documentStream of
Expand Down Expand Up @@ -289,21 +300,3 @@ getStreamOldestEntity indexName crawlerName entityType offset = do
)
) -> pure Nothing
_ -> error $ "Could not get initial timestamp: " <> show resp

-- | Remove the left part of the stream and throw an error when they occurs.
-- The error contains the first left encountered, and the rest of the stream.
eitherStreamToError ::
Stream (Of (Either err a)) (Eff es) () ->
Stream (Of a) (Eff (Error (err, Stream (Of (Either err a)) (Eff es) ()) : es)) ()
eitherStreamToError stream = do
nextE <- hoist E.raise (lift (S.next stream))
case nextE of
-- The stream is over, stop here
Left () -> pure ()
Right (x, xs) -> do
case x of
-- TODO: should we continue after the first error?
Left e -> lift (throwError (e, xs))
Right v -> do
S.yield v
eitherStreamToError xs

0 comments on commit abc2b15

Please sign in to comment.