diff options
Diffstat (limited to 'Database')
-rw-r--r-- | Database/Export.hs | 2 | ||||
-rw-r--r-- | Database/Fsck.hs | 2 | ||||
-rw-r--r-- | Database/Handle.hs | 65 | ||||
-rw-r--r-- | Database/Keys.hs | 2 | ||||
-rw-r--r-- | Database/Queue.hs | 12 |
5 files changed, 61 insertions, 22 deletions
diff --git a/Database/Export.hs b/Database/Export.hs index dcef88854..00c6ab251 100644 --- a/Database/Export.hs +++ b/Database/Export.hs @@ -48,7 +48,7 @@ openDb u = do unlessM (liftIO $ doesFileExist db) $ do initDb db $ void $ runMigrationSilent migrateExport - h <- liftIO $ H.openDbQueue db "exported" + h <- liftIO $ H.openDbQueue H.SingleWriter db "exported" return $ ExportHandle h closeDb :: ExportHandle -> Annex () diff --git a/Database/Fsck.hs b/Database/Fsck.hs index 9affeac85..1ce513dcf 100644 --- a/Database/Fsck.hs +++ b/Database/Fsck.hs @@ -63,7 +63,7 @@ openDb u = do initDb db $ void $ runMigrationSilent migrateFsck lockFileCached =<< fromRepo (gitAnnexFsckDbLock u) - h <- liftIO $ H.openDbQueue db "fscked" + h <- liftIO $ H.openDbQueue H.MultiWriter db "fscked" return $ FsckHandle h u closeDb :: FsckHandle -> Annex () diff --git a/Database/Handle.hs b/Database/Handle.hs index 7827be749..f5a0a5dda 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -9,6 +9,7 @@ module Database.Handle ( DbHandle, + DbConcurrency(..), openDb, TableName, queryDb, @@ -35,27 +36,49 @@ import System.IO {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} -data DbHandle = DbHandle (Async ()) (MVar Job) +data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job) {- Name of a table that should exist once the database is initialized. -} type TableName = String +{- Sqlite only allows a single write to a database at a time; a concurrent + - write will crash. + - + - While a DbHandle serializes concurrent writes from + - multiple threads. But, when a database can be written to by + - multiple processes concurrently, use MultiWriter to make writes + - to the database be done robustly. + - + - The downside of using MultiWriter is that after writing a change to the + - database, the a query using the same DbHandle will not immediately see + - the change! This is because the change is actually written using a + - separate database connection, and caching can prevent seeing the change. + - Also, consider that if multiple processes are writing to a database, + - you can't rely on seeing values you've just written anyway, as another + - process may change them. + - + - When a database can only be written to by a single process, use + - SingleWriter. Changes written to the database will always be immediately + - visible then. + -} +data DbConcurrency = SingleWriter | MultiWriter + {- Opens the database, but does not perform any migrations. Only use - - if the database is known to exist and have the right tables. -} -openDb :: FilePath -> TableName -> IO DbHandle -openDb db tablename = do + - once the database is known to exist and have the right tables. -} +openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle +openDb dbconcurrency db tablename = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) tablename jobs) -- work around https://github.com/yesodweb/persistent/issues/474 liftIO $ fileEncoding stderr - return $ DbHandle worker jobs + return $ DbHandle dbconcurrency worker jobs {- This is optional; when the DbHandle gets garbage collected it will - auto-close. -} closeDb :: DbHandle -> IO () -closeDb (DbHandle worker jobs) = do +closeDb (DbHandle _ worker jobs) = do putMVar jobs CloseJob wait worker @@ -68,9 +91,12 @@ closeDb (DbHandle worker jobs) = do - Only one action can be run at a time against a given DbHandle. - If called concurrently in the same process, this will block until - it is able to run. + - + - Note that when the DbHandle was opened in MultiWriter mode, recent + - writes may not be seen by queryDb. -} queryDb :: DbHandle -> SqlPersistM a -> IO a -queryDb (DbHandle _ jobs) a = do +queryDb (DbHandle _ _ jobs) a = do res <- newEmptyMVar putMVar jobs $ QueryJob $ liftIO . putMVar res =<< tryNonAsync a @@ -79,9 +105,9 @@ queryDb (DbHandle _ jobs) a = do {- Writes a change to the database. - - - If a database is opened multiple times and there's a concurrent writer, - - the write could fail. Retries repeatedly for up to 10 seconds, - - which should avoid all but the most exceptional problems. + - In MultiWriter mode, catches failure to write to the database, + - and retries repeatedly for up to 10 seconds, which should avoid + - all but the most exceptional problems. -} commitDb :: DbHandle -> SqlPersistM () -> IO () commitDb h wa = robustly Nothing 100 (commitDb' h wa) @@ -97,15 +123,22 @@ commitDb h wa = robustly Nothing 100 (commitDb' h wa) robustly (Just e) (n-1) a commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) -commitDb' (DbHandle _ jobs) a = do +commitDb' (DbHandle MultiWriter _ jobs) a = do res <- newEmptyMVar - putMVar jobs $ ChangeJob $ \runner -> + putMVar jobs $ RobustChangeJob $ \runner -> liftIO $ putMVar res =<< tryNonAsync (runner a) takeMVar res +commitDb' (DbHandle SingleWriter _ jobs) a = do + res <- newEmptyMVar + putMVar jobs $ ChangeJob $ + liftIO . putMVar res =<< tryNonAsync a + takeMVar res + `catchNonAsync` (const $ error "sqlite commit crashed") data Job = QueryJob (SqlPersistM ()) - | ChangeJob ((SqlPersistM () -> IO ()) -> IO ()) + | ChangeJob (SqlPersistM ()) + | RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ()) | CloseJob workerThread :: T.Text -> TableName -> MVar Job -> IO () @@ -127,10 +160,12 @@ workerThread db tablename jobs = Left BlockedIndefinitelyOnMVar -> return () Right CloseJob -> return () Right (QueryJob a) -> a >> loop - -- change is run in a separate database connection + Right (ChangeJob a) -> a >> loop + -- Change is run in a separate database connection -- since sqlite only supports a single writer at a -- time, and it may crash the database connection - Right (ChangeJob a) -> liftIO (a (runSqliteRobustly tablename db)) >> loop + -- that the write is made to. + Right (RobustChangeJob a) -> liftIO (a (runSqliteRobustly tablename db)) >> loop -- like runSqlite, but calls settle on the raw sql Connection. runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a diff --git a/Database/Keys.hs b/Database/Keys.hs index b9440ac1a..282da9f94 100644 --- a/Database/Keys.hs +++ b/Database/Keys.hs @@ -124,7 +124,7 @@ openDb createdb _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe open db (False, False) -> return DbUnavailable where - open db = liftIO $ DbOpen <$> H.openDbQueue db SQL.containedTable + open db = liftIO $ DbOpen <$> H.openDbQueue H.MultiWriter db SQL.containedTable -- If permissions don't allow opening the database, treat it as if -- it does not exist. permerr e = case createdb of diff --git a/Database/Queue.hs b/Database/Queue.hs index 143871079..f0a2d2b65 100644 --- a/Database/Queue.hs +++ b/Database/Queue.hs @@ -9,6 +9,7 @@ module Database.Queue ( DbQueue, + DbConcurrency(..), openDbQueue, queryDbQueue, closeDbQueue, @@ -35,9 +36,9 @@ data DbQueue = DQ DbHandle (MVar Queue) {- Opens the database queue, but does not perform any migrations. Only use - if the database is known to exist and have the right tables; ie after - running initDb. -} -openDbQueue :: FilePath -> TableName -> IO DbQueue -openDbQueue db tablename = DQ - <$> openDb db tablename +openDbQueue :: DbConcurrency -> FilePath -> TableName -> IO DbQueue +openDbQueue dbconcurrency db tablename = DQ + <$> openDb dbconcurrency db tablename <*> (newMVar =<< emptyQueue) {- This or flushDbQueue must be called, eg at program exit to ensure @@ -60,8 +61,11 @@ flushDbQueue (DQ hdl qvar) = do {- Makes a query using the DbQueue's database connection. - This should not be used to make changes to the database! - - - Queries will not return changes that have been recently queued, + - Queries will not see changes that have been recently queued, - so use with care. + - + - Also, when the database was opened in MultiWriter mode, + - queries may not see changes even after flushDbQueue. -} queryDbQueue :: DbQueue -> SqlPersistM a -> IO a queryDbQueue (DQ hdl _) = queryDb hdl |