summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/Monad.hs2
-rw-r--r--Assistant/TransferrerPool.hs38
-rw-r--r--Assistant/Types/TransferrerPool.hs53
-rw-r--r--Utility/NotificationBroadcaster.hs34
-rw-r--r--debian/changelog3
5 files changed, 102 insertions, 28 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index 44ae210b7..7c28c7f6f 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -78,7 +78,7 @@ newAssistantData st dstatus = AssistantData
<*> newScanRemoteMap
<*> newTransferQueue
<*> newTransferSlots
- <*> newTransferrerPool
+ <*> newTransferrerPool (checkNetworkConnections dstatus)
<*> newFailedPushMap
<*> newCommitChan
<*> newChangePool
diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs
index e7aa72924..580417305 100644
--- a/Assistant/TransferrerPool.hs
+++ b/Assistant/TransferrerPool.hs
@@ -14,27 +14,43 @@ import Utility.Batch
import qualified Command.TransferKeys as T
-import Control.Concurrent.STM
+import Control.Concurrent.STM hiding (check)
import System.Process (create_group, std_in, std_out)
import Control.Exception (throw)
import Control.Concurrent
-{- Runs an action with a Transferrer from the pool. -}
+{- Runs an action with a Transferrer from the pool.
+ -
+ - Only one Transferrer is left running in the pool at a time.
+ - So if this needed to start a new Transferrer, it's stopped when done.
+ -}
withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
withTransferrer program batchmaker pool a = do
- t <- maybe (mkTransferrer program batchmaker) (checkTransferrer program batchmaker)
- =<< atomically (tryReadTChan pool)
+ i@(TransferrerPoolItem (Just t) _) <- maybe
+ (mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker)
+ (checkTransferrerPoolItem program batchmaker)
+ =<< atomically (popTransferrerPool pool)
v <- tryNonAsync $ a t
- unlessM (putback t) $
+ sz <- atomically $ pushTransferrerPool pool i
+ when (sz > 1) $
void $ forkIO $ stopTransferrer t
either throw return v
- where
- putback t = atomically $ ifM (isEmptyTChan pool)
- ( do
- writeTChan pool t
- return True
- , return False
+
+{- Check if a Transferrer from the pool is still ok to be used.
+ - If not, stop it and start a new one. -}
+checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO TransferrerPoolItem
+checkTransferrerPoolItem program batchmaker i = case i of
+ TransferrerPoolItem (Just t) check -> ifM check
+ ( return i
+ , do
+ stopTransferrer t
+ new check
)
+ TransferrerPoolItem Nothing check -> new check
+ where
+ new check = do
+ t <- mkTransferrer program batchmaker
+ return $ TransferrerPoolItem (Just t) check
{- Requests that a Transferrer perform a Transfer, and waits for it to
- finish. -}
diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs
index 2727a6919..899e6969f 100644
--- a/Assistant/Types/TransferrerPool.hs
+++ b/Assistant/Types/TransferrerPool.hs
@@ -1,4 +1,4 @@
-{- A pool of "git-annex transferkeys" processes
+{- A pool of "git-annex transferkeys" processes available for use
-
- Copyright 2013 Joey Hess <joey@kitenet.net>
-
@@ -8,10 +8,21 @@
module Assistant.Types.TransferrerPool where
import Common.Annex
+import Utility.NotificationBroadcaster
+import Assistant.Types.DaemonStatus
import Control.Concurrent.STM
-type TransferrerPool = TChan Transferrer
+{- This TMVar is never left empty. -}
+type TransferrerPool = TMVar (MkCheckTransferrer, [TransferrerPoolItem])
+
+type CheckTransferrer = IO Bool
+type MkCheckTransferrer = IO (IO Bool)
+
+{- Each item in the pool may have a transferrer running, and has an
+ - IO action that can be used to check if it's still ok to use the
+ - transferrer. -}
+data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer
data Transferrer = Transferrer
{ transferrerRead :: Handle
@@ -19,5 +30,39 @@ data Transferrer = Transferrer
, transferrerHandle :: ProcessHandle
}
-newTransferrerPool :: IO TransferrerPool
-newTransferrerPool = newTChanIO
+newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool
+newTransferrerPool c = newTMVarIO (c, [])
+
+popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem)
+popTransferrerPool p = do
+ (c, l) <- takeTMVar p
+ case l of
+ [] -> do
+ putTMVar p (c, [])
+ return Nothing
+ (i:is) -> do
+ putTMVar p (c, is)
+ return $ Just i
+
+pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM Int
+pushTransferrerPool p i = do
+ (c, l) <- takeTMVar p
+ let l' = i:l
+ putTMVar p (c, l')
+ return $ length l'
+
+{- Note that making a CheckTransferrer may allocate resources,
+ - such as a NotificationHandle, so it's important that the returned
+ - TransferrerPoolItem is pushed into the pool, and not left to be
+ - garbage collected. -}
+mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem
+mkTransferrerPoolItem p t = do
+ mkcheck <- atomically $ fst <$> readTMVar p
+ check <- mkcheck
+ return $ TransferrerPoolItem (Just t) check
+
+checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
+checkNetworkConnections dstatushandle = do
+ dstatus <- atomically $ readTMVar dstatushandle
+ h <- newNotificationHandle False (networkConnectedNotifier dstatus)
+ return $ checkNotification h
diff --git a/Utility/NotificationBroadcaster.hs b/Utility/NotificationBroadcaster.hs
index b873df655..60353116c 100644
--- a/Utility/NotificationBroadcaster.hs
+++ b/Utility/NotificationBroadcaster.hs
@@ -21,15 +21,17 @@ module Utility.NotificationBroadcaster (
notificationHandleFromId,
sendNotification,
waitNotification,
+ checkNotification,
) where
import Common
import Control.Concurrent.STM
-import Control.Concurrent.MSampleVar
-{- One MSampleVar per client. The TMVar is never empty, so never blocks. -}
-type NotificationBroadcaster = TMVar [MSampleVar ()]
+{- One TMVar per client, which are empty when no notification is pending,
+ - and full when a notification has been sent but not yet seen by the
+ - client. The list TMVar is never empty, so never blocks. -}
+type NotificationBroadcaster = TMVar [TMVar ()]
newtype NotificationId = NotificationId Int
deriving (Read, Show, Eq, Ord)
@@ -53,14 +55,13 @@ newNotificationHandle force b = NotificationHandle
<$> pure b
<*> addclient
where
- addclient = do
+ addclient = atomically $ do
s <- if force
- then newSV ()
- else newEmptySV
- atomically $ do
- l <- takeTMVar b
- putTMVar b $ l ++ [s]
- return $ NotificationId $ length l
+ then newTMVar ()
+ else newEmptyTMVar
+ l <- takeTMVar b
+ putTMVar b $ l ++ [s]
+ return $ NotificationId $ length l
{- Extracts the identifier from a notification handle.
- This can be used to eg, pass the identifier through to a WebApp. -}
@@ -76,11 +77,20 @@ sendNotification b = do
l <- atomically $ readTMVar b
mapM_ notify l
where
- notify s = writeSV s ()
+ notify s = atomically $
+ whenM (isEmptyTMVar s) $
+ putTMVar s ()
{- Used by a client to block until a new notification is available since
- the last time it tried. -}
waitNotification :: NotificationHandle -> IO ()
waitNotification (NotificationHandle b (NotificationId i)) = do
l <- atomically $ readTMVar b
- readSV (l !! i)
+ atomically $ takeTMVar (l !! i)
+
+{- Used by a client to check if there has been a new notification since the
+ - last time it checked, without blocking. -}
+checkNotification :: NotificationHandle -> IO Bool
+checkNotification (NotificationHandle b (NotificationId i)) = do
+ l <- atomically $ readTMVar b
+ maybe False (const True) <$> atomically (tryTakeTMVar (l !! i))
diff --git a/debian/changelog b/debian/changelog
index a7339fded..ac6603ada 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -20,6 +20,9 @@ git-annex (5.20131231) UNRELEASED; urgency=medium
* add: Fix rollback when disk is completely full.
* assistant: Fixed several minor memory leaks that manifested when
adding a large number of files.
+ * assistant: Start a new git-annex transferkeys process
+ after a network connection change, so that remotes that use a persistent
+ network connection are restarted.
-- Joey Hess <joeyh@debian.org> Tue, 31 Dec 2013 13:41:18 -0400