1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
{- Persistent sqlite database queues
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Database.Queue (
DbQueue,
initDb,
openDbQueue,
queryDbQueue,
closeDbQueue,
QueueSize,
queueDb,
) where
import Utility.Monad
import Database.Handle
import Database.Persist.Sqlite
import Control.Monad
import Control.Concurrent
import Data.Time.Clock
{- A DbQueue pairs a DbHandle with an MVar holding the Queue of writes
 - that have not yet been committed to the database.
 -
 - This is efficient when there are frequent writes, but reads will not
 - see a queued write until the queue has been committed. -}
data DbQueue = DQ DbHandle (MVar Queue)
{- Opens the database and pairs the resulting handle with a new, empty
 - write queue.
 -
 - No migrations are performed, so only use this when the database is
 - known to exist and to have the right tables; ie after running initDb. -}
openDbQueue :: FilePath -> TableName -> IO DbQueue
openDbQueue db tablename = do
    hdl <- openDb db tablename
    qvar <- newMVar =<< emptyQueue
    return (DQ hdl qvar)
{- Flushes the queue and then closes the underlying database handle.
 -
 - This must be called to ensure that queued changes get written to the
 - database. -}
closeDbQueue :: DbQueue -> IO ()
closeDbQueue h@(DQ hdl _) = flushDbQueue h >> closeDb hdl
{- Makes a query using the DbQueue's database handle. This should not be
 - used to make changes to the database!
 -
 - The queue itself is not consulted, so a query will not return changes
 - that have been queued but not yet committed. Use with care.
 -}
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
queryDbQueue (DQ hdl _) query = queryDb hdl query
{- A queue of actions to perform. The fields are, in order: the number of
 - actions queued, the time of the last commit, and a single SqlPersistM
 - action that runs all of the queued actions in sequence. -}
data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
{- Number of actions currently held in a Queue. -}
type QueueSize = Int
{- Time at which the queue was last committed (or created). -}
type LastCommitTime = UTCTime
{- Makes a Queue that holds no actions, with its last commit time set to
 - the current time. -}
emptyQueue :: IO Queue
emptyQueue = fresh <$> getCurrentTime
  where
    fresh now = Queue 0 now (return ())
{- Commits any queued changes to the database, leaving the queue empty.
 -
 - The queue MVar is always refilled before returning: with a fresh empty
 - queue after a commit, or with the untouched queue when there was
 - nothing to commit. Leaving the MVar empty would make any later
 - queueDb or flush on the same DbQueue block forever in takeMVar.
 -
 - NOTE(review): if commitDb throws, the MVar is still left empty;
 - confirm whether commitDb can throw and whether that needs handling.
 -}
flushDbQueue :: DbQueue -> IO ()
flushDbQueue (DQ hdl qvar) = do
    q@(Queue sz _ qa) <- takeMVar qvar
    if sz > 0
        then do
            commitDb hdl qa
            putMVar qvar =<< emptyQueue
        else putMVar qvar q
{- Adds a change to the queue of changes to make to the database.
 -
 - The commitchecker is passed the new queue size and the time of the
 - last commit. When it returns True, the new change is committed right
 - away, along with everything that was queued before it; otherwise the
 - change just stays in the queue to be committed later.
 -
 - Transactions built up by queueDb are sent to sqlite all at once.
 - If sqlite fails due to another change being made concurrently by another
 - process, the transaction is put back in the queue, with its size and
 - last commit time unchanged. This avoids the sqlite multiple writer
 - problem.
 -}
queueDb
    :: DbQueue
    -> (QueueSize -> LastCommitTime -> IO Bool)
    -> SqlPersistM ()
    -> IO ()
queueDb (DQ hdl qvar) commitchecker a = do
    Queue sz lastcommittime qa <- takeMVar qvar
    let !newsize = sz + 1
        pending = qa >> a
        -- Keeps everything queued, including the new change.
        requeue = putMVar qvar (Queue newsize lastcommittime pending)
    shouldcommit <- commitchecker newsize lastcommittime
    if shouldcommit
        then commitDb' hdl pending >>= either (const requeue) (const restart)
        else requeue
  where
    -- After a successful commit, start over with an empty queue that
    -- records the current time as the last commit time.
    restart = do
        now <- getCurrentTime
        putMVar qvar (Queue 0 now (return ()))
|