diff options
author | Joey Hess <joey@kitenet.net> | 2012-07-22 23:16:56 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-07-22 23:16:56 -0400 |
commit | 522f568450a005ae81b24f63bb37e75320b51219 (patch) | |
tree | 93c292de024b4e1c6e8bbefd4aee9614c6ab0afc | |
parent | 26e4e65307436e4cc9a2db448141652b79d0f582 (diff) |
add TransferScanner thread
Efficiently finding transfers that need to be done to get two repos back
in sync seems like an interesting problem.
-rw-r--r-- | Assistant.hs | 22 | ||||
-rw-r--r-- | Assistant/ScanRemotes.hs | 41 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 45 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 25 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 34 | ||||
-rw-r--r-- | Assistant/TransferQueue.hs | 14 |
6 files changed, 138 insertions, 43 deletions
diff --git a/Assistant.hs b/Assistant.hs index 4bb1ed4ce..0049d3177 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -36,8 +36,7 @@ - inotify threads associated with it, too.) - Thread 9: transfer watcher - Watches for transfer information files being created and removed, - - and maintains the DaemonStatus currentTransfers map and the - - TransferSlots QSemN. + - and maintains the DaemonStatus currentTransfers map. - (This uses inotify on .git/annex/transfer/, so there are - additional inotify threads associated with it, too.) - Thread 10: transferrer @@ -49,8 +48,14 @@ - Thread 13: mount watcher - Either uses dbus to watch for drive mount events, or, when - there's no dbus, polls to find newly mounted filesystems. - - Once a filesystem that contains a remote is mounted, syncs - - with it. + - Once a filesystem that contains a remote is mounted, updates + - state about that remote, pulls from it, and queues a push to it, + - as well as an update, and queues it onto the + - ConnectedRemoteChan + - Thread 14: transfer scanner + - Does potentially expensive checks to find data that needs to be + - transferred from or to remotes, and queues Transfers. + - Uses the ScanRemotes map. - - ThreadState: (MVar) - The Annex state is stored here, which allows resuscitating the @@ -78,6 +83,9 @@ - to block until a slot is available. - This MVar should only be manipulated from inside the Annex monad, - which ensures it's accessed only after the ThreadState MVar. + - ScanRemotes (STM TMVar) + - Remotes that have been disconnected, and should be scanned + - are indicated by writing to this TMVar. 
-} module Assistant where @@ -88,6 +96,7 @@ import Assistant.DaemonStatus import Assistant.Changes import Assistant.Commits import Assistant.Pushes +import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.TransferSlots import Assistant.Threads.Watcher @@ -98,6 +107,7 @@ import Assistant.Threads.TransferWatcher import Assistant.Threads.Transferrer import Assistant.Threads.SanityChecker import Assistant.Threads.MountWatcher +import Assistant.Threads.TransferScanner import qualified Utility.Daemon import Utility.LogFile import Utility.ThreadScheduler @@ -124,6 +134,7 @@ startDaemon assistant foreground pushmap <- newFailedPushMap transferqueue <- newTransferQueue transferslots <- newTransferSlots + scanremotes <- newScanRemoteMap mapM_ forkIO [ commitThread st changechan commitchan transferqueue dstatus , pushThread st dstatus commitchan pushmap @@ -133,7 +144,8 @@ startDaemon assistant foreground , transfererThread st dstatus transferqueue transferslots , daemonStatusThread st dstatus , sanityCheckerThread st dstatus transferqueue changechan - , mountWatcherThread st dstatus + , mountWatcherThread st dstatus scanremotes + , transferScannerThread st scanremotes transferqueue , watchThread st dstatus transferqueue changechan ] debug "assistant" diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs new file mode 100644 index 000000000..05b2a2ca9 --- /dev/null +++ b/Assistant/ScanRemotes.hs @@ -0,0 +1,41 @@ +{- git-annex assistant remotes needing scanning + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.ScanRemotes where + +import Common.Annex +import Data.Function + +import Control.Concurrent.STM +import Data.Time.Clock +import qualified Data.Map as M + +type ScanRemoteMap = TMVar (M.Map Remote UTCTime) + +{- The TMVar starts empty, and is left empty when there are no remotes + - to scan. 
-} +newScanRemoteMap :: IO ScanRemoteMap +newScanRemoteMap = atomically newEmptyTMVar + +{- Blocks until there is a remote that needs to be scanned. + - Processes remotes added most recently first. -} +getScanRemote :: ScanRemoteMap -> IO Remote +getScanRemote v = atomically $ do + m <- takeTMVar v + let newest = Prelude.head $ reverse $ + map fst $ sortBy (compare `on` snd) $ M.toList m + putTMVar v $ M.delete newest m + return newest + +{- Adds new remotes that need scanning to the map. -} +addScanRemotes :: ScanRemoteMap -> [Remote] -> IO () +addScanRemotes _ [] = return () +addScanRemotes v rs = do + now <- getCurrentTime + atomically $ do + m <- fromMaybe M.empty <$> tryTakeTMVar v + putTMVar v $ foldr (`M.insert` now) m rs diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index bfdfe0ebb..853d96d51 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -13,6 +13,8 @@ module Assistant.Threads.MountWatcher where import Assistant.Common import Assistant.ThreadedMonad import Assistant.DaemonStatus +import Assistant.ScanRemotes +import Assistant.Threads.Pusher (pushToRemotes) import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -27,6 +29,7 @@ import Logs.Remote import Control.Concurrent import qualified Control.Exception as E import qualified Data.Set as S +import Data.Time.Clock #if WITH_DBUS import DBus.Client @@ -39,18 +42,18 @@ import Data.Word (Word32) thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () -mountWatcherThread st handle = +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () +mountWatcherThread st handle scanremotes = #if WITH_DBUS - dbusThread st handle + dbusThread st handle scanremotes #else - pollingThread st handle + pollingThread st handle scanremotes #endif #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> IO () -dbusThread st dstatus = 
E.catch (go =<< connectSession) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () +dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr where go client = ifM (checkMountMonitor client) ( do @@ -63,7 +66,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus wasmounted nowmounted + handleMounts st dstatus scanremotes wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -74,7 +77,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr runThreadState st $ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus + pollinstead = pollingThread st dstatus scanremotes type ServiceName = String @@ -140,30 +143,32 @@ mountAdded = [gvfs, kde] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> IO () -pollingThread st dstatus = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () +pollingThread st dstatus scanremotes = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus wasmounted nowmounted + handleMounts st dstatus scanremotes wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus wasmounted nowmounted = mapM_ (handleMount st dstatus) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes wasmounted nowmounted = mapM_ (handleMount st dstatus scanremotes) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> Mntent -> IO () 
-handleMount st dstatus mntent = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Mntent -> IO () +handleMount st dstatus scanremotes mntent = do debug thisThread ["detected mount of", mnt_dir mntent] rs <- remotesUnder st dstatus mntent unless (null rs) $ do branch <- runThreadState st $ Command.Sync.currentBranch - let pullrs = filter Git.repoIsLocal rs - debug thisThread ["pulling from", show pullrs] - runThreadState st $ manualPull branch pullrs - -- TODO queue transfers for new files in both directions - where + let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs + unless (null nonspecial) $ do + debug thisThread ["pulling from", show nonspecial] + runThreadState st $ manualPull branch nonspecial + now <- getCurrentTime + pushToRemotes thisThread now st Nothing nonspecial + addScanRemotes scanremotes rs {- Finds remotes located underneath the mount point. - diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index e5191109c..cba53af23 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -1,4 +1,4 @@ -{- git-annex assistant git pushing threads +{- git-annex assistant git pushing thread - - Copyright 2012 Joey Hess <joey@kitenet.net> - @@ -36,7 +36,7 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do , "failed pushes" ] now <- getCurrentTime - pushToRemotes now st pushmap topush + pushToRemotes thisThread now st (Just pushmap) topush where halfhour = 1800 @@ -53,7 +53,7 @@ pushThread st daemonstatus commitchan pushmap = do then do remotes <- runThreadState st $ knownRemotes <$> getDaemonStatus daemonstatus - pushToRemotes now st pushmap remotes + pushToRemotes thisThread now st (Just pushmap) remotes else do debug thisThread [ "delaying push of" @@ -78,24 +78,27 @@ shouldPush _now commits - - Avoids running possibly long-duration commands in the Annex monad, so - as not to block other threads. 
-} -pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO () -pushToRemotes now st pushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO () +pushToRemotes threadname now st mpushmap remotes = do (g, branch) <- runThreadState st $ (,) <$> fromRepo id <*> Command.Sync.currentBranch go True branch g remotes where go shouldretry branch g rs = do - debug thisThread + debug threadname [ "pushing to" , show rs ] Command.Sync.updateBranch (Command.Sync.syncBranch branch) g (succeeded, failed) <- inParallel (push g branch) rs - changeFailedPushMap pushmap $ \m -> - M.union (makemap failed) $ - M.difference m (makemap succeeded) + case mpushmap of + Nothing -> noop + Just pushmap -> + changeFailedPushMap pushmap $ \m -> + M.union (makemap failed) $ + M.difference m (makemap succeeded) unless (null failed) $ - debug thisThread + debug threadname [ "failed to push to" , show failed ] @@ -109,6 +112,6 @@ pushToRemotes now st pushmap remotes = do ( exitSuccess, exitFailure) retry branch g rs = do - debug thisThread [ "trying manual pull to resolve failed pushes" ] + debug threadname [ "trying manual pull to resolve failed pushes" ] runThreadState st $ manualPull branch rs go False branch g rs diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs new file mode 100644 index 000000000..0a40f7ead --- /dev/null +++ b/Assistant/Threads/TransferScanner.hs @@ -0,0 +1,34 @@ +{- git-annex assistant thread to scan remotes to find needed transfers + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Assistant.Threads.TransferScanner where + +import Assistant.Common +import Assistant.ScanRemotes +import Assistant.TransferQueue +import Assistant.ThreadedMonad +import Logs.Transfer +import Types.Remote +import Utility.ThreadScheduler + +thisThread :: ThreadName +thisThread = "TransferScanner" + +{- This thread scans remotes, to find transfers that need to be made to + - keep their data in sync. The transfers are queued with low priority. -} +transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO () +transferScannerThread st scanremotes transferqueue = do + runEvery (Seconds 2) $ do + r <- getScanRemote scanremotes + needtransfer <- scan st r + forM_ needtransfer $ \(f, t) -> + queueLaterTransfer transferqueue f t + +scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)] +scan st r = do + debug thisThread ["scanning", show r] + return [] -- TODO diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index b0eca96c8..f8104914c 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -38,19 +38,19 @@ queueTransfers q daemonstatus k f direction = do mapM_ (\r -> queue r $ gentransfer r) =<< sufficientremotes rs where - sufficientremotes l + sufficientremotes rs -- Queue downloads from all remotes that -- have the key, with the cheapest ones first. -- More expensive ones will only be tried if -- downloading from a cheap one fails. | direction == Download = do uuids <- Remote.keyLocations k - return $ filter (\r -> uuid r `elem` uuids) l + return $ filter (\r -> uuid r `elem` uuids) rs -- TODO: Determine a smaller set of remotes that -- can be uploaded to, in order to ensure all -- remotes can access the content. Currently, -- send to every remote we can.
- | otherwise = return l + | otherwise = return rs gentransfer r = Transfer { transferDirection = direction , transferKey = k @@ -60,12 +60,12 @@ queueTransfers q daemonstatus k f direction = do let info = (stubInfo f) { transferRemote = Just r } writeTChan q (t, info) -{- Adds a pending transfer to the end of the queue. -} -queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () -queueTransfer q f t = void $ atomically $ +{- Adds a transfer to the end of the queue, to be processed later. -} +queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () +queueLaterTransfer q f t = void $ atomically $ writeTChan q (t, stubInfo f) -{- Adds a pending transfer to the start of the queue, to be processed next. -} +{- Adds a transfer to the start of the queue, to be processed next. -} queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () queueNextTransfer q f t = void $ atomically $ unGetTChan q (t, stubInfo f) |