From e6b0bfe5e099982e7632f7242babcbbc6fc7fc75 Mon Sep 17 00:00:00 2001 From: James King Date: Tue, 27 Sep 2022 21:42:07 -0400 Subject: [PATCH] Replace cereal package with binary `cereal` is in life support maintenance mode. Work is being done to the `binary` package to support strict parsing with a `binary` like API which is what cereal is for. --- package.yaml | 2 +- src/Database/PostgreSQL/Replicant/Message.hs | 26 +++++---- src/Database/PostgreSQL/Replicant/Protocol.hs | 11 ++-- .../PostgreSQL/Replicant/Serialize.hs | 9 ++- .../PostgreSQL/Replicant/Types/Lsn.hs | 6 +- stack.yaml | 2 +- test/Spec.hs | 56 +++++++++---------- 7 files changed, 56 insertions(+), 56 deletions(-) diff --git a/package.yaml b/package.yaml index 56c6b11..83a0183 100644 --- a/package.yaml +++ b/package.yaml @@ -37,9 +37,9 @@ library: - aeson - attoparsec - async + - binary - bits - bytestring - - cereal - containers - keep-alive - postgresql-libpq diff --git a/src/Database/PostgreSQL/Replicant/Message.hs b/src/Database/PostgreSQL/Replicant/Message.hs index 9d7039d..6bf3af5 100644 --- a/src/Database/PostgreSQL/Replicant/Message.hs +++ b/src/Database/PostgreSQL/Replicant/Message.hs @@ -17,9 +17,11 @@ module Database.PostgreSQL.Replicant.Message where import Control.Monad import Data.Aeson +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put import Data.ByteString (ByteString) import Data.Scientific (Scientific) -import Data.Serialize import Data.Text (Text) import GHC.Generics import GHC.Int @@ -36,7 +38,7 @@ data ResponseExpectation | DoNotRespond deriving (Eq, Generic, Show) -instance Serialize ResponseExpectation where +instance Binary ResponseExpectation where put ShouldRespond = putWord8 1 put DoNotRespond = putWord8 0 @@ -62,7 +64,7 @@ data PrimaryKeepAlive } deriving (Eq, Generic, Show) -instance Serialize PrimaryKeepAlive where +instance Binary PrimaryKeepAlive where put (PrimaryKeepAlive walEnd sendTime responseExpectation) = do putWord8 0x6B -- 'k' putInt64be walEnd @@ -70,7 +72,7 @@ instance Serialize PrimaryKeepAlive where put responseExpectation get = do - _ <- getBytes 1 + _ <- getByteString 1 walEnd <- getInt64be sendTime <- getInt64be responseExpectation <- get @@ -89,7 +91,7 @@ data StandbyStatusUpdate } deriving (Eq, Generic, Show) -instance Serialize StandbyStatusUpdate where +instance Binary StandbyStatusUpdate where put (StandbyStatusUpdate walReceived walFlushed @@ -104,7 +106,7 @@ instance Serialize StandbyStatusUpdate where put responseExpectation get = do - _ <- getBytes 1 -- should expect 0x72, 'r' + _ <- getByteString 1 -- should expect 0x72, 'r' walReceived <- get walFlushed <- get walApplied <- get @@ -128,7 +130,7 @@ data XLogData } deriving (Eq, Generic, Show) -instance Serialize XLogData where +instance Binary XLogData where put (XLogData walStart walEnd sendTime walData) = do putWord8 0x77 -- 'w' put walStart @@ -137,7 +139,7 @@ instance Serialize XLogData where putByteString walData get = do - _ <- getBytes 1 -- should expec '0x77', 'w' + _ <- getByteString 1 -- should expec '0x77', 'w' walStart <- get walEnd <- get sendTime <- getInt64be @@ -153,7 +155,7 @@ data HotStandbyFeedback } deriving (Eq, Generic, Show) -instance Serialize HotStandbyFeedback where +instance Binary HotStandbyFeedback where put (HotStandbyFeedback clientSendTime currentXMin currentEpoch) = do putWord8 0x68 putInt64be clientSendTime @@ -161,20 +163,20 @@ instance Serialize HotStandbyFeedback where putInt32be currentEpoch get = do - _ <- getBytes 1 -- should expect '0x68' 'h' + _ <- getByteString 1 -- should expect '0x68' 'h' clientSendTime <- getInt64be currentXmin <- getInt32be currentEpoch <- getInt32be pure $ HotStandbyFeedback clientSendTime currentXmin currentEpoch -- | This structure wraps the two messages sent by the server so that --- we get a Serialize instance for both. +-- we get a Binary instance for both. data WalCopyData = XLogDataM !XLogData | KeepAliveM !PrimaryKeepAlive deriving (Eq, Generic, Show) -instance Serialize WalCopyData where +instance Binary WalCopyData where put (XLogDataM xLogData) = put xLogData put (KeepAliveM keepAlive) = put keepAlive get = do diff --git a/src/Database/PostgreSQL/Replicant/Protocol.hs b/src/Database/PostgreSQL/Replicant/Protocol.hs index 83456a0..b3ed556 100644 --- a/src/Database/PostgreSQL/Replicant/Protocol.hs +++ b/src/Database/PostgreSQL/Replicant/Protocol.hs @@ -21,11 +21,11 @@ import Control.Concurrent.STM import Control.Exception.Base import Control.Monad (forever) import Data.Aeson (eitherDecode') +import Data.Binary import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Char8 as B import Data.Maybe -import Data.Serialize hiding (flush) import Database.PostgreSQL.LibPQ import Database.PostgreSQL.Replicant.Connection @@ -122,12 +122,12 @@ handleReplicationRow -> (Change -> IO LSN) -> IO () handleReplicationRow keepAliveChan walState _ row cb = - case decode @WalCopyData row of - Left err -> + case decodeOrFail @WalCopyData . BL.fromStrict $ row of + Left (_, _, err) -> throwIO $ ReplicantException $ "handleReplicationRow (decode error): " ++ err - Right m -> case m of + Right (_, _, m) -> case m of XLogDataM xlog -> do case eitherDecode' @Change $ BL.fromStrict $ xLogDataWalData xlog of Left err -> @@ -149,6 +149,7 @@ handleReplicationError conn = do handleReplicationNoop :: IO () handleReplicationNoop = pure () + -- | Initiate the streaming replication protocol handler. This will -- race the /keep-alive/ and /copy data/ handler threads. It will -- catch and rethrow exceptions from either thread if any fails or @@ -213,7 +214,7 @@ sendStatusUpdate conn w@(WalProgressState walState) = do applied timestamp DoNotRespond - copyResult <- putCopyData (getConnection conn) $ encode statusUpdate + copyResult <- putCopyData (getConnection conn) . BL.toStrict $ encode statusUpdate case copyResult of CopyInOk -> do flushResult <- flush (getConnection conn) diff --git a/src/Database/PostgreSQL/Replicant/Serialize.hs b/src/Database/PostgreSQL/Replicant/Serialize.hs index 2f75b90..de4b7e8 100644 --- a/src/Database/PostgreSQL/Replicant/Serialize.hs +++ b/src/Database/PostgreSQL/Replicant/Serialize.hs @@ -2,11 +2,10 @@ module Database.PostgreSQL.Replicant.Serialize where +import Data.Binary.Get import Data.ByteString (ByteString) -import Data.Serialize +import qualified Data.ByteString.Lazy as BL --- | Consume the rest of the @Get a@ input as a ByteString +-- | Consume the rest of the @Get a@ input as a strict ByteString consumeByteStringToEnd :: Get ByteString -consumeByteStringToEnd = do - numRemaining <- remaining - getByteString numRemaining +consumeByteStringToEnd = BL.toStrict <$> getRemainingLazyByteString diff --git a/src/Database/PostgreSQL/Replicant/Types/Lsn.hs b/src/Database/PostgreSQL/Replicant/Types/Lsn.hs index 3d68dc8..f2c90ea 100644 --- a/src/Database/PostgreSQL/Replicant/Types/Lsn.hs +++ b/src/Database/PostgreSQL/Replicant/Types/Lsn.hs @@ -27,13 +27,15 @@ module Database.PostgreSQL.Replicant.Types.Lsn where import Data.Aeson import Data.Attoparsec.ByteString.Char8 +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put import Data.Bits import Data.Bits.Extras import Data.ByteString (ByteString ()) import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy.Builder as Builder import Data.ByteString.Lazy.Builder.ASCII (word32Hex) -import Data.Serialize import Data.Word import qualified Data.Text as T import qualified Data.Text.Encoding as T @@ -49,7 +51,7 @@ instance Ord LSN where compare (LSN l0 r0) (LSN l1 r1) = compare l0 l1 <> compare r0 r1 -instance Serialize LSN where +instance Binary LSN where put = putInt64be . toInt64 get = fromInt64 <$> getInt64be diff --git a/stack.yaml b/stack.yaml index 78b5c9b..81a9e65 100644 --- a/stack.yaml +++ b/stack.yaml @@ -68,7 +68,7 @@ extra-deps: # Allow a newer minor version of GHC than the snapshot specifies # compiler-check: newer-minor -# To enable profile build, uncomment: +# To enable profile build: # build: # library-profiling: true # executable-profiling: true diff --git a/test/Spec.hs b/test/Spec.hs index 4574148..d07eff3 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -1,9 +1,10 @@ import Test.Hspec -import Data.Serialize +import Data.Binary import Data.ByteString (ByteString) import qualified Data.ByteString as B import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy as BL import Data.Word import GHC.Int @@ -11,10 +12,10 @@ import Database.PostgreSQL.Replicant.Message import Database.PostgreSQL.Replicant.Settings import Database.PostgreSQL.Replicant.Types.Lsn -examplePrimaryKeepAliveMessage :: ByteString +examplePrimaryKeepAliveMessage :: BL.ByteString examplePrimaryKeepAliveMessage - = B.concat - [ (B.pack [0x6B]) + = BL.concat + [ (BL.pack [0x6B]) , (encode @LSN (fromInt64 123)) , (encode @LSN (fromInt64 346)) , (encode @Word8 1) @@ -25,12 +26,22 @@ main = hspec $ do context "Message" $ do describe "PrimaryKeepAlive" $ do it "should decode a valid primary keep alive message" $ do - (decode $ examplePrimaryKeepAliveMessage) - `shouldBe` - (Right $ PrimaryKeepAlive 123 346 ShouldRespond) + let Right (_, _, result) + = decodeOrFail + $ examplePrimaryKeepAliveMessage + result `shouldBe` (PrimaryKeepAlive 123 346 ShouldRespond) describe "StandbyStatusUpdate" $ do it "should encode a valid standby status update message" $ do + let expected + = BL.concat + [ BL.pack [0x72] + , encode @LSN (fromInt64 213) + , encode @LSN (fromInt64 232) + , encode @LSN (fromInt64 234) + , encode @Int64 454 + , encode @Word8 0 + ] (encode $ StandbyStatusUpdate (fromInt64 213) (fromInt64 232) @@ -38,52 +49,37 @@ main = hspec $ do 454 DoNotRespond) `shouldBe` - B.concat - [ B.pack [0x72] - , encode @LSN (fromInt64 213) - , encode @LSN (fromInt64 232) - , encode @LSN (fromInt64 234) - , encode @Int64 454 - , encode @Word8 0 - ] + expected describe "XLogData" $ do it "should encode/decode a valid xLogData message" $ do let msg = XLogData (fromInt64 123) (fromInt64 234) 345 (B8.pack "hello") - (decode . encode $ msg) - `shouldBe` - Right msg + (decode @XLogData . encode $ msg) `shouldBe` msg describe "HotStandbyFeedback" $ do it "should encode/decode a valid HotStandbyFeedback message" $ do let msg = HotStandbyFeedback 123 234 456 - (decode . encode $ msg) - `shouldBe` - Right msg + (decode @HotStandbyFeedback . encode $ msg) `shouldBe` msg describe "WalCopyData" $ do it "should encode/decode an XLogData message" $ do let msg = XLogDataM (XLogData (fromInt64 123) (fromInt64 234) 345 (B8.pack "hello")) - (decode . encode $ msg) - `shouldBe` - Right msg + (decode @WalCopyData . encode $ msg) `shouldBe` msg it "should encode/decode a PrimaryKeepAlive message" $ do let msg = KeepAliveM (PrimaryKeepAlive 123 346 ShouldRespond) - (decode . encode $ msg) - `shouldBe` - Right msg + (decode @WalCopyData . encode $ msg) `shouldBe` msg context "Types" $ do describe "LSN" $ do - context "Serializable" $ do + context "Binary" $ do it "should be serializable" $ do let lsn = LSN 2 23 - (decode . encode $ lsn) `shouldBe` Right lsn + (decode @LSN . encode $ lsn) `shouldBe` lsn it "should be equivalent to fromByteString/toByteString" $ do let (Right lsn) = fromByteString "16/3002D50" - (toByteString <$> (decode . encode @LSN $ lsn)) `shouldBe` Right "16/3002d50" + (toByteString $ (decode @LSN . encode @LSN $ lsn)) `shouldBe` "16/3002d50" context "Settings" $ do describe "pgConnectionString" $ do