From d8007cc866d325d9596a47260ceb3c32140f3a9b Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Tue, 19 Sep 2023 19:00:54 +0000 Subject: [PATCH] Refactor the processStream to use explicit error handling This change simplifies the stream handling by: - Using an explicit return value instead of a throw exception, - Handwritten process batch stream consumer. --- src/Macroscope/Worker.hs | 116 ++++++++++++++++++--------------------- 1 file changed, 54 insertions(+), 62 deletions(-) diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index e9be76b52..40c925f50 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -21,10 +21,8 @@ import Monocle.Protob.Crawler as CrawlerPB hiding (Entity) import Monocle.Protob.Issue (Issue, IssueEvent) import Monocle.Protob.Search (TaskData) import Proto3.Suite (Enumerated (Enumerated)) -import Streaming qualified as S import Streaming.Prelude qualified as S -import Effectful qualified as E import Effectful.Reader.Static qualified as E import Monocle.Effects @@ -76,35 +74,51 @@ data DocumentType | DTIssues (Issue, [IssueEvent]) deriving (Generic, ToJSON) -data ProcessResult = AddOk | AddError Text deriving stock (Show) +data ProcessError es + = CommitError Text + | AddError Text + | StreamError (LentilleError, LentilleStream es DocumentType) --- | A stream error contains the first Left, and the rest of the stream -type StreamError es = (LentilleError, LentilleStream es DocumentType) - --- | 'process' read the stream of document and post to the monocle API -process :: +-- | 'processStream' read the stream of document and post to the monocle API +processStream :: forall es. -- | Funtion to log about the processing (Int -> Eff es ()) -> -- | Function to post on the Monocle API ([DocumentType] -> Eff es AddDocResponse) -> -- | The stream of documents to read - Stream (Of DocumentType) (Eff es) () -> + LentilleStream es DocumentType -> -- | The processing results - Eff es [ProcessResult] -process logFunc postFunc = - S.toList_ - . S.mapM processBatch - . S.mapped S.toList - . S.chunksOf 500 + Eff es [Maybe (ProcessError es)] +processStream logFunc postFunc = go (0 :: Word) [] [] where - processBatch :: [DocumentType] -> Eff es ProcessResult - processBatch docs = do + go count acc results stream = do + eDocument <- S.next stream + case eDocument of + Left () -> do + -- The end of the stream + res <- processBatch acc + pure $ reverse (res : results) + Right (Left err, rest) -> do + -- An error occured in the stream, abort now + let res = Just (StreamError (err, rest)) + pure $ reverse (res : results) + Right (Right doc, rest) -> do + -- We got a new document + let newAcc = doc : acc + if (count == 499) + then do + res <- processBatch newAcc + go 0 [] (res : results) rest + else go (count + 1) newAcc results rest + + processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es)) + processBatch (reverse -> docs) = do logFunc (length docs) resp <- postFunc docs pure $ case resp of - AddDocResponse Nothing -> AddOk - AddDocResponse (Just err) -> AddError (show err) + AddDocResponse Nothing -> Nothing + AddDocResponse (Just err) -> Just (AddError (show err)) -- | 'runStream' is the main function used by macroscope runStream :: @@ -123,24 +137,23 @@ runStream apiKey indexName crawlerName documentStream = do go :: UTCTime -> Word32 -> Eff es () go startTime offset = unlessStopped do - res <- - runErrorNoCallStack do - runStreamError startTime apiKey indexName crawlerName documentStream offset - case res of - Right () -> pure () - Left (x, xs) -> do - logWarn "Error occured when consuming the document stream" ["err" .= x] - S.toList_ xs >>= \case + errors <- + runStreamError startTime apiKey indexName crawlerName documentStream offset + forM_ errors \case + AddError err -> logWarn "Could not add documents" ["err" .= err] + CommitError err -> logWarn "Could not commit update date" ["err" .= err] + StreamError (err, rest) -> do + logWarn "Error occured when consuming the document stream" ["err" .= err] + S.toList_ rest >>= \case [] -> pure () - rest -> logWarn "Left over documents found after error" ["items" .= rest] + items -> logWarn "Left over documents found after error" ["items" .= items] -- TODO: explains why TDStream don't support offset? unless (isTDStream documentStream) do -- Try the next entity by incrementing the offset go startTime (offset + 1) --- | 'runStreamError' is the stream processor which throws an error to interupt the stream --- when it contains a Left. +-- | 'runStreamError' is the stream processor runStreamError :: forall es. (LoggerEffect :> es, Retry :> es, PrometheusEffect :> es, MonoClientEffect :> es) => @@ -150,7 +163,7 @@ runStreamError :: CrawlerName -> DocumentStream es -> Word32 -> - Eff (Error (StreamError es) : es) () + Eff es [ProcessError es] runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go where go = do @@ -159,11 +172,14 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre -- Query the monocle api for the oldest entity to be updated. oldestEntityM <- getStreamOldestEntity indexName (from crawlerName) (streamEntity documentStream) offset case oldestEntityM of - Nothing -> logInfo_ "Unable to find entity to update" + Nothing -> do + logInfo_ "Unable to find entity to update" + pure [] Just (oldestAge, entity) | -- add a 1 second delta to avoid Hysteresis - addUTCTime 1 oldestAge >= startTime -> + addUTCTime 1 oldestAge >= startTime -> do logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge] + pure [] | otherwise -> goStream oldestAge entity goStream oldestAge entity = do @@ -171,27 +187,21 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre -- Run the document stream for that entity postResult <- - process + processStream (\c -> logInfo "Posting documents" ["count" .= c]) (httpRetry "api/commit/add" . mCrawlerAddDoc . mkRequest entity) - (eitherStreamToError $ getStream oldestAge entity) + (getStream oldestAge entity) - case foldr collectPostFailure [] postResult of - xs@(_ : _) -> logWarn "Post documents failed" ["errors" .= xs] + case catMaybes postResult of [] -> do -- Post the commit date res <- httpRetry "api/commit" $ commitTimestamp entity case res of - Just (err :: Text) -> do - logWarn "Commit date failed" ["err" .= err] + Just err -> pure [CommitError err] Nothing -> do logInfo_ "Continuing on next entity" go - - collectPostFailure :: ProcessResult -> [Text] -> [Text] - collectPostFailure res acc = case res of - AddOk -> acc - AddError err -> err : acc + xs -> pure xs -- Adapt the document stream to intermediate representation getStream oldestAge entity = case documentStream of @@ -289,21 +299,3 @@ getStreamOldestEntity indexName crawlerName entityType offset = do ) ) -> pure Nothing _ -> error $ "Could not get initial timestamp: " <> show resp - --- | Remove the left part of the stream and throw an error when they occurs. --- The error contains the first left encountered, and the rest of the stream. -eitherStreamToError :: - Stream (Of (Either err a)) (Eff es) () -> - Stream (Of a) (Eff (Error (err, Stream (Of (Either err a)) (Eff es) ()) : es)) () -eitherStreamToError stream = do - nextE <- hoist E.raise (lift (S.next stream)) - case nextE of - -- The stream is over, stop here - Left () -> pure () - Right (x, xs) -> do - case x of - -- TODO: should we continue after the first error? - Left e -> lift (throwError (e, xs)) - Right v -> do - S.yield v - eitherStreamToError xs