Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace cereal package with binary #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ library:
- aeson
- attoparsec
- async
- binary
- bits
- bytestring
- cereal
- containers
- keep-alive
- postgresql-libpq
Expand Down
26 changes: 14 additions & 12 deletions src/Database/PostgreSQL/Replicant/Message.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ module Database.PostgreSQL.Replicant.Message where

import Control.Monad
import Data.Aeson
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import Data.ByteString (ByteString)
import Data.Scientific (Scientific)
import Data.Serialize
import Data.Text (Text)
import GHC.Generics
import GHC.Int
Expand All @@ -36,7 +38,7 @@ data ResponseExpectation
| DoNotRespond
deriving (Eq, Generic, Show)

instance Serialize ResponseExpectation where
instance Binary ResponseExpectation where
put ShouldRespond = putWord8 1
put DoNotRespond = putWord8 0

Expand All @@ -62,15 +64,15 @@ data PrimaryKeepAlive
}
deriving (Eq, Generic, Show)

instance Serialize PrimaryKeepAlive where
instance Binary PrimaryKeepAlive where
put (PrimaryKeepAlive walEnd sendTime responseExpectation) = do
putWord8 0x6B -- 'k'
putInt64be walEnd
putInt64be sendTime
put responseExpectation

get = do
_ <- getBytes 1
_ <- getByteString 1
walEnd <- getInt64be
sendTime <- getInt64be
responseExpectation <- get
Expand All @@ -89,7 +91,7 @@ data StandbyStatusUpdate
}
deriving (Eq, Generic, Show)

instance Serialize StandbyStatusUpdate where
instance Binary StandbyStatusUpdate where
put (StandbyStatusUpdate
walReceived
walFlushed
Expand All @@ -104,7 +106,7 @@ instance Serialize StandbyStatusUpdate where
put responseExpectation

get = do
_ <- getBytes 1 -- should expect 0x72, 'r'
_ <- getByteString 1 -- should expect 0x72, 'r'
walReceived <- get
walFlushed <- get
walApplied <- get
Expand All @@ -128,7 +130,7 @@ data XLogData
}
deriving (Eq, Generic, Show)

instance Serialize XLogData where
instance Binary XLogData where
put (XLogData walStart walEnd sendTime walData) = do
putWord8 0x77 -- 'w'
put walStart
Expand All @@ -137,7 +139,7 @@ instance Serialize XLogData where
putByteString walData

get = do
_ <- getBytes 1 -- should expec '0x77', 'w'
_ <- getByteString 1 -- should expec '0x77', 'w'
walStart <- get
walEnd <- get
sendTime <- getInt64be
Expand All @@ -153,28 +155,28 @@ data HotStandbyFeedback
}
deriving (Eq, Generic, Show)

instance Serialize HotStandbyFeedback where
instance Binary HotStandbyFeedback where
put (HotStandbyFeedback clientSendTime currentXMin currentEpoch) = do
putWord8 0x68
putInt64be clientSendTime
putInt32be currentXMin
putInt32be currentEpoch

get = do
_ <- getBytes 1 -- should expect '0x68' 'h'
_ <- getByteString 1 -- should expect '0x68' 'h'
clientSendTime <- getInt64be
currentXmin <- getInt32be
currentEpoch <- getInt32be
pure $ HotStandbyFeedback clientSendTime currentXmin currentEpoch

-- | This structure wraps the two messages sent by the server so that
-- we get a Serialize instance for both.
-- we get a Binary instance for both.
data WalCopyData
= XLogDataM !XLogData
| KeepAliveM !PrimaryKeepAlive
deriving (Eq, Generic, Show)

