summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs2
-rw-r--r--Assistant/DaemonStatus.hs18
-rw-r--r--Assistant/Threads/TransferScanner.hs13
-rw-r--r--Assistant/Threads/Transferrer.hs10
-rw-r--r--Assistant/TransferQueue.hs56
5 files changed, 67 insertions, 32 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 6b25c3c6f..1f41a9398 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -153,7 +153,7 @@ startDaemon assistant foreground webappwaiter
, daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan
, mountWatcherThread st dstatus scanremotes
- , transferScannerThread st scanremotes transferqueue
+ , transferScannerThread st dstatus scanremotes transferqueue
#ifdef WITH_WEBAPP
, webAppThread st dstatus transferqueue webappwaiter
#endif
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 52165138e..3610c2fda 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -36,6 +36,7 @@ data DaemonStatus = DaemonStatus
-- Ordered list of remotes to talk to.
, knownRemotes :: [Remote]
-- Clients can use this to wait on changes to the DaemonStatus
+ -- and other related things like the TransferQueue.
, notificationBroadcaster :: NotificationBroadcaster
}
@@ -72,6 +73,12 @@ modifyDaemonStatus handle a = do
sendNotification nb
return b
+{- Can be used to send a notification that the daemon status, or other
+ - associated thing, like the TransferQueue, has changed. -}
+notifyDaemonStatusChange :: DaemonStatusHandle -> IO ()
+notifyDaemonStatusChange handle = sendNotification
+ =<< notificationBroadcaster <$> atomically (readTMVar handle)
+
{- Updates the cached ordered list of remotes from the list in Annex
- state. -}
updateKnownRemotes :: DaemonStatusHandle -> Annex ()
@@ -164,7 +171,16 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
-{- Mutates the transfer map. -}
+{- Mutates the transfer map. Runs in STM so that the transfer map can
+ - be modified in the same transaction that modifies the transfer queue.
+ - Note that this does not send a notification of the change; that's left
+ - to the caller. -}
+adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM ()
+adjustTransfersSTM dstatus a = do
+ s <- takeTMVar dstatus
+ putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
+
+{- Variant that does send notifications. -}
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO ()
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
\s -> s { currentTransfers = a (currentTransfers s) }
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index e76cbe81d..e6a078907 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -11,6 +11,7 @@ import Assistant.Common
import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
import Logs.Transfer
import Logs.Location
import qualified Remote
@@ -25,20 +26,20 @@ thisThread = "TransferScanner"
{- This thread waits until a remote needs to be scanned, to find transfers
- that need to be made, to keep data in sync.
-}
-transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
-transferScannerThread st scanremotes transferqueue = do
+transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st dstatus scanremotes transferqueue = do
runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes
liftIO $ debug thisThread ["starting scan of", show r]
- scan st transferqueue r
+ scan st dstatus transferqueue r
liftIO $ debug thisThread ["finished scan of", show r]
where
{- This is a naive scan through the git work tree.
-
- The scan is blocked when the transfer queue gets too large. -}
-scan :: ThreadState -> TransferQueue -> Remote -> IO ()
-scan st transferqueue r = do
+scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
+scan st dstatus transferqueue r = do
g <- runThreadState st $ fromRepo id
files <- LsFiles.inRepo [] g
go files
@@ -63,7 +64,7 @@ scan st transferqueue r = do
| otherwise = return Nothing
u = Remote.uuid r
- enqueue f t = queueTransferAt smallsize Later transferqueue (Just f) t r
+ enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r
smallsize = 10
{- Look directly in remote for the key when it's cheap;
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 30802f742..f011ff036 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -34,12 +34,18 @@ transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transf
transfererThread st dstatus transferqueue slots = go
where
go = do
- (t, info) <- getNextTransfer transferqueue
+ (t, info) <- getNextTransfer transferqueue dstatus
ifM (runThreadState st $ shouldTransfer dstatus t info)
( do
debug thisThread [ "Transferring:" , show t ]
+ notifyDaemonStatusChange dstatus
transferThread st dstatus slots t info
- , debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ , do
+ debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ -- getNextTransfer added t to the
+ -- daemonstatus's transfer map.
+ void $ removeTransfer dstatus t
+ notifyDaemonStatusChange dstatus
)
go
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 2f09813eb..51ed5c9c7 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -23,6 +23,7 @@ import Types.Remote
import qualified Remote
import Control.Concurrent.STM
+import qualified Data.Map as M
{- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked, and a list is maintained
@@ -58,8 +59,8 @@ stubInfo f r = TransferInfo
{- Adds transfers to queue for some of the known remotes. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
-queueTransfers schedule q daemonstatus k f direction = do
- rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus)
+queueTransfers schedule q dstatus k f direction = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
mapM_ go =<< sufficientremotes rs
where
sufficientremotes rs
@@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do
, transferKey = k
, transferUUID = Remote.uuid r
}
- go r = liftIO $ atomically $
- enqueue schedule q (gentransfer r) (stubInfo f r)
+ go r = liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
-enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
-enqueue schedule q t info
+enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+enqueue schedule q dstatus t info
| schedule == Next = go unGetTChan (new:)
| otherwise = go writeTChan (\l -> l++[new])
where
new = (t, info)
go modqueue modlist = do
- void $ modqueue (queue q) new
- void $ modifyTVar' (queuesize q) succ
- void $ modifyTVar' (queuelist q) modlist
+ atomically $ do
+ void $ modqueue (queue q) new
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ void $ notifyDaemonStatusChange dstatus
{- Adds a transfer to the queue. -}
-queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransfer schedule q f t remote = atomically $
- enqueue schedule q t (stubInfo f remote)
+queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransfer schedule q dstatus f t remote =
+ enqueue schedule q dstatus t (stubInfo f remote)
{- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
-queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransferAt wantsz schedule q f t remote = atomically $ do
- sz <- readTVar (queuesize q)
- if sz <= wantsz
- then enqueue schedule q t (stubInfo f remote)
- else retry -- blocks until queuesize changes
+queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferAt wantsz schedule q dstatus f t remote = do
+ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then return ()
+ else retry -- blocks until queuesize changes
+ enqueue schedule q dstatus t (stubInfo f remote)
-{- Blocks until a pending transfer is available from the queue. -}
-getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
-getNextTransfer q = atomically $ do
+{- Blocks until a pending transfer is available from the queue.
+ - The transfer is removed from the transfer queue, and added to
+ - the daemon status currentTransfers map. This is done in a single STM
+ - transaction, so there is no window where an observer sees an
+ - inconsistent status. -}
+getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo)
+getNextTransfer q dstatus = atomically $ do
void $ modifyTVar' (queuesize q) pred
void $ modifyTVar' (queuelist q) (drop 1)
- readTChan (queue q)
+ r@(t, info) <- readTChan (queue q)
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return r