summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-07-28 18:47:24 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-07-28 18:47:24 -0400
commite31277d38aa5d9b07395d05a6f1646b5eb3d48c2 (patch)
tree290f14867fc066daf2e84f2644bccc2356da09df
parent3cc18857936e5a09e033439971dc9c43e6ccbaa2 (diff)
send notifications when the TransferQueue is changed
The fun part was making it move things from TransferQueue to currentTransfers entirely atomically. Which will avoid inconsistent display if the WebApp renders the current status at just the wrong time. STM to the rescue!
-rw-r--r--Assistant.hs2
-rw-r--r--Assistant/DaemonStatus.hs18
-rw-r--r--Assistant/Threads/TransferScanner.hs13
-rw-r--r--Assistant/Threads/Transferrer.hs10
-rw-r--r--Assistant/TransferQueue.hs56
5 files changed, 67 insertions, 32 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 6b25c3c6f..1f41a9398 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -153,7 +153,7 @@ startDaemon assistant foreground webappwaiter
, daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan
, mountWatcherThread st dstatus scanremotes
- , transferScannerThread st scanremotes transferqueue
+ , transferScannerThread st dstatus scanremotes transferqueue
#ifdef WITH_WEBAPP
, webAppThread st dstatus transferqueue webappwaiter
#endif
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 52165138e..3610c2fda 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -36,6 +36,7 @@ data DaemonStatus = DaemonStatus
-- Ordered list of remotes to talk to.
, knownRemotes :: [Remote]
-- Clients can use this to wait on changes to the DaemonStatus
+ -- and other related things like the TransferQueue.
, notificationBroadcaster :: NotificationBroadcaster
}
@@ -72,6 +73,12 @@ modifyDaemonStatus handle a = do
sendNotification nb
return b
+{- Can be used to send a notification that the daemon status, or other
+ - associated thing, like the TransferQueue, has changed. -}
+notifyDaemonStatusChange :: DaemonStatusHandle -> IO ()
+notifyDaemonStatusChange handle = sendNotification
+ =<< notificationBroadcaster <$> atomically (readTMVar handle)
+
{- Updates the cached ordered list of remotes from the list in Annex
- state. -}
updateKnownRemotes :: DaemonStatusHandle -> Annex ()
@@ -164,7 +171,16 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int
tenMinutes = 10 * 60
-{- Mutates the transfer map. -}
+{- Mutates the transfer map. Runs in STM so that the transfer map can
+ - be modified in the same transaction that modifies the transfer queue.
+ - Note that this does not send a notification of the change; that's left
+ - to the caller. -}
+adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM ()
+adjustTransfersSTM dstatus a = do
+ s <- takeTMVar dstatus
+ putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
+
+{- Variant that does send notifications. -}
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO ()
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
\s -> s { currentTransfers = a (currentTransfers s) }
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
index e76cbe81d..e6a078907 100644
--- a/Assistant/Threads/TransferScanner.hs
+++ b/Assistant/Threads/TransferScanner.hs
@@ -11,6 +11,7 @@ import Assistant.Common
import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
import Logs.Transfer
import Logs.Location
import qualified Remote
@@ -25,20 +26,20 @@ thisThread = "TransferScanner"
{- This thread waits until a remote needs to be scanned, to find transfers
- that need to be made, to keep data in sync.
-}
-transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
-transferScannerThread st scanremotes transferqueue = do
+transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st dstatus scanremotes transferqueue = do
runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes
liftIO $ debug thisThread ["starting scan of", show r]
- scan st transferqueue r
+ scan st dstatus transferqueue r
liftIO $ debug thisThread ["finished scan of", show r]
where
{- This is a naive scan through the git work tree.
-
- The scan is blocked when the transfer queue gets too large. -}
-scan :: ThreadState -> TransferQueue -> Remote -> IO ()
-scan st transferqueue r = do
+scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
+scan st dstatus transferqueue r = do
g <- runThreadState st $ fromRepo id
files <- LsFiles.inRepo [] g
go files
@@ -63,7 +64,7 @@ scan st transferqueue r = do
| otherwise = return Nothing
u = Remote.uuid r
- enqueue f t = queueTransferAt smallsize Later transferqueue (Just f) t r
+ enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r
smallsize = 10
{- Look directly in remote for the key when it's cheap;
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index 30802f742..f011ff036 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -34,12 +34,18 @@ transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transf
transfererThread st dstatus transferqueue slots = go
where
go = do
- (t, info) <- getNextTransfer transferqueue
+ (t, info) <- getNextTransfer transferqueue dstatus
ifM (runThreadState st $ shouldTransfer dstatus t info)
( do
debug thisThread [ "Transferring:" , show t ]
+ notifyDaemonStatusChange dstatus
transferThread st dstatus slots t info
- , debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ , do
+ debug thisThread [ "Skipping unnecessary transfer:" , show t ]
+ -- getNextTransfer added t to the
+ -- daemonstatus's transfer map.
+ void $ removeTransfer dstatus t
+ notifyDaemonStatusChange dstatus
)
go
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index 2f09813eb..51ed5c9c7 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -23,6 +23,7 @@ import Types.Remote
import qualified Remote
import Control.Concurrent.STM
+import qualified Data.Map as M
{- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked, and a list is maintained
@@ -58,8 +59,8 @@ stubInfo f r = TransferInfo
{- Adds transfers to queue for some of the known remotes. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
-queueTransfers schedule q daemonstatus k f direction = do
- rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus)
+queueTransfers schedule q dstatus k f direction = do
+ rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
mapM_ go =<< sufficientremotes rs
where
sufficientremotes rs
@@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do
, transferKey = k
, transferUUID = Remote.uuid r
}
- go r = liftIO $ atomically $
- enqueue schedule q (gentransfer r) (stubInfo f r)
+ go r = liftIO $
+ enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
-enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
-enqueue schedule q t info
+enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
+enqueue schedule q dstatus t info
| schedule == Next = go unGetTChan (new:)
| otherwise = go writeTChan (\l -> l++[new])
where
new = (t, info)
go modqueue modlist = do
- void $ modqueue (queue q) new
- void $ modifyTVar' (queuesize q) succ
- void $ modifyTVar' (queuelist q) modlist
+ atomically $ do
+ void $ modqueue (queue q) new
+ void $ modifyTVar' (queuesize q) succ
+ void $ modifyTVar' (queuelist q) modlist
+ void $ notifyDaemonStatusChange dstatus
{- Adds a transfer to the queue. -}
-queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransfer schedule q f t remote = atomically $
- enqueue schedule q t (stubInfo f remote)
+queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransfer schedule q dstatus f t remote =
+ enqueue schedule q dstatus t (stubInfo f remote)
{- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
-queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
-queueTransferAt wantsz schedule q f t remote = atomically $ do
- sz <- readTVar (queuesize q)
- if sz <= wantsz
- then enqueue schedule q t (stubInfo f remote)
- else retry -- blocks until queuesize changes
+queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
+queueTransferAt wantsz schedule q dstatus f t remote = do
+ atomically $ do
+ sz <- readTVar (queuesize q)
+ if sz <= wantsz
+ then return ()
+ else retry -- blocks until queuesize changes
+ enqueue schedule q dstatus t (stubInfo f remote)
-{- Blocks until a pending transfer is available from the queue. -}
-getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
-getNextTransfer q = atomically $ do
+{- Blocks until a pending transfer is available from the queue.
+ - The transfer is removed from the transfer queue, and added to
+ - the daemon status currentTransfers map. This is done in a single STM
+ - transaction, so there is no window where an observer sees an
+ - inconsistent status. -}
+getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo)
+getNextTransfer q dstatus = atomically $ do
void $ modifyTVar' (queuesize q) pred
void $ modifyTVar' (queuelist q) (drop 1)
- readTChan (queue q)
+ r@(t, info) <- readTChan (queue q)
+ adjustTransfersSTM dstatus $
+ M.insertWith' const t info
+ return r