Skip to content

Commit

Permalink
Merge pull request #1064 from TristanCacqueray/fix-stream-error
Browse files Browse the repository at this point in the history
Update the breakOnDate logic to take into account error
  • Loading branch information
mergify[bot] authored Sep 20, 2023
2 parents 0b2172e + 3859e4e commit 3b524df
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ All notable changes to this project will be documented in this file.

### Fixed

- [crawler] Correctly handle errors and do not commit new changes.

## [1.9.0] - 2023-09-16

### Added
Expand Down
1 change: 0 additions & 1 deletion src/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import Streaming.Prelude qualified as S

import Data.String.Interpolate (i)
import Effectful.Prometheus
import Lentille.GitHub.Types (Changes)
import ListT qualified
import Monocle.Backend.Documents (EChange (..))
import Monocle.Effects
Expand Down
28 changes: 23 additions & 5 deletions src/Lentille.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ module Lentille (
toIdent,
ghostIdent,
sanitizeID,
isChangeTooOld,
swapDuration,

-- * Stream helper
streamDropBefore,
Changes,

-- * Re-export
module Monocle.Class,
module Monocle.Logging,
Expand All @@ -44,6 +47,7 @@ import Monocle.Protob.Change (
)
import Network.HTTP.Client qualified as HTTP
import Proto3.Suite (Enumerated (Enumerated))
import Streaming.Prelude qualified as S

import Effectful.Reader.Static qualified as E

Expand Down Expand Up @@ -126,12 +130,26 @@ toIdent host cb username = Ident {..}
ghostIdent :: Text -> Ident
ghostIdent host = toIdent host (const Nothing) nobody

type Changes = (Change, [ChangeEvent])

-- | Drop oldest element
-- This transform the stream by adding a limit.
-- We don't care about the rest so we replace it with ()
-- See: https://hackage.haskell.org/package/streaming-0.2.4.0/docs/Streaming-Prelude.html#v:break
--
-- >>> let stream = S.yield (Left (DecodeError ["oops"]))
-- >>> runEff $ S.length_ $ streamDropBefore [utctime|2021-05-31 00:00:00|] stream
-- 1
streamDropBefore :: UTCTime -> LentilleStream es Changes -> LentilleStream es Changes
streamDropBefore untilDate = fmap (pure ()) . S.break (isChangeTooOld untilDate)

-- | Return False to keep the stream element.
isChangeTooOld :: UTCTime -> Either LentilleError (Change, [ChangeEvent]) -> Bool
isChangeTooOld _ (Left _) = True
isChangeTooOld date (Right (change, _)) =
isChangeTooOld _ (Left _) = False
isChangeTooOld untilDate (Right (change, _)) =
case changeUpdatedAt change of
Just changeDate -> T.toUTCTime changeDate < date
_ -> True
Just changeDate -> T.toUTCTime changeDate < untilDate
_ -> False

swapDuration :: ChangeOptionalDuration -> ChangeEventOptionalDuration
swapDuration (ChangeOptionalDurationDuration v) = ChangeEventOptionalDurationDuration v
6 changes: 1 addition & 5 deletions src/Lentille/GitHub/PullRequests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import Lentille.GitHub.Utils
import Lentille.GraphQL
import Monocle.Prelude hiding (id, state)
import Monocle.Protob.Change
import Streaming.Prelude qualified as S (break)

-- https://docs.github.com/en/graphql/reference/queries#repository
declareLocalTypesInline
Expand Down Expand Up @@ -52,15 +51,12 @@ streamPullRequests ::
Text ->
LentilleStream es Changes
streamPullRequests client cb untilDate repoFullname =
breakOnDate $ streamFetch client mkArgs optParams transformResponse'
streamDropBefore untilDate $ streamFetch client mkArgs optParams transformResponse'
where
org = Data.Text.takeWhile (/= '/') repoFullname
repo = Data.Text.takeWhileEnd (/= '/') repoFullname
mkArgs = GetProjectPullRequestsArgs org repo
transformResponse' = transformResponse (getHost client) cb
-- This transform the stream by adding a limit.
-- We don't care about the rest so we replace it with ()
breakOnDate = fmap (pure ()) . S.break (isChangeTooOld untilDate)

transformResponse ::
-- hostname of the provider
Expand Down
3 changes: 0 additions & 3 deletions src/Lentille/GitHub/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module Lentille.GitHub.Types where
import Data.Morpheus.Client (declareGlobalTypesByName)
import Lentille.GraphQL (ghSchemaLocation)
import Monocle.Prelude
import Monocle.Protob.Change (Change, ChangeEvent)

newtype DateTime = DateTime Text deriving (Show, Eq, FromJSON)

Expand All @@ -22,5 +21,3 @@ declareGlobalTypesByName
, "PullRequestState"
, "PullRequestReviewState"
]

type Changes = (Change, [ChangeEvent])
6 changes: 1 addition & 5 deletions src/Lentille/GitHub/UserPullRequests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Lentille.GitHub.Utils
import Lentille.GraphQL
import Monocle.Prelude hiding (id, state)
import Monocle.Protob.Change
import Streaming.Prelude qualified as S (break)

-- https://docs.github.com/en/graphql/reference/queries#user
declareLocalTypesInline
Expand Down Expand Up @@ -51,13 +50,10 @@ streamUserPullRequests ::
Text ->
LentilleStream es Changes
streamUserPullRequests client cb untilDate userLogin =
breakOnDate $ streamFetch client mkArgs optParams transformResponse'
streamDropBefore untilDate $ streamFetch client mkArgs optParams transformResponse'
where
mkArgs = GetUserPullRequestsArgs userLogin
transformResponse' = transformResponse (getHost client) cb
-- This transform the stream by adding a limit.
-- We don't care about the rest so we replace it with ()
breakOnDate = fmap (pure ()) . S.break (isChangeTooOld untilDate)

transformResponse ::
-- hostname of the provider
Expand Down
9 changes: 1 addition & 8 deletions src/Lentille/GitLab/MergeRequests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ declareLocalTypesInline
}
|]

type Changes = (Change, [ChangeEvent])

fetchMergeRequest :: GraphEffects es => GraphClient -> Text -> Text -> Eff es (Either (FetchError GetProjectMergeRequests) GetProjectMergeRequests, [RequestLog])
fetchMergeRequest client project mrID =
fetchWithLog (doGraphRequest client) (GetProjectMergeRequestsArgs (ID project) (Just [mrID]) Nothing)
Expand All @@ -116,14 +114,9 @@ streamMergeRequests ::
Text ->
LentilleStream es Changes
streamMergeRequests client getIdentIdCb untilDate project =
breakOnDate $ streamFetch client mkArgs defaultStreamFetchOptParams transformResponse'
streamDropBefore untilDate $ streamFetch client mkArgs defaultStreamFetchOptParams transformResponse'
where
mkArgs _ = GetProjectMergeRequestsArgs (ID project) Nothing

-- This transform the stream by adding a limit.
-- We don't care about the rest so we replace it with ()
breakOnDate = fmap (pure ()) . S.break (isChangeTooOld untilDate)

transformResponse' = transformResponse (host client) getIdentIdCb

transformResponse ::
Expand Down

0 comments on commit 3b524df

Please sign in to comment.