diff options
author | Joey Hess <joeyh@joeyh.name> | 2015-02-18 14:11:27 -0400 |
---|---|---|
committer | Joey Hess <joeyh@joeyh.name> | 2015-02-18 14:11:27 -0400 |
commit | 0a653b847cf792632d9da3d6cb95170d57b503d9 (patch) | |
tree | 432c838e114294c592f705034ba47cfaa91261ec /Database | |
parent | 2f95faac015c5a80a832d5c3327daca8d98427e7 (diff) |
more robust handling of deferred commits
Still not robust enough. I have 3 fscks running concurrently, and am
seeing:
("commit deferred",user error (SQLite3 returned ErrorBusy while attempting
to perform step.))
and
git-annex: user error (SQLite3 returned ErrorBusy while attempting to perform prepare "SELECT \"fscked\".\"key\"\nFROM \"fscked\"\nWHERE \"fscked\".\"key\" = ?\n": database is locked)
Diffstat (limited to 'Database')
-rw-r--r-- | Database/Fsck.hs | 6 | ||||
-rw-r--r-- | Database/Handle.hs | 98 |
2 files changed, 58 insertions, 46 deletions
diff --git a/Database/Fsck.hs b/Database/Fsck.hs index ac3d03452..5301109bc 100644 --- a/Database/Fsck.hs +++ b/Database/Fsck.hs @@ -65,8 +65,8 @@ openDb u = do unlessM (liftIO $ doesFileExist db) $ do let newdb = db ++ ".new" h <- liftIO $ H.openDb newdb - void $ liftIO $ H.runDb h $ - runMigrationSilent migrateFsck + void $ liftIO $ H.commitDb h $ + void $ runMigrationSilent migrateFsck liftIO $ H.closeDb h setAnnexFilePerm newdb liftIO $ renameFile newdb db @@ -87,7 +87,7 @@ addDb (FsckHandle h _) k = H.queueDb h 1000 $ sk = toSKey k inDb :: FsckHandle -> Key -> IO Bool -inDb (FsckHandle h _) = H.runDb h . inDb' . toSKey +inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey inDb' :: SKey -> SqlPersistM Bool inDb' sk = do diff --git a/Database/Handle.hs b/Database/Handle.hs index 4397a0dea..0a426d454 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -10,12 +10,12 @@ module Database.Handle ( DbHandle, openDb, - runDb, - commitDb, + queryDb, closeDb, Size, queueDb, flushQueueDb, + commitDb, ) where import Utility.Exception @@ -33,8 +33,6 @@ import qualified Data.Text as T - the database. It has a MVar which Jobs are submitted to. -} data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue) -data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob - openDb :: FilePath -> IO DbHandle openDb db = do jobs <- newEmptyMVar @@ -42,25 +40,34 @@ openDb db = do q <- newMVar emptyDbQueue return $ DbHandle worker jobs q +data Job + = QueryJob (SqlPersistM ()) + | ChangeJob ((SqlPersistM () -> IO ()) -> IO ()) + | CloseJob + workerThread :: T.Text -> MVar Job -> IO () -workerThread db jobs = catchNonAsync go showerr +workerThread db jobs = catchNonAsync loop showerr where - showerr e = liftIO $ warningIO $ "sqlite worker thread crashed: " ++ show e - go = do - r <- runSqlite db transaction + showerr e = liftIO $ warningIO $ + "sqlite worker thread crashed: " ++ show e + run = runSqlite db + loop = do + r <- run queryloop case r of + QueryJob _ -> loop + -- change is run in a separate database connection + -- since sqlite only supports a single writer at a + -- time, and it may crash the database connection + ChangeJob a -> a run >> loop CloseJob -> return () - _ -> go - transaction = do + queryloop = do job <- liftIO $ takeMVar jobs case job of - RunJob a -> a >> transaction - CommitJob -> return CommitJob - CloseJob -> return CloseJob + QueryJob a -> a >> queryloop + _ -> return job -{- Runs an action using the DbHandle. The action may be a query, or it may - - make a change. Changes are bundled up in a transaction, which does not - - complete until commitDb is called. +{- Makes a query using the DbHandle. This should not be used to make + - changes to the database! - - Note that the action is not run by the calling thread, but by a - worker thread. Exceptions are propigated to the calling thread. @@ -68,23 +75,14 @@ workerThread db jobs = catchNonAsync go showerr - Only one action can be run at a time against a given DbHandle. - If called concurrently in the same process, this will block until - it is able to run. - - - - Note that if multiple processes are trying to change the database - - at the same time, sqlite will only let one build a transaction at a - - time. -} -runDb :: DbHandle -> SqlPersistM a -> IO a -runDb (DbHandle _ jobs _) a = do +queryDb :: DbHandle -> SqlPersistM a -> IO a +queryDb (DbHandle _ jobs _) a = do res <- newEmptyMVar - putMVar jobs $ RunJob $ + putMVar jobs $ QueryJob $ liftIO . putMVar res =<< tryNonAsync a either throwIO return =<< takeMVar res -{- Commits any transaction that was created by the previous calls to runDb, - - and starts a new transaction. -} -commitDb :: DbHandle -> IO () -commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob - closeDb :: DbHandle -> IO () closeDb h@(DbHandle worker jobs _) = do flushQueueDb h @@ -100,11 +98,12 @@ data DbQueue = DbQueue Size (SqlPersistM ()) emptyDbQueue :: DbQueue emptyDbQueue = DbQueue 0 (return ()) -{- Queues a change to be committed to the database. It will be buffered +{- Queues a change to be made to the database. It will be buffered - to be committed later, unless the queue gets larger than the specified - size. - - - (Be sure to call closeDb or flushQueue to ensure the change gets committed.) + - (Be sure to call closeDb or flushQueueDb to ensure the change + - gets committed.) - - Transactions built up by queueDb are sent to sqlite all at once. - If sqlite fails due to another change being made concurrently by another @@ -116,16 +115,18 @@ queueDb h@(DbHandle _ _ qvar) maxsz a = do DbQueue sz qa <- takeMVar qvar let !sz' = sz + 1 let qa' = qa >> a - let enqueue = putMVar qvar (DbQueue sz' qa') + let enqueue newsz = putMVar qvar (DbQueue newsz qa') if sz' > maxsz then do - r <- tryNonAsync $ do - runDb h qa' - commitDb h + r <- commitDb h qa' case r of - Left _ -> enqueue - Right _ -> putMVar qvar emptyDbQueue - else enqueue + Left e -> do + print ("commit deferred", e) + enqueue 0 + Right _ -> do + print "commit made" + putMVar qvar emptyDbQueue + else enqueue sz' {- If flushing the queue fails, this could be because there is another - writer to the database. Retry repeatedly for up to 10 seconds. -} @@ -133,10 +134,21 @@ flushQueueDb :: DbHandle -> IO () flushQueueDb h@(DbHandle _ _ qvar) = do DbQueue sz qa <- takeMVar qvar when (sz > 0) $ - robustly 100 $ runDb h qa + robustly Nothing 100 (commitDb h qa) where - robustly :: Int -> IO () -> IO () - robustly 0 _ = error "failed to commit changes to sqlite database" - robustly n a = catchNonAsync a $ \_ -> do - threadDelay 100000 -- 1/10th second - robustly (n-1) a + robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () + robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e + robustly _ n a = do + r <- a + case r of + Right _ -> return () + Left e -> do + threadDelay 100000 -- 1/10th second + robustly (Just e) (n-1) a + +commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) +commitDb (DbHandle _ jobs _) a = do + res <- newEmptyMVar + putMVar jobs $ ChangeJob $ \runner -> + liftIO $ putMVar res =<< tryNonAsync (runner a) + takeMVar res |