summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-07-22 23:16:56 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-07-22 23:16:56 -0400
commit522f568450a005ae81b24f63bb37e75320b51219 (patch)
tree93c292de024b4e1c6e8bbefd4aee9614c6ab0afc /Assistant/Threads
parent26e4e65307436e4cc9a2db448141652b79d0f582 (diff)
add TransferScanner thread
Efficiently finding transfers that need to be done to get two repos back in sync seems like an interesting problem.
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/MountWatcher.hs45
-rw-r--r--Assistant/Threads/Pusher.hs25
-rw-r--r--Assistant/Threads/TransferScanner.hs34
3 files changed, 73 insertions, 31 deletions
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index bfdfe0ebb..853d96d51 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -13,6 +13,8 @@ module Assistant.Threads.MountWatcher where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
+import Assistant.ScanRemotes
+import Assistant.Threads.Pusher (pushToRemotes)
import qualified Annex
import qualified Git
import Utility.ThreadScheduler
@@ -27,6 +29,7 @@ import Logs.Remote
import Control.Concurrent
import qualified Control.Exception as E
import qualified Data.Set as S
+import Data.Time.Clock
#if WITH_DBUS
import DBus.Client
@@ -39,18 +42,18 @@ import Data.Word (Word32)
thisThread :: ThreadName
thisThread = "MountWatcher"
-mountWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
-mountWatcherThread st handle =
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+mountWatcherThread st handle scanremotes =
#if WITH_DBUS
- dbusThread st handle
+ dbusThread st handle scanremotes
#else
- pollingThread st handle
+ pollingThread st handle scanremotes
#endif
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> IO ()
-dbusThread st dstatus = E.catch (go =<< connectSession) onerr
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
where
go client = ifM (checkMountMonitor client)
( do
@@ -63,7 +66,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
listen client matcher $ \_event -> do
nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted
- handleMounts st dstatus wasmounted nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
, do
runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling"
@@ -74,7 +77,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
runThreadState st $
warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
pollinstead
- pollinstead = pollingThread st dstatus
+ pollinstead = pollingThread st dstatus scanremotes
type ServiceName = String
@@ -140,30 +143,32 @@ mountAdded = [gvfs, kde]
#endif
-pollingThread :: ThreadState -> DaemonStatusHandle -> IO ()
-pollingThread st dstatus = go =<< currentMountPoints
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
+pollingThread st dstatus scanremotes = go =<< currentMountPoints
where
go wasmounted = do
threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints
- handleMounts st dstatus wasmounted nowmounted
+ handleMounts st dstatus scanremotes wasmounted nowmounted
go nowmounted
-handleMounts :: ThreadState -> DaemonStatusHandle -> MountPoints -> MountPoints -> IO ()
-handleMounts st dstatus wasmounted nowmounted = mapM_ (handleMount st dstatus) $
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes wasmounted nowmounted = mapM_ (handleMount st dstatus scanremotes) $
S.toList $ newMountPoints wasmounted nowmounted
-handleMount :: ThreadState -> DaemonStatusHandle -> Mntent -> IO ()
-handleMount st dstatus mntent = do
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Mntent -> IO ()
+handleMount st dstatus scanremotes mntent = do
debug thisThread ["detected mount of", mnt_dir mntent]
rs <- remotesUnder st dstatus mntent
unless (null rs) $ do
branch <- runThreadState st $ Command.Sync.currentBranch
- let pullrs = filter Git.repoIsLocal rs
- debug thisThread ["pulling from", show pullrs]
- runThreadState st $ manualPull branch pullrs
- -- TODO queue transfers for new files in both directions
- where
+ let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs
+ unless (null nonspecial) $ do
+ debug thisThread ["pulling from", show nonspecial]
+ runThreadState st $ manualPull branch nonspecial
+ now <- getCurrentTime
+ pushToRemotes thisThread now st Nothing nonspecial
+ addScanRemotes scanremotes rs
{- Finds remotes located underneath the mount point.
-
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index e5191109c..cba53af23 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -1,4 +1,4 @@
-{- git-annex assistant git pushing threads
+{- git-annex assistant git pushing thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
@@ -36,7 +36,7 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
, "failed pushes"
]
now <- getCurrentTime
- pushToRemotes now st pushmap topush
+ pushToRemotes thisThread now st (Just pushmap) topush
where
halfhour = 1800
@@ -53,7 +53,7 @@ pushThread st daemonstatus commitchan pushmap = do
then do
remotes <- runThreadState st $
knownRemotes <$> getDaemonStatus daemonstatus
- pushToRemotes now st pushmap remotes
+ pushToRemotes thisThread now st (Just pushmap) remotes
else do
debug thisThread
[ "delaying push of"
@@ -78,24 +78,27 @@ shouldPush _now commits
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
-pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
-pushToRemotes now st pushmap remotes = do
+pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO ()
+pushToRemotes threadname now st mpushmap remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
go True branch g remotes
where
go shouldretry branch g rs = do
- debug thisThread
+ debug threadname
[ "pushing to"
, show rs
]
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
(succeeded, failed) <- inParallel (push g branch) rs
- changeFailedPushMap pushmap $ \m ->
- M.union (makemap failed) $
- M.difference m (makemap succeeded)
+ case mpushmap of
+ Nothing -> noop
+ Just pushmap ->
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
unless (null failed) $
- debug thisThread
+ debug threadname
[ "failed to push to"
, show failed
]
@@ -109,6 +112,6 @@ pushToRemotes now st pushmap remotes = do
( exitSuccess, exitFailure)
retry branch g rs = do
- debug thisThread [ "trying manual pull to resolve failed pushes" ]
+ debug threadname [ "trying manual pull to resolve failed pushes" ]
runThreadState st $ manualPull branch rs
go False branch g rs
diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs
new file mode 100644
index 000000000..0a40f7ead
--- /dev/null
+++ b/Assistant/Threads/TransferScanner.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant thread to scan remotes to find needed transfers
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.TransferScanner where
+
+import Assistant.Common
+import Assistant.ScanRemotes
+import Assistant.TransferQueue
+import Assistant.ThreadedMonad
+import Logs.Transfer
+import Types.Remote
+import Utility.ThreadScheduler
+
+thisThread :: ThreadName
+thisThread = "TransferScanner"
+
+{- This thread scans remotes, to find transfers that need to be made to
+ - keep their data in sync. The transfers are queued with lot priority. -}
+transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
+transferScannerThread st scanremotes transferqueue = do
+ runEvery (Seconds 2) $ do
+ r <- getScanRemote scanremotes
+ needtransfer <- scan st r
+ forM_ needtransfer $ \(f, t) ->
+ queueLaterTransfer transferqueue f t
+
+scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)]
+scan st r = do
+ debug thisThread ["scanning", show r]
+ return [] -- TODO