diff options
-rw-r--r-- | Assistant/Monad.hs | 2 | ||||
-rw-r--r-- | Assistant/TransferrerPool.hs | 38 | ||||
-rw-r--r-- | Assistant/Types/TransferrerPool.hs | 53 | ||||
-rw-r--r-- | Utility/NotificationBroadcaster.hs | 34 | ||||
-rw-r--r-- | debian/changelog | 3 |
5 files changed, 102 insertions, 28 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index 44ae210b7..7c28c7f6f 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -78,7 +78,7 @@ newAssistantData st dstatus = AssistantData <*> newScanRemoteMap <*> newTransferQueue <*> newTransferSlots - <*> newTransferrerPool + <*> newTransferrerPool (checkNetworkConnections dstatus) <*> newFailedPushMap <*> newCommitChan <*> newChangePool diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index e7aa72924..580417305 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -14,27 +14,43 @@ import Utility.Batch import qualified Command.TransferKeys as T -import Control.Concurrent.STM +import Control.Concurrent.STM hiding (check) import System.Process (create_group, std_in, std_out) import Control.Exception (throw) import Control.Concurrent -{- Runs an action with a Transferrer from the pool. -} +{- Runs an action with a Transferrer from the pool. + - + - Only one Transferrer is left running in the pool at a time. + - So if this needed to start a new Transferrer, it's stopped when done. + -} withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a withTransferrer program batchmaker pool a = do - t <- maybe (mkTransferrer program batchmaker) (checkTransferrer program batchmaker) - =<< atomically (tryReadTChan pool) + i@(TransferrerPoolItem (Just t) _) <- maybe + (mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker) + (checkTransferrerPoolItem program batchmaker) + =<< atomically (popTransferrerPool pool) v <- tryNonAsync $ a t - unlessM (putback t) $ + sz <- atomically $ pushTransferrerPool pool i + when (sz > 1) $ void $ forkIO $ stopTransferrer t either throw return v - where - putback t = atomically $ ifM (isEmptyTChan pool) - ( do - writeTChan pool t - return True - , return False + +{- Check if a Transferrer from the pool is still ok to be used. + - If not, stop it and start a new one. -} +checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO TransferrerPoolItem +checkTransferrerPoolItem program batchmaker i = case i of + TransferrerPoolItem (Just t) check -> ifM check + ( return i + , do + stopTransferrer t + new check ) + TransferrerPoolItem Nothing check -> new check + where + new check = do + t <- mkTransferrer program batchmaker + return $ TransferrerPoolItem (Just t) check {- Requests that a Transferrer perform a Transfer, and waits for it to - finish. -} diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs index 2727a6919..899e6969f 100644 --- a/Assistant/Types/TransferrerPool.hs +++ b/Assistant/Types/TransferrerPool.hs @@ -1,4 +1,4 @@ -{- A pool of "git-annex transferkeys" processes +{- A pool of "git-annex transferkeys" processes available for use - - Copyright 2013 Joey Hess <joey@kitenet.net> - @@ -8,10 +8,21 @@ module Assistant.Types.TransferrerPool where import Common.Annex +import Utility.NotificationBroadcaster +import Assistant.Types.DaemonStatus import Control.Concurrent.STM -type TransferrerPool = TChan Transferrer +{- This TMVar is never left empty. -} +type TransferrerPool = TMVar (MkCheckTransferrer, [TransferrerPoolItem]) + +type CheckTransferrer = IO Bool +type MkCheckTransferrer = IO (IO Bool) + +{- Each item in the pool may have a transferrer running, and has an + - IO action that can be used to check if it's still ok to use the + - transferrer. -} +data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer data Transferrer = Transferrer { transferrerRead :: Handle @@ -19,5 +30,39 @@ data Transferrer = Transferrer , transferrerHandle :: ProcessHandle } -newTransferrerPool :: IO TransferrerPool -newTransferrerPool = newTChanIO +newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool +newTransferrerPool c = newTMVarIO (c, []) + +popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem) +popTransferrerPool p = do + (c, l) <- takeTMVar p + case l of + [] -> do + putTMVar p (c, []) + return Nothing + (i:is) -> do + putTMVar p (c, is) + return $ Just i + +pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM Int +pushTransferrerPool p i = do + (c, l) <- takeTMVar p + let l' = i:l + putTMVar p (c, l') + return $ length l' + +{- Note that making a CheckTransferrer may allocate resources, + - such as a NotificationHandle, so it's important that the returned + - TransferrerPoolItem is pushed into the pool, and not left to be + - garbage collected. -} +mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem +mkTransferrerPoolItem p t = do + mkcheck <- atomically $ fst <$> readTMVar p + check <- mkcheck + return $ TransferrerPoolItem (Just t) check + +checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer +checkNetworkConnections dstatushandle = do + dstatus <- atomically $ readTMVar dstatushandle + h <- newNotificationHandle False (networkConnectedNotifier dstatus) + return $ checkNotification h diff --git a/Utility/NotificationBroadcaster.hs b/Utility/NotificationBroadcaster.hs index b873df655..60353116c 100644 --- a/Utility/NotificationBroadcaster.hs +++ b/Utility/NotificationBroadcaster.hs @@ -21,15 +21,17 @@ module Utility.NotificationBroadcaster ( notificationHandleFromId, sendNotification, waitNotification, + checkNotification, ) where import Common import Control.Concurrent.STM -import Control.Concurrent.MSampleVar -{- One MSampleVar per client. The TMVar is never empty, so never blocks. -} -type NotificationBroadcaster = TMVar [MSampleVar ()] +{- One TMVar per client, which are empty when no notification is pending, + - and full when a notification has been sent but not yet seen by the + - client. The list TMVar is never empty, so never blocks. -} +type NotificationBroadcaster = TMVar [TMVar ()] newtype NotificationId = NotificationId Int deriving (Read, Show, Eq, Ord) @@ -53,14 +55,13 @@ newNotificationHandle force b = NotificationHandle <$> pure b <*> addclient where - addclient = do + addclient = atomically $ do s <- if force - then newSV () - else newEmptySV - atomically $ do - l <- takeTMVar b - putTMVar b $ l ++ [s] - return $ NotificationId $ length l + then newTMVar () + else newEmptyTMVar + l <- takeTMVar b + putTMVar b $ l ++ [s] + return $ NotificationId $ length l {- Extracts the identifier from a notification handle. - This can be used to eg, pass the identifier through to a WebApp. -} @@ -76,11 +77,20 @@ sendNotification b = do l <- atomically $ readTMVar b mapM_ notify l where - notify s = writeSV s () + notify s = atomically $ + whenM (isEmptyTMVar s) $ + putTMVar s () {- Used by a client to block until a new notification is available since - the last time it tried. -} waitNotification :: NotificationHandle -> IO () waitNotification (NotificationHandle b (NotificationId i)) = do l <- atomically $ readTMVar b - readSV (l !! i) + atomically $ takeTMVar (l !! i) + +{- Used by a client to check if there has been a new notification since the + - last time it checked, without blocking. -} +checkNotification :: NotificationHandle -> IO Bool +checkNotification (NotificationHandle b (NotificationId i)) = do + l <- atomically $ readTMVar b + maybe False (const True) <$> atomically (tryTakeTMVar (l !! i)) diff --git a/debian/changelog b/debian/changelog index a7339fded..ac6603ada 100644 --- a/debian/changelog +++ b/debian/changelog @@ -20,6 +20,9 @@ git-annex (5.20131231) UNRELEASED; urgency=medium * add: Fix rollback when disk is completely full. * assistant: Fixed several minor memory leaks that manifested when adding a large number of files. + * assistant: Start a new git-annex transferkeys process + after a network connection change, so that remotes that use a persistent + network connection are restarted. -- Joey Hess <joeyh@debian.org> Tue, 31 Dec 2013 13:41:18 -0400 |