summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-07-22 23:16:56 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-07-22 23:16:56 -0400
commit522f568450a005ae81b24f63bb37e75320b51219 (patch)
tree93c292de024b4e1c6e8bbefd4aee9614c6ab0afc
parent26e4e65307436e4cc9a2db448141652b79d0f582 (diff)
add TransferScanner thread
Efficiently finding transfers that need to be done to get two repos back in sync seems like an interesting problem.
-rw-r--r--Assistant.hs22
-rw-r--r--Assistant/ScanRemotes.hs41
-rw-r--r--Assistant/Threads/MountWatcher.hs45
-rw-r--r--Assistant/Threads/Pusher.hs25
-rw-r--r--Assistant/Threads/TransferScanner.hs34
-rw-r--r--Assistant/TransferQueue.hs14
6 files changed, 138 insertions, 43 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 4bb1ed4ce..0049d3177 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -36,8 +36,7 @@
- inotify threads associated with it, too.)
- Thread 9: transfer watcher
- Watches for transfer information files being created and removed,
- - and maintains the DaemonStatus currentTransfers map and the
- - TransferSlots QSemN.
+ - and maintains the DaemonStatus currentTransfers map.
- (This uses inotify on .git/annex/transfer/, so there are
- additional inotify threads associated with it, too.)
- Thread 10: transferrer
@@ -49,8 +48,14 @@
- Thread 13: mount watcher
- Either uses dbus to watch for drive mount events, or, when
- there's no dbus, polls to find newly mounted filesystems.
- - Once a filesystem that contains a remote is mounted, syncs
- - with it.
+ - Once a filesystem that contains a remote is mounted, updates
+ - state about that remote, pulls from it, and pushes to it,
+ - and then adds the remote to the ScanRemotes map, so the
+ - transfer scanner can scan it.
+ - Thread 14: transfer scanner
+ - Does potentially expensive checks to find data that needs to be
+ - transferred from or to remotes, and queues Transfers.
+ - Uses the ScanRemotes map.
-
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
@@ -78,6 +83,9 @@
- to block until a slot is available.
- This MVar should only be manipulated from inside the Annex monad,
- which ensures it's accessed only after the ThreadState MVar.
+ - ScanRemotes (STM TMVar)
+ - Remotes that have become available again after being disconnected,
+ - and so should be scanned, are indicated by writing to this TMVar.
-}
module Assistant where
@@ -88,6 +96,7 @@ import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
import Assistant.Pushes
+import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.TransferSlots
import Assistant.Threads.Watcher
@@ -98,6 +107,7 @@ import Assistant.Threads.TransferWatcher
import Assistant.Threads.Transferrer
import Assistant.Threads.SanityChecker
import Assistant.Threads.MountWatcher
+import Assistant.Threads.TransferScanner
import qualified Utility.Daemon
import Utility.LogFile
import Utility.ThreadScheduler
@@ -124,6 +134,7 @@ startDaemon assistant foreground
pushmap <- newFailedPushMap
transferqueue <- newTransferQueue
transferslots <- newTransferSlots
+ scanremotes <- newScanRemoteMap
mapM_ forkIO
[ commitThread st changechan commitchan transferqueue dstatus
, pushThread st dstatus commitchan pushmap
@@ -133,7 +144,8 @@ startDaemon assistant foreground
, transfererThread st dstatus transferqueue transferslots
, daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan
- , mountWatcherThread st dstatus
+ , mountWatcherThread st dstatus scanremotes
+ , transferScannerThread st scanremotes transferqueue
, watchThread st dstatus transferqueue changechan
]
debug "assistant"
diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs
new file mode 100644
index 000000000..05b2a2ca9
--- /dev/null
+++ b/Assistant/ScanRemotes.hs
@@ -0,0 +1,41 @@
+{- git-annex assistant remotes needing scanning
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.ScanRemotes where
+
+import Common.Annex
+import Data.Function
+
+import Control.Concurrent.STM
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Maps each remote that is waiting to be scanned to the time at which
+ - it was added. -}
+type ScanRemoteMap = TMVar (M.Map Remote UTCTime)
+
+{- Creates the TMVar in its empty state. It is also meant to be empty
+ - whenever no remotes are waiting to be scanned. -}
+newScanRemoteMap :: IO ScanRemoteMap
+newScanRemoteMap = newEmptyTMVarIO
+
+{- Blocks until there is a remote that needs to be scanned.
+ - Processes remotes added most recently first.
+ -
+ - The TMVar is only refilled when other remotes are still pending, so
+ - that it is left empty (and the next call blocks) once the last remote
+ - has been handed out, rather than being left holding an empty map. -}
+getScanRemote :: ScanRemoteMap -> IO Remote
+getScanRemote v = atomically $ do
+    m <- takeTMVar v
+    case reverse $ map fst $ sortBy (compare `on` snd) $ M.toList m of
+        -- An empty map should never be stored; if one is found anyway,
+        -- wait for a remote to be added rather than crashing.
+        [] -> retry
+        (newest : _) -> do
+            let remaining = M.delete newest m
+            if M.null remaining
+                then return ()
+                else putTMVar v remaining
+            return newest
+
+{- Records that the given remotes need to be scanned, stamping each of
+ - them with the current time. Does nothing when the list is empty. -}
+addScanRemotes :: ScanRemoteMap -> [Remote] -> IO ()
+addScanRemotes v rs
+    | null rs = return ()
+    | otherwise = do
+        stamp <- getCurrentTime
+        let fresh = M.fromList [(r, stamp) | r <- rs]
+        atomically $ do
+            old <- tryTakeTMVar v
+            -- Left-biased union: the fresh entries replace any older
+            -- timestamp already recorded for the same remote.
+            putTMVar v $ M.union fresh (fromMaybe M.empty old)
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index bfdfe0ebb..853d96d51 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -13,6 +13,8 @@ module Assistant.Threads.MountWatcher where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Threads.Pusher (pushToRemotes)
import qualified Annex
import qualified Git
import Utility.ThreadScheduler
@@ -27,6 +29,7 @@ import Logs.Remote
import Control.Concurrent
import qualified Control.Exception as E
import qualified Data.Set as S
+import Data.Time.Clock
#if WITH_DBUS
import DBus.Client
@@ -39,18 +42,18 @@ import Data.Word (Word32)
thisThread :: ThreadName
thisThread = "MountWatcher"
-mountWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
-mountWatcherThread st handle =
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+mountWatcherThread st handle scanremotes =
#if WITH_DBUS
- dbusThread st handle
+ dbusThread st handle scanremotes
#else
- pollingThread st handle
+ pollingThread st handle scanremotes
#endif
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> IO ()
-dbusThread st dstatus = E.catch (go =<< connectSession) onerr
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
where
go client = ifM (checkMountMonitor client)
( do
@@ -63,7 +66,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
listen client matcher $ \_event -> do
nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted
- handleMounts st dstatus wasmounted nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
, do
runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling"
@@ -74,7 +77,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
runThreadState st $
warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
pollinstead
- pollinstead = pollingThread st dstatus
+ pollinstead = pollingThread st dstatus scanremotes
type ServiceName = String
@@ -140,30 +143,32 @@ mountAdded = [gvfs, kde]
#endif
-pollingThread :: ThreadState -> DaemonStatusHandle -> IO ()
-pollingThread st dstatus = go =<< currentMountPoints
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+pollingThread st dstatus scanremotes = go =<< currentMountPoints
where
go wasmounted = do
threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints
- handleMounts st dstatus wasmounted nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
go nowmounted
-handleMounts :: ThreadState -> DaemonStatusHandle -> MountPoints -> MountPoints -> IO ()
-handleMounts st dstatus wasmounted nowmounted = mapM_ (handleMount st dstatus) $
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes wasmounted nowmounted = mapM_ (handleMount st dstatus scanremotes) $
S.toList $ newMountPoints wasmounted nowmounted
-handleMount :: ThreadState -> DaemonStatusHandle -> Mntent -> IO ()
-handleMount st dstatus mntent = do
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Mntent -> IO ()
+handleMount st dstatus scanremotes mntent = do
debug thisThread ["detected mount of", mnt_dir mntent]
rs <- remotesUnder st dstatus mntent
unless (null rs) $ do
branch <- runThreadState st $ Command.Sync.currentBranch
- let pullrs = filter Git.repoIsLocal rs
- debug thisThread ["pulling from", show pullrs]
- runThreadState st $ manualPull branch pullrs
- -- TODO queue transfers for new files in both directions
- where
+ let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs
+ unless (null nonspecial) $ do
+ debug thisThread ["pulling from", show nonspecial]
+ runThreadState st $ manualPull branch nonspecial
+ now <- getCurrentTime
+ pushToRemotes thisThread now st Nothing nonspecial
+ addScanRemotes scanremotes rs
{- Finds remotes located underneath the mount point.
-
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index e5191109c..cba53af23 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -1,4 +1,4 @@
-{- git-annex assistant git pushing threads
+{- git-annex assistant git pushing thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
@@ -36,7 +36,7 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
, "failed pushes"
]
now <- getCurrentTime
- pushToRemotes now st pushmap topush
+ pushToRemotes thisThread now st (Just pushmap) topush
where
halfhour = 1800
@@ -53,7 +53,7 @@ pushThread st daemonstatus commitchan pushmap = do
then do
remotes <- runThreadState st $
knownRemotes <$> getDaemonStatus daemonstatus
- pushToRemotes now st pushmap remotes
+ pushToRemotes thisThread now st (Just pushmap) remotes
else do
debug thisThread
[ "delaying push of"
@@ -78,24 +78,27 @@ shouldPush _now commits
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
-pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
-pushToRemotes now st pushmap remotes = do
+pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO ()
+pushToRemotes threadname now st mpushmap remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
go True branch g remotes
where
go shouldretry branch g rs = do
- debug thisThread
+ debug threadname
[ "pushing to"
, show rs
]
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
(succeeded, failed) <- inParallel (push g branch) rs
- changeFailedPushMap pushmap $ \m ->
- M.union (makemap failed) $
- M.difference m (makemap succeeded)
+ case mpushmap of
+ Nothing -> noop
+ Just pushmap ->
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
unless (null failed) $
- debug thisThread
+ debug threadname
[ "failed to push to"
, show failed
]
@@ -109,6 +112,6 @@ pushToRemotes now st pushmap remotes = do
( exitSuccess, exitFailure)
retry branch g rs = do
- debug thisThread [ "trying manual pull to resolve failed pushes" ]
+ debug threadname [ "trying manual pull to resolve failed pushes" ]
runThreadState st $ manualPull branch rs
go False branch g rs
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
new file mode 100644
index 000000000..0a40f7ead
--- /dev/null
+++ b/Assistant/Threads/TransferScanner.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant thread to scan remotes to find needed transfers
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferScanner where
+
+import Assistant.Common
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.ThreadedMonad
+import Logs.Transfer
+import Types.Remote
+import Utility.ThreadScheduler
+
+{- Name passed to debug to label this thread's messages. -}
+thisThread :: ThreadName
+thisThread = "TransferScanner"
+
+{- This thread scans remotes, to find transfers that need to be made to
+ - keep their data in sync. The transfers are queued with low priority,
+ - by adding them to the end of the transfer queue.
+ -
+ - Each iteration blocks until some remote needs to be scanned. -}
+transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st scanremotes transferqueue = runEvery (Seconds 2) $
+    getScanRemote scanremotes >>= scan st >>= mapM_ enqueue
+  where
+    enqueue (f, t) = queueLaterTransfer transferqueue f t
+
+{- Finds the transfers needed to bring a remote back in sync.
+ - Currently a stub: it only logs the scan, and finds no transfers. -}
+scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)]
+scan _st r = debug thisThread ["scanning", show r] >> return [] -- TODO
diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs
index b0eca96c8..f8104914c 100644
--- a/Assistant/TransferQueue.hs
+++ b/Assistant/TransferQueue.hs
@@ -38,19 +38,19 @@ queueTransfers q daemonstatus k f direction = do
mapM_ (\r -> queue r $ gentransfer r)
=<< sufficientremotes rs
where
- sufficientremotes l
+ sufficientremotes rs
-- Queue downloads from all remotes that
-- have the key, with the cheapest ones first.
-- More expensive ones will only be tried if
-- downloading from a cheap one fails.
| direction == Download = do
uuids <- Remote.keyLocations k
- return $ filter (\r -> uuid r `elem` uuids) l
+ return $ filter (\r -> uuid r `elem` uuids) rs
-- TODO: Determine a smaller set of remotes that
-- can be uploaded to, in order to ensure all
-- remotes can access the content. Currently,
-- send to every remote we can.
- | otherwise = return l
+ | otherwise = return rs
gentransfer r = Transfer
{ transferDirection = direction
, transferKey = k
@@ -60,12 +60,12 @@ queueTransfers q daemonstatus k f direction = do
let info = (stubInfo f) { transferRemote = Just r }
writeTChan q (t, info)
-{- Adds a pending transfer to the end of the queue. -}
-queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
-queueTransfer q f t = void $ atomically $
+{- Adds a transfer to the end of the queue, to be processed later. -}
+queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
+queueLaterTransfer q f t = void $ atomically $
writeTChan q (t, stubInfo f)
-{- Adds a pending transfer to the start of the queue, to be processed next. -}
+{- Adds a transfer to the start of the queue, to be processed next. -}
queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueNextTransfer q f t = void $ atomically $
unGetTChan q (t, stubInfo f)