aboutsummaryrefslogtreecommitdiff
path: root/Database
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2015-02-18 14:11:27 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2015-02-18 14:11:27 -0400
commit0a653b847cf792632d9da3d6cb95170d57b503d9 (patch)
tree432c838e114294c592f705034ba47cfaa91261ec /Database
parent2f95faac015c5a80a832d5c3327daca8d98427e7 (diff)
more robust handling of deferred commits
Still not robust enough. I have 3 fscks running concurrently, and am seeing: ("commit deferred",user error (SQLite3 returned ErrorBusy while attempting to perform step.)) and git-annex: user error (SQLite3 returned ErrorBusy while attempting to perform prepare "SELECT \"fscked\".\"key\"\nFROM \"fscked\"\nWHERE \"fscked\".\"key\" = ?\n": database is locked)
Diffstat (limited to 'Database')
-rw-r--r--Database/Fsck.hs6
-rw-r--r--Database/Handle.hs98
2 files changed, 58 insertions, 46 deletions
diff --git a/Database/Fsck.hs b/Database/Fsck.hs
index ac3d03452..5301109bc 100644
--- a/Database/Fsck.hs
+++ b/Database/Fsck.hs
@@ -65,8 +65,8 @@ openDb u = do
unlessM (liftIO $ doesFileExist db) $ do
let newdb = db ++ ".new"
h <- liftIO $ H.openDb newdb
- void $ liftIO $ H.runDb h $
- runMigrationSilent migrateFsck
+ void $ liftIO $ H.commitDb h $
+ void $ runMigrationSilent migrateFsck
liftIO $ H.closeDb h
setAnnexFilePerm newdb
liftIO $ renameFile newdb db
@@ -87,7 +87,7 @@ addDb (FsckHandle h _) k = H.queueDb h 1000 $
sk = toSKey k
inDb :: FsckHandle -> Key -> IO Bool
-inDb (FsckHandle h _) = H.runDb h . inDb' . toSKey
+inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey
inDb' :: SKey -> SqlPersistM Bool
inDb' sk = do
diff --git a/Database/Handle.hs b/Database/Handle.hs
index 4397a0dea..0a426d454 100644
--- a/Database/Handle.hs
+++ b/Database/Handle.hs
@@ -10,12 +10,12 @@
module Database.Handle (
DbHandle,
openDb,
- runDb,
- commitDb,
+ queryDb,
closeDb,
Size,
queueDb,
flushQueueDb,
+ commitDb,
) where
import Utility.Exception
@@ -33,8 +33,6 @@ import qualified Data.Text as T
- the database. It has a MVar which Jobs are submitted to. -}
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
-data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
-
openDb :: FilePath -> IO DbHandle
openDb db = do
jobs <- newEmptyMVar
@@ -42,25 +40,34 @@ openDb db = do
q <- newMVar emptyDbQueue
return $ DbHandle worker jobs q
+data Job
+ = QueryJob (SqlPersistM ())
+ | ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
+ | CloseJob
+
workerThread :: T.Text -> MVar Job -> IO ()
-workerThread db jobs = catchNonAsync go showerr
+workerThread db jobs = catchNonAsync loop showerr
where
- showerr e = liftIO $ warningIO $ "sqlite worker thread crashed: " ++ show e
- go = do
- r <- runSqlite db transaction
+ showerr e = liftIO $ warningIO $
+ "sqlite worker thread crashed: " ++ show e
+ run = runSqlite db
+ loop = do
+ r <- run queryloop
case r of
+ QueryJob _ -> loop
+ -- change is run in a separate database connection
+ -- since sqlite only supports a single writer at a
+ -- time, and it may crash the database connection
+ ChangeJob a -> a run >> loop
CloseJob -> return ()
- _ -> go
- transaction = do
+ queryloop = do
job <- liftIO $ takeMVar jobs
case job of
- RunJob a -> a >> transaction
- CommitJob -> return CommitJob
- CloseJob -> return CloseJob
+ QueryJob a -> a >> queryloop
+ _ -> return job
-{- Runs an action using the DbHandle. The action may be a query, or it may
- - make a change. Changes are bundled up in a transaction, which does not
- - complete until commitDb is called.
+{- Makes a query using the DbHandle. This should not be used to make
+ - changes to the database!
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
@@ -68,23 +75,14 @@ workerThread db jobs = catchNonAsync go showerr
- Only one action can be run at a time against a given DbHandle.
- If called concurrently in the same process, this will block until
- it is able to run.
- -
- - Note that if multiple processes are trying to change the database
- - at the same time, sqlite will only let one build a transaction at a
- - time.
-}
-runDb :: DbHandle -> SqlPersistM a -> IO a
-runDb (DbHandle _ jobs _) a = do
+queryDb :: DbHandle -> SqlPersistM a -> IO a
+queryDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
- putMVar jobs $ RunJob $
+ putMVar jobs $ QueryJob $
liftIO . putMVar res =<< tryNonAsync a
either throwIO return =<< takeMVar res
-{- Commits any transaction that was created by the previous calls to runDb,
- - and starts a new transaction. -}
-commitDb :: DbHandle -> IO ()
-commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob
-
closeDb :: DbHandle -> IO ()
closeDb h@(DbHandle worker jobs _) = do
flushQueueDb h
@@ -100,11 +98,12 @@ data DbQueue = DbQueue Size (SqlPersistM ())
emptyDbQueue :: DbQueue
emptyDbQueue = DbQueue 0 (return ())
-{- Queues a change to be committed to the database. It will be buffered
+{- Queues a change to be made to the database. It will be buffered
- to be committed later, unless the queue gets larger than the specified
- size.
-
- - (Be sure to call closeDb or flushQueue to ensure the change gets committed.)
+ - (Be sure to call closeDb or flushQueueDb to ensure the change
+ - gets committed.)
-
- Transactions built up by queueDb are sent to sqlite all at once.
- If sqlite fails due to another change being made concurrently by another
@@ -116,16 +115,18 @@ queueDb h@(DbHandle _ _ qvar) maxsz a = do
DbQueue sz qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
- let enqueue = putMVar qvar (DbQueue sz' qa')
+ let enqueue newsz = putMVar qvar (DbQueue newsz qa')
if sz' > maxsz
then do
- r <- tryNonAsync $ do
- runDb h qa'
- commitDb h
+ r <- commitDb h qa'
case r of
- Left _ -> enqueue
- Right _ -> putMVar qvar emptyDbQueue
- else enqueue
+ Left e -> do
+ print ("commit deferred", e)
+ enqueue 0
+ Right _ -> do
+ print "commit made"
+ putMVar qvar emptyDbQueue
+ else enqueue sz'
{- If flushing the queue fails, this could be because there is another
- writer to the database. Retry repeatedly for up to 10 seconds. -}
@@ -133,10 +134,21 @@ flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
DbQueue sz qa <- takeMVar qvar
when (sz > 0) $
- robustly 100 $ runDb h qa
+ robustly Nothing 100 (commitDb h qa)
where
- robustly :: Int -> IO () -> IO ()
- robustly 0 _ = error "failed to commit changes to sqlite database"
- robustly n a = catchNonAsync a $ \_ -> do
- threadDelay 100000 -- 1/10th second
- robustly (n-1) a
+ robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
+ robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
+ robustly _ n a = do
+ r <- a
+ case r of
+ Right _ -> return ()
+ Left e -> do
+ threadDelay 100000 -- 1/10th second
+ robustly (Just e) (n-1) a
+
+commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
+commitDb (DbHandle _ jobs _) a = do
+ res <- newEmptyMVar
+ putMVar jobs $ ChangeJob $ \runner ->
+ liftIO $ putMVar res =<< tryNonAsync (runner a)
+ takeMVar res