1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
{- Persistent sqlite database handles.
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Database.Handle (
DbHandle,
initDb,
openDb,
queryDb,
closeDb,
Size,
queueDb,
flushQueueDb,
commitDb,
) where
import Utility.Exception
import Utility.Monad
import Messages
import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (throwIO)
import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import Data.List
import Data.Time.Clock
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
{- Ensures that the database is initialized. Pass the migration action for
- the database.
-
- The database is put into WAL mode, to prevent readers from blocking
- writers, and prevent a writer from blocking readers.
-}
initDb :: FilePath -> SqlPersistM () -> IO ()
initDb f migration = do
let db = T.pack f
enableWAL db
runSqlite db migration
enableWAL :: T.Text -> IO ()
enableWAL db = do
conn <- Sqlite.open db
stmt <- Sqlite.prepare conn (T.pack "PRAGMA journal_mode=WAL;")
void $ Sqlite.step stmt
void $ Sqlite.finalize stmt
Sqlite.close conn
{- Opens the database, but does not perform any migrations. Only use
- if the database is known to exist and have the right tables. -}
openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs)
q <- newMVar =<< emptyDbQueue
return $ DbHandle worker jobs q
data Job
= QueryJob (SqlPersistM ())
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
| CloseJob
type TableName = String
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = catchNonAsync (run loop) showerr
where
showerr e = liftIO $ warningIO $
"sqlite worker thread crashed: " ++ show e
loop = do
job <- liftIO $ takeMVar jobs
case job of
QueryJob a -> a >> loop
-- change is run in a separate database connection
-- since sqlite only supports a single writer at a
-- time, and it may crash the database connection
ChangeJob a -> liftIO (a run) >> loop
CloseJob -> return ()
-- like runSqlite, but calls settle on the raw sql Connection.
run a = do
conn <- Sqlite.open db
settle conn
runResourceT $ runNoLoggingT $
withSqlConn (wrapConnection conn) $
runSqlConn a
-- Work around a bug in sqlite: New database connections can
-- sometimes take a while to become usable; select statements will
-- fail with ErrorBusy for some time. So, loop until a select
-- succeeds; once one succeeds the connection will stay usable.
-- <http://thread.gmane.org/gmane.comp.db.sqlite.general/93116>
settle conn = do
r <- tryNonAsync $ do
stmt <- Sqlite.prepare conn nullselect
void $ Sqlite.step stmt
void $ Sqlite.finalize stmt
case r of
Right _ -> return ()
Left e -> do
if "ErrorBusy" `isInfixOf` show e
then do
threadDelay 1000 -- 1/1000th second
settle conn
else throwIO e
-- This should succeed for any table.
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
{- Makes a query using the DbHandle. This should not be used to make
- changes to the database!
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
-
- Only one action can be run at a time against a given DbHandle.
- If called concurrently in the same process, this will block until
- it is able to run.
-}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ QueryJob $
liftIO . putMVar res =<< tryNonAsync a
either throwIO return =<< takeMVar res
closeDb :: DbHandle -> IO ()
closeDb h@(DbHandle worker jobs _) = do
flushQueueDb h
putMVar jobs CloseJob
wait worker
type Size = Int
type LastCommitTime = UTCTime
{- A queue of actions to perform, with a count of the number of actions
- queued, and a last commit time. -}
data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
emptyDbQueue :: IO DbQueue
emptyDbQueue = do
now <- getCurrentTime
return $ DbQueue 0 now (return ())
{- Queues a change to be made to the database. It will be buffered
- to be committed later, unless the commitchecker action returns true.
-
- (Be sure to call closeDb or flushQueueDb to ensure the change
- gets committed.)
-
- Transactions built up by queueDb are sent to sqlite all at once.
- If sqlite fails due to another change being made concurrently by another
- process, the transaction is put back in the queue. This solves
- the sqlite multiple writer problem.
-}
queueDb
:: DbHandle
-> (Size -> LastCommitTime -> IO Bool)
-> SqlPersistM ()
-> IO ()
queueDb h@(DbHandle _ _ qvar) commitchecker a = do
DbQueue sz lastcommittime qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
let enqueue = putMVar qvar
ifM (commitchecker sz' lastcommittime)
( do
r <- commitDb h qa'
case r of
Left _ -> enqueue $ DbQueue sz' lastcommittime qa'
Right _ -> do
now <- getCurrentTime
enqueue $ DbQueue 0 now (return ())
, enqueue $ DbQueue sz' lastcommittime qa'
)
{- If flushing the queue fails, this could be because there is another
- writer to the database. Retry repeatedly for up to 10 seconds. -}
flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
DbQueue sz _ qa <- takeMVar qvar
when (sz > 0) $
robustly Nothing 100 (commitDb h qa)
where
robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
robustly _ n a = do
r <- a
case r of
Right _ -> return ()
Left e -> do
threadDelay 100000 -- 1/10th second
robustly (Just e) (n-1) a
commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ ChangeJob $ \runner ->
liftIO $ putMVar res =<< tryNonAsync (runner a)
takeMVar res
|