106 changes: 57 additions & 49 deletions src/Ambar/Emulator/Connector/Postgres.hs
@@ -117,6 +117,28 @@ newtype PostgreSQLState = PostgreSQLState BoundaryTracker
deriving newtype (Default)
deriving anyclass (FromJSON, ToJSON)

-- | Represents a section of the data to be retrieved.
data Section = Section
{ _start :: Maybe EntryId -- ^ exclusive start
, _end :: Maybe EntryId -- ^ exclusive end
, _exclusions :: [(EntryId, EntryId)]
}

-- | If the number of boundaries gets too large, PostgreSQL starts choking on
-- the resulting query. To mitigate that we split the boundaries into sections.
--
-- It must satisfy the property that, within a transaction with RepeatableRead
-- isolation, a single query for some boundaries returns the same results as
-- splitting the boundaries into sections and querying each section separately.
splitBoundaries :: Boundaries -> [Section]
splitBoundaries (Boundaries []) = [Section Nothing Nothing []]
splitBoundaries (Boundaries bs) = zipWith3 Section starts ends chunks
where
chunks = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
lowest = fmap (fmap fst . listToMaybe) chunks
starts = Nothing : drop 1 lowest
ends = drop 1 lowest <> [Nothing]
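
-- A worked sketch of the splitting (illustrative values only; EntryId
-- constructors omitted, and assuming _MAX_BOUNDARY_BATCH_SIZE were 2):
--
--   splitBoundaries (Boundaries [(1,2), (4,5), (7,8), (10,11)])
--     == [ Section Nothing  (Just 7) [(1,2), (4,5)]
--        , Section (Just 7) Nothing  [(7,8), (10,11)]
--        ]
--
-- Adjacent sections meet at the lowest id of the next chunk. An id equal to
-- a section's exclusive start lies inside that chunk's first exclusion, so
-- the sections neither overlap nor drop rows.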

connect
:: PostgreSQL
-> SimpleLogger
@@ -150,70 +172,56 @@ connect config@PostgreSQL{..} logger (PostgreSQLState tracker) producer f =
where
pc = Poll.PollingConnector
{ Poll.c_getId = entryId
, Poll.c_poll = run
, Poll.c_poll = poll
, Poll.c_pollingInterval = interval
, Poll.c_maxTransactionTime = _MAX_TRANSACTION_TIME
, Poll.c_producer = producer
}

parser = mkParser (columns config) schema

run :: Boundaries -> Stream Record
run (Boundaries bs) acc0 emit =
P.withTransactionMode txMode conn $ do
let batches = chunksOf _MAX_BOUNDARY_BATCH_SIZE bs
highest = fmap snd . listToMaybe . reverse
case reverse batches of
[] ->
runBatch Nothing Nothing [] acc0
(lastBatch : restReversed) -> do
let initBatches = reverse restReversed
(acc, mLastUpper) <-
foldM
(\(acc, mLower) batch -> do
let mUpper = highest batch
acc' <- runBatch mLower mUpper batch acc
return (acc', mUpper)
)
(acc0, Nothing)
initBatches
-- last batch has no upper bound
runBatch mLastUpper Nothing lastBatch acc
poll :: Boundaries -> Stream Record
poll bounds acc0 emit =
P.withTransactionMode txMode conn $ foldM pollSection acc0 (splitBoundaries bounds)
where
txMode = P.TransactionMode
{ P.isolationLevel = P.ReadCommitted
{ P.isolationLevel = P.RepeatableRead
, P.readWriteMode = P.ReadOnly
}
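-- Note: RepeatableRead gives every section query in this transaction the
-- same snapshot, which is the property splitBoundaries relies on.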

runBatch mLower mUpper batchBs acc = do
logDebug logger batchQuery
(acc', count) <- P.foldWith_ parser conn (fromString batchQuery) (acc, 0 :: Int)
(\(a, n) record -> do
logResult record
a' <- emit a record
return (a', n + 1)
)
logDebug logger $ "results: " <> show count
pollSection acc section = do
let query = toQuery section
logDebug logger query
(acc', count) <- P.foldWith_ parser conn (fromString query) (acc, 0) $
\(a, n) record -> do
logResult record
a' <- emit a record
return (a', n + 1)
logDebug logger $ "results: " <> show @Int count
return acc'
where
batchQuery = fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
[ "SELECT" , commaSeparated $ map pretty $ columns config
, "FROM" , pretty c_table
, if null allConstraints then "" else "WHERE" <+> sepBy "AND" allConstraints
, "ORDER BY" , pretty c_serialColumn
]

allConstraints = lowerBound ++ upperBound ++ exclusions
toQuery :: Section -> String
toQuery (Section mstart mend excl) =
fromString $ Text.unpack $ renderPretty $ Pretty.fillSep
[ "SELECT" , commaSeparated $ map pretty $ columns config
, "FROM" , pretty c_table
, if null constraints
then ""
else "WHERE" <+> sepBy "AND" constraints
, "ORDER BY" , pretty c_serialColumn
]
where
constraints = lowerBound <> exclusions <> upperBound

lowerBound = case mLower of
Nothing -> []
Just (EntryId low) ->
[Pretty.fillSep [pretty low, "<", pretty c_serialColumn]]
lowerBound =
[ Pretty.fillSep [ pretty start, "<", pretty c_serialColumn ]
| Just start <- [mstart]
]

upperBound = case mUpper of
Nothing -> []
Just (EntryId high) ->
[Pretty.fillSep [pretty c_serialColumn, "<=", pretty high]]
upperBound =
[ Pretty.fillSep [ pretty c_serialColumn, "<", pretty end]
| Just end <- [mend]
]

exclusions =
[ Pretty.fillSep
@@ -222,7 +230,7 @@
, "(", pretty c_serialColumn, "<", pretty low
, "OR"
, pretty high, "<", pretty c_serialColumn
, ")"]
| (EntryId low, EntryId high) <- batchBs
| (EntryId low, EntryId high) <- excl
]
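
-- For illustration (hypothetical serial column "id" and table "events"),
-- a @Section (Just 7) Nothing [(7,8), (10,11)]@ renders roughly as:
--
--   SELECT ... FROM events
--   WHERE 7 < id
--     AND (id < 7 OR 8 < id)
--     AND (id < 10 OR 11 < id)
--   ORDER BY id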

logResult row =
50 changes: 39 additions & 11 deletions tests/Test/Connector/PostgreSQL.hs
@@ -14,7 +14,7 @@ module Test.Connector.PostgreSQL

import Control.Concurrent (MVar, newMVar, modifyMVar)
import Control.Exception (bracket, throwIO, ErrorCall(..), fromException)
import Control.Monad (void, forM_)
import Control.Monad (void, forM_, forM)
import qualified Data.Aeson as Aeson
import Data.Aeson (FromJSON)
import Data.List (isInfixOf)
@@ -57,16 +57,44 @@ testPostgreSQL p = do

it "handles large boundary lists" $ do
with (PartitionCount 1) $ \conn table topic connected -> do
void $ P.execute_ conn
(fromString $ "ALTER SEQUENCE " <> tableName table <> "_id_seq INCREMENT BY 2")
void $ P.execute_ conn $ fromString $
"ALTER SEQUENCE " <> tableName table <> "_id_seq INCREMENT BY 2"

let n = 10000
insert conn table (take n $ head $ mocks table)
connected $
deadline (seconds 10) $
Topic.withConsumer topic group $ \consumer -> do
forM_ [1..n] $ \_ -> void $ readEntry @Event consumer
insert conn table [head (mocks table !! 1)]
void $ readEntry @Event consumer
connected $ Topic.withConsumer topic group $ \consumer -> deadline (seconds 5) $ do
-- pre-fill the table
insert conn table (take n $ head $ mocks table)
-- advance the connector past the pre-filled entries.
forM_ [1..n] $ \_ -> readEntry @Event consumer

-- now insert and expect a new entry, which must be retrieved by a query
-- containing lots of gaps.
insert conn table [head (mocks table !! 1)]
void $ readEntry @Event consumer

it "splitBoundaries does not produce overlapping sections" $ do
with (PartitionCount 1) $ \conn table topic connected -> do
let odds = [1, 3..]
evens = [2, 4..]
n = 2000
-- cheat a little bit by specifying the ids.
insertMany :: [Int] -> IO ()
insertMany xs = void $ P.executeMany conn q [(x,x,x) | x <- xs]
where
q = fromString $ "INSERT INTO " <> tableName table <>
" (id, aggregate_id, sequence_number) VALUES (?, ?, ?)"

-- leave lots of gaps
insertMany $ take n odds
connected $ Topic.withConsumer topic group $ \consumer -> deadline (seconds 1) $ do
oddEntries <- forM [1..n] $ \_ -> readEntry @Event consumer

-- fill all the gaps
insertMany (take n evens)
evenEntries <- forM [1..n] $ \_ -> readEntry @Event consumer

let ids = fmap (e_id . fst) $ oddEntries <> evenEntries
ids `shouldBe` (take n odds <> take n evens)
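
-- overlapping sections would show up here as duplicate ids, and a gap
-- between sections as missing ids; exact equality catches both.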

-- Test that column types are supported/unsupported by
-- creating database entries with the value and reporting
@@ -327,7 +355,7 @@ instance Table (EventsTable PostgreSQL) where
tableName (EventsTable name) = name
tableCols _ = ["id", "aggregate_id", "sequence_number"]
mocks _ =
-- the aggregate_id is given when the records are inserted into the database
-- the event id is given when the records are inserted into the database
[ [ Event err agg_id seq_id | seq_id <- [0..] ]
| agg_id <- [0..]
]