From e63042e10b521411b5148ed0d05996bacdf3a4cc Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Wed, 20 Sep 2023 10:48:11 +0000 Subject: [PATCH 1/2] Update the breakOnDate logic to take errors into account This change ensures errors are kept in the stream. --- src/CLI.hs | 1 - src/Lentille.hs | 24 +++++++++++++++++++----- src/Lentille/GitHub/PullRequests.hs | 6 +----- src/Lentille/GitHub/Types.hs | 3 --- src/Lentille/GitHub/UserPullRequests.hs | 6 +----- src/Lentille/GitLab/MergeRequests.hs | 9 +-------- 6 files changed, 22 insertions(+), 27 deletions(-) diff --git a/src/CLI.hs b/src/CLI.hs index 39c2527ac..8f0ab84dc 100644 --- a/src/CLI.hs +++ b/src/CLI.hs @@ -33,7 +33,6 @@ import Streaming.Prelude qualified as S import Data.String.Interpolate (i) import Effectful.Prometheus -import Lentille.GitHub.Types (Changes) import ListT qualified import Monocle.Backend.Documents (EChange (..)) import Monocle.Effects diff --git a/src/Lentille.hs b/src/Lentille.hs index c1241fbfa..83a6cda5d 100644 --- a/src/Lentille.hs +++ b/src/Lentille.hs @@ -20,9 +20,12 @@ module Lentille ( toIdent, ghostIdent, sanitizeID, - isChangeTooOld, swapDuration, + -- * Stream helper + streamDropBefore, + Changes, + -- * Re-export module Monocle.Class, module Monocle.Logging, @@ -44,6 +47,7 @@ import Monocle.Protob.Change ( ) import Network.HTTP.Client qualified as HTTP import Proto3.Suite (Enumerated (Enumerated)) +import Streaming.Prelude qualified as S (break) import Effectful.Reader.Static qualified as E @@ -126,12 +130,22 @@ toIdent host cb username = Ident {..} ghostIdent :: Text -> Ident ghostIdent host = toIdent host (const Nothing) nobody +type Changes = (Change, [ChangeEvent]) + +-- | Drop oldest element +-- This transform the stream by adding a limit. 
+-- We don't care about the rest so we replace it with () +-- See: https://hackage.haskell.org/package/streaming-0.2.4.0/docs/Streaming-Prelude.html#v:break +streamDropBefore :: UTCTime -> LentilleStream es Changes -> LentilleStream es Changes +streamDropBefore untilDate = fmap (pure ()) . S.break (isChangeTooOld untilDate) + +-- | Return False to keep the stream element. isChangeTooOld :: UTCTime -> Either LentilleError (Change, [ChangeEvent]) -> Bool -isChangeTooOld _ (Left _) = True -isChangeTooOld date (Right (change, _)) = +isChangeTooOld _ (Left _) = False +isChangeTooOld untilDate (Right (change, _)) = case changeUpdatedAt change of - Just changeDate -> T.toUTCTime changeDate < date - _ -> True + Just changeDate -> T.toUTCTime changeDate < untilDate + _ -> False swapDuration :: ChangeOptionalDuration -> ChangeEventOptionalDuration swapDuration (ChangeOptionalDurationDuration v) = ChangeEventOptionalDurationDuration v diff --git a/src/Lentille/GitHub/PullRequests.hs b/src/Lentille/GitHub/PullRequests.hs index 870d1179d..de9192a07 100644 --- a/src/Lentille/GitHub/PullRequests.hs +++ b/src/Lentille/GitHub/PullRequests.hs @@ -14,7 +14,6 @@ import Lentille.GitHub.Utils import Lentille.GraphQL import Monocle.Prelude hiding (id, state) import Monocle.Protob.Change -import Streaming.Prelude qualified as S (break) -- https://docs.github.com/en/graphql/reference/queries#repository declareLocalTypesInline @@ -52,15 +51,12 @@ streamPullRequests :: Text -> LentilleStream es Changes streamPullRequests client cb untilDate repoFullname = - breakOnDate $ streamFetch client mkArgs optParams transformResponse' + streamDropBefore untilDate $ streamFetch client mkArgs optParams transformResponse' where org = Data.Text.takeWhile (/= '/') repoFullname repo = Data.Text.takeWhileEnd (/= '/') repoFullname mkArgs = GetProjectPullRequestsArgs org repo transformResponse' = transformResponse (getHost client) cb - -- This transform the stream by adding a limit. 
- -- We don't care about the rest so we replace it with () - breakOnDate = fmap (pure ()) . S.break (isChangeTooOld untilDate) transformResponse :: -- hostname of the provider diff --git a/src/Lentille/GitHub/Types.hs b/src/Lentille/GitHub/Types.hs index d3aef3cf4..09097a622 100644 --- a/src/Lentille/GitHub/Types.hs +++ b/src/Lentille/GitHub/Types.hs @@ -7,7 +7,6 @@ module Lentille.GitHub.Types where import Data.Morpheus.Client (declareGlobalTypesByName) import Lentille.GraphQL (ghSchemaLocation) import Monocle.Prelude -import Monocle.Protob.Change (Change, ChangeEvent) newtype DateTime = DateTime Text deriving (Show, Eq, FromJSON) @@ -22,5 +21,3 @@ declareGlobalTypesByName , "PullRequestState" , "PullRequestReviewState" ] - -type Changes = (Change, [ChangeEvent]) diff --git a/src/Lentille/GitHub/UserPullRequests.hs b/src/Lentille/GitHub/UserPullRequests.hs index 2b6fe73f2..c2dbda821 100644 --- a/src/Lentille/GitHub/UserPullRequests.hs +++ b/src/Lentille/GitHub/UserPullRequests.hs @@ -13,7 +13,6 @@ import Lentille.GitHub.Utils import Lentille.GraphQL import Monocle.Prelude hiding (id, state) import Monocle.Protob.Change -import Streaming.Prelude qualified as S (break) -- https://docs.github.com/en/graphql/reference/queries#user declareLocalTypesInline @@ -51,13 +50,10 @@ streamUserPullRequests :: Text -> LentilleStream es Changes streamUserPullRequests client cb untilDate userLogin = - breakOnDate $ streamFetch client mkArgs optParams transformResponse' + streamDropBefore untilDate $ streamFetch client mkArgs optParams transformResponse' where mkArgs = GetUserPullRequestsArgs userLogin transformResponse' = transformResponse (getHost client) cb - -- This transform the stream by adding a limit. - -- We don't care about the rest so we replace it with () - breakOnDate = fmap (pure ()) . 
S.break (isChangeTooOld untilDate) transformResponse :: -- hostname of the provider diff --git a/src/Lentille/GitLab/MergeRequests.hs b/src/Lentille/GitLab/MergeRequests.hs index b10c255c7..21d5b5756 100644 --- a/src/Lentille/GitLab/MergeRequests.hs +++ b/src/Lentille/GitLab/MergeRequests.hs @@ -101,8 +101,6 @@ declareLocalTypesInline } |] -type Changes = (Change, [ChangeEvent]) - fetchMergeRequest :: GraphEffects es => GraphClient -> Text -> Text -> Eff es (Either (FetchError GetProjectMergeRequests) GetProjectMergeRequests, [RequestLog]) fetchMergeRequest client project mrID = fetchWithLog (doGraphRequest client) (GetProjectMergeRequestsArgs (ID project) (Just [mrID]) Nothing) @@ -116,14 +114,9 @@ streamMergeRequests :: Text -> LentilleStream es Changes streamMergeRequests client getIdentIdCb untilDate project = - breakOnDate $ streamFetch client mkArgs defaultStreamFetchOptParams transformResponse' + streamDropBefore untilDate $ streamFetch client mkArgs defaultStreamFetchOptParams transformResponse' where mkArgs _ = GetProjectMergeRequestsArgs (ID project) Nothing - - -- This transform the stream by adding a limit. - -- We don't care about the rest so we replace it with () - breakOnDate = fmap (pure ()) . S.break (isChangeTooOld untilDate) - transformResponse' = transformResponse (host client) getIdentIdCb transformResponse :: From 2ac92bacf43cc7ba93ae2f81365be77842aedd0e Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Wed, 20 Sep 2023 11:28:22 +0000 Subject: [PATCH 2/2] Add streamDropBefore test and changelog --- CHANGELOG.md | 2 ++ src/Lentille.hs | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdb326330..16a8c82bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ All notable changes to this project will be documented in this file. ### Fixed +- [crawler] Correctly handle errors and do not commit new changes. 
+ ## [1.9.0] - 2023-09-16 ### Added diff --git a/src/Lentille.hs b/src/Lentille.hs index 83a6cda5d..0a3abf9d1 100644 --- a/src/Lentille.hs +++ b/src/Lentille.hs @@ -47,7 +47,7 @@ import Monocle.Protob.Change ( ) import Network.HTTP.Client qualified as HTTP import Proto3.Suite (Enumerated (Enumerated)) -import Streaming.Prelude qualified as S (break) +import Streaming.Prelude qualified as S import Effectful.Reader.Static qualified as E @@ -136,6 +136,10 @@ type Changes = (Change, [ChangeEvent]) -- This transform the stream by adding a limit. -- We don't care about the rest so we replace it with () -- See: https://hackage.haskell.org/package/streaming-0.2.4.0/docs/Streaming-Prelude.html#v:break +-- +-- >>> let stream = S.yield (Left (DecodeError ["oops"])) +-- >>> runEff $ S.length_ $ streamDropBefore [utctime|2021-05-31 00:00:00|] stream +-- 1 streamDropBefore :: UTCTime -> LentilleStream es Changes -> LentilleStream es Changes streamDropBefore untilDate = fmap (pure ()) . S.break (isChangeTooOld untilDate)