instance Serialize WalCopyData where
instance Binary WalCopyData where
put (XLogDataM xLogData) = put xLogData
put (KeepAliveM keepAlive) = put keepAlive
get = do
Expand Down
11 changes: 6 additions & 5 deletions src/Database/PostgreSQL/Replicant/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import Control.Concurrent.STM
import Control.Exception.Base
import Control.Monad (forever)
import Data.Aeson (eitherDecode')
import Data.Binary
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B
import Data.Maybe
import Data.Serialize hiding (flush)
import Database.PostgreSQL.LibPQ

import Database.PostgreSQL.Replicant.Connection
Expand Down Expand Up @@ -122,12 +122,12 @@ handleReplicationRow
-> (Change -> IO LSN)
-> IO ()
handleReplicationRow keepAliveChan walState _ row cb =
case decode @WalCopyData row of
Left err ->
case decodeOrFail @WalCopyData . BL.fromStrict $ row of
Left (_, _, err) ->
throwIO
$ ReplicantException
$ "handleReplicationRow (decode error): " ++ err
Right m -> case m of
Right (_, _, m) -> case m of
XLogDataM xlog -> do
case eitherDecode' @Change $ BL.fromStrict $ xLogDataWalData xlog of
Left err ->
Expand All @@ -149,6 +149,7 @@ handleReplicationError conn = do
handleReplicationNoop :: IO ()
handleReplicationNoop = pure ()


-- | Initiate the streaming replication protocol handler. This will
-- race the /keep-alive/ and /copy data/ handler threads. It will
-- catch and rethrow exceptions from either thread if any fails or
Expand Down Expand Up @@ -213,7 +214,7 @@ sendStatusUpdate conn w@(WalProgressState walState) = do
applied
timestamp
DoNotRespond
copyResult <- putCopyData (getConnection conn) $ encode statusUpdate
copyResult <- putCopyData (getConnection conn) . BL.toStrict $ encode statusUpdate
case copyResult of
CopyInOk -> do
flushResult <- flush (getConnection conn)
Expand Down
9 changes: 4 additions & 5 deletions src/Database/PostgreSQL/Replicant/Serialize.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

module Database.PostgreSQL.Replicant.Serialize where

import Data.Binary.Get
import Data.ByteString (ByteString)
import Data.Serialize
import qualified Data.ByteString.Lazy as BL

-- | Consume the rest of the @Get a@ input as a ByteString
-- | Consume the rest of the @Get a@ input as a strict ByteString
consumeByteStringToEnd :: Get ByteString
consumeByteStringToEnd = do
numRemaining <- remaining
getByteString numRemaining
consumeByteStringToEnd = BL.toStrict <$> getRemainingLazyByteString
6 changes: 4 additions & 2 deletions src/Database/PostgreSQL/Replicant/Types/Lsn.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ module Database.PostgreSQL.Replicant.Types.Lsn where

import Data.Aeson
import Data.Attoparsec.ByteString.Char8
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import Data.Bits
import Data.Bits.Extras
import Data.ByteString (ByteString ())
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Builder as Builder
import Data.ByteString.Lazy.Builder.ASCII (word32Hex)
import Data.Serialize
import Data.Word
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
Expand All @@ -49,7 +51,7 @@ instance Ord LSN where
compare (LSN l0 r0) (LSN l1 r1) =
compare l0 l1 <> compare r0 r1

instance Serialize LSN where
instance Binary LSN where
put = putInt64be . toInt64
get = fromInt64 <$> getInt64be

Expand Down
2 changes: 1 addition & 1 deletion stack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ extra-deps:
# Allow a newer minor version of GHC than the snapshot specifies
# compiler-check: newer-minor

# To enable profile build, uncomment:
# To enable profile build:
# build:
# library-profiling: true
# executable-profiling: true
56 changes: 26 additions & 30 deletions test/Spec.hs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import Test.Hspec

import Data.Serialize
import Data.Binary
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import Data.Word
import GHC.Int

import Database.PostgreSQL.Replicant.Message
import Database.PostgreSQL.Replicant.Settings
import Database.PostgreSQL.Replicant.Types.Lsn

examplePrimaryKeepAliveMessage :: ByteString
examplePrimaryKeepAliveMessage :: BL.ByteString
examplePrimaryKeepAliveMessage
= B.concat
[ (B.pack [0x6B])
= BL.concat
[ (BL.pack [0x6B])
, (encode @LSN (fromInt64 123))
, (encode @LSN (fromInt64 346))
, (encode @Word8 1)
Expand All @@ -25,65 +26,60 @@ main = hspec $ do
context "Message" $ do
describe "PrimaryKeepAlive" $ do
it "should decode a valid primary keep alive message" $ do
(decode $ examplePrimaryKeepAliveMessage)
`shouldBe`
(Right $ PrimaryKeepAlive 123 346 ShouldRespond)
let Right (_, _, result)
= decodeOrFail
$ examplePrimaryKeepAliveMessage
result `shouldBe` (PrimaryKeepAlive 123 346 ShouldRespond)

describe "StandbyStatusUpdate" $ do
it "should encode a valid standby status update message" $ do
let expected
= BL.concat
[ BL.pack [0x72]
, encode @LSN (fromInt64 213)
, encode @LSN (fromInt64 232)
, encode @LSN (fromInt64 234)
, encode @Int64 454
, encode @Word8 0
]
(encode $ StandbyStatusUpdate
(fromInt64 213)
(fromInt64 232)
(fromInt64 234)
454
DoNotRespond)
`shouldBe`
B.concat
[ B.pack [0x72]
, encode @LSN (fromInt64 213)
, encode @LSN (fromInt64 232)
, encode @LSN (fromInt64 234)
, encode @Int64 454
, encode @Word8 0
]
expected

describe "XLogData" $ do
it "should encode/decode a valid xLogData message" $ do
let msg = XLogData (fromInt64 123) (fromInt64 234) 345 (B8.pack "hello")
(decode . encode $ msg)
`shouldBe`
Right msg
(decode @XLogData . encode $ msg) `shouldBe` msg

describe "HotStandbyFeedback" $ do
it "should encode/decode a valid HotStandbyFeedback message" $ do
let msg = HotStandbyFeedback 123 234 456
(decode . encode $ msg)
`shouldBe`
Right msg
(decode @HotStandbyFeedback . encode $ msg) `shouldBe` msg

describe "WalCopyData" $ do
it "should encode/decode an XLogData message" $ do
let msg = XLogDataM (XLogData (fromInt64 123) (fromInt64 234) 345 (B8.pack "hello"))
(decode . encode $ msg)
`shouldBe`
Right msg
(decode @WalCopyData . encode $ msg) `shouldBe` msg

it "should encode/decode a PrimaryKeepAlive message" $ do
let msg = KeepAliveM (PrimaryKeepAlive 123 346 ShouldRespond)
(decode . encode $ msg)
`shouldBe`
Right msg
(decode @WalCopyData . encode $ msg) `shouldBe` msg

context "Types" $ do
describe "LSN" $ do
context "Serializable" $ do
context "Binary" $ do
it "should be serializable" $ do
let lsn = LSN 2 23
(decode . encode $ lsn) `shouldBe` Right lsn
(decode @LSN . encode $ lsn) `shouldBe` lsn

it "should be equivalent to fromByteString/toByteString" $ do
let (Right lsn) = fromByteString "16/3002D50"
(toByteString <$> (decode . encode @LSN $ lsn)) `shouldBe` Right "16/3002d50"
(toByteString $ (decode @LSN . encode @LSN $ lsn)) `shouldBe` "16/3002d50"

context "Settings" $ do
describe "pgConnectionString" $ do
Expand Down