diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Monad.hs | 2 | ||||
-rw-r--r-- | Assistant/TransferrerPool.hs | 38 | ||||
-rw-r--r-- | Assistant/Types/TransferrerPool.hs | 53 |
3 files changed, 77 insertions, 16 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index 44ae210b7..7c28c7f6f 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -78,7 +78,7 @@ newAssistantData st dstatus = AssistantData <*> newScanRemoteMap <*> newTransferQueue <*> newTransferSlots - <*> newTransferrerPool + <*> newTransferrerPool (checkNetworkConnections dstatus) <*> newFailedPushMap <*> newCommitChan <*> newChangePool diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index e7aa72924..580417305 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -14,27 +14,43 @@ import Utility.Batch import qualified Command.TransferKeys as T -import Control.Concurrent.STM +import Control.Concurrent.STM hiding (check) import System.Process (create_group, std_in, std_out) import Control.Exception (throw) import Control.Concurrent -{- Runs an action with a Transferrer from the pool. -} +{- Runs an action with a Transferrer from the pool. + - + - Only one Transferrer is left running in the pool at a time. + - So if this needed to start a new Transferrer, it's stopped when done. + -} withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a withTransferrer program batchmaker pool a = do - t <- maybe (mkTransferrer program batchmaker) (checkTransferrer program batchmaker) - =<< atomically (tryReadTChan pool) + i@(TransferrerPoolItem (Just t) _) <- maybe + (mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker) + (checkTransferrerPoolItem program batchmaker) + =<< atomically (popTransferrerPool pool) v <- tryNonAsync $ a t - unlessM (putback t) $ + sz <- atomically $ pushTransferrerPool pool i + when (sz > 1) $ void $ forkIO $ stopTransferrer t either throw return v - where - putback t = atomically $ ifM (isEmptyTChan pool) - ( do - writeTChan pool t - return True - , return False + +{- Check if a Transferrer from the pool is still ok to be used. + - If not, stop it and start a new one. -} +checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO TransferrerPoolItem +checkTransferrerPoolItem program batchmaker i = case i of + TransferrerPoolItem (Just t) check -> ifM check + ( return i + , do + stopTransferrer t + new check ) + TransferrerPoolItem Nothing check -> new check + where + new check = do + t <- mkTransferrer program batchmaker + return $ TransferrerPoolItem (Just t) check {- Requests that a Transferrer perform a Transfer, and waits for it to - finish. -} diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs index 2727a6919..899e6969f 100644 --- a/Assistant/Types/TransferrerPool.hs +++ b/Assistant/Types/TransferrerPool.hs @@ -1,4 +1,4 @@ -{- A pool of "git-annex transferkeys" processes +{- A pool of "git-annex transferkeys" processes available for use - - Copyright 2013 Joey Hess <joey@kitenet.net> - @@ -8,10 +8,21 @@ module Assistant.Types.TransferrerPool where import Common.Annex +import Utility.NotificationBroadcaster +import Assistant.Types.DaemonStatus import Control.Concurrent.STM -type TransferrerPool = TChan Transferrer +{- This TMVar is never left empty. -} +type TransferrerPool = TMVar (MkCheckTransferrer, [TransferrerPoolItem]) + +type CheckTransferrer = IO Bool +type MkCheckTransferrer = IO (IO Bool) + +{- Each item in the pool may have a transferrer running, and has an + - IO action that can be used to check if it's still ok to use the + - transferrer. -} +data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer data Transferrer = Transferrer { transferrerRead :: Handle @@ -19,5 +30,39 @@ data Transferrer = Transferrer , transferrerHandle :: ProcessHandle } -newTransferrerPool :: IO TransferrerPool -newTransferrerPool = newTChanIO +newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool +newTransferrerPool c = newTMVarIO (c, []) + +popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem) +popTransferrerPool p = do + (c, l) <- takeTMVar p + case l of + [] -> do + putTMVar p (c, []) + return Nothing + (i:is) -> do + putTMVar p (c, is) + return $ Just i + +pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM Int +pushTransferrerPool p i = do + (c, l) <- takeTMVar p + let l' = i:l + putTMVar p (c, l') + return $ length l' + +{- Note that making a CheckTransferrer may allocate resources, + - such as a NotificationHandle, so it's important that the returned + - TransferrerPoolItem is pushed into the pool, and not left to be + - garbage collected. -} +mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem +mkTransferrerPoolItem p t = do + mkcheck <- atomically $ fst <$> readTMVar p + check <- mkcheck + return $ TransferrerPoolItem (Just t) check + +checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer +checkNetworkConnections dstatushandle = do + dstatus <- atomically $ readTMVar dstatushandle + h <- newNotificationHandle False (networkConnectedNotifier dstatus) + return $ checkNotification h |