diff options
author | Joey Hess <joey@kitenet.net> | 2012-10-24 13:35:43 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-10-24 13:38:28 -0400 |
commit | e8d09e97d3f21ef00cb54e26ea916754bc6fc3cb (patch) | |
tree | 43d3519ee44169a8ad208d15aef598f35bb3d025 /Assistant | |
parent | e7005a07d9d6577e837a61e4bc9c80133321ae03 (diff) |
added push notifier thread, currently a no-op
Hooked up everything that needs to notify on pushes. Note that
syncNewRemote does not notify. This is probably ok, and I'd need to thread
more state through to make it do so.
This is only set up to support a single push notification method; I didn't
use a NotificationBroadcaster. Partly because I don't yet know what info
about pushes needs to be communicated, so my data types are only
preliminary.
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Pushes.hs | 13 | ||||
-rw-r--r-- | Assistant/Sync.hs | 20 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 35 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 27 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 21 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 12 |
6 files changed, 84 insertions, 44 deletions
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index f411dda07..649975fd1 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -10,6 +10,7 @@ module Assistant.Pushes where import Common.Annex import Control.Concurrent.STM +import Control.Concurrent.MSampleVar import Data.Time.Clock import qualified Data.Map as M @@ -17,6 +18,9 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap +{- Used to notify about successful pushes. -} +newtype PushNotifier = PushNotifier (MSampleVar ()) + {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} @@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $ store m | m == M.empty = noop | otherwise = putTMVar v $! m + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier <$> newEmptySV + +notifyPush :: PushNotifier -> IO () +notifyPush (PushNotifier sv) = writeSV sv () + +waitPush :: PushNotifier -> IO () +waitPush (PushNotifier sv) = readSV sv diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6c167e2ea..e333877f2 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -36,9 +36,9 @@ import Control.Concurrent - the remotes have diverged from the local git-annex branch. Otherwise, - it's sufficient to requeue failed transfers. -} -reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () -reconnectRemotes _ _ _ _ [] = noop -reconnectRemotes threadname st dstatus scanremotes rs = void $ +reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO () +reconnectRemotes _ _ _ _ _ [] = noop +reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $ alertWhile dstatus (syncAlert rs) $ do (ok, diverged) <- sync =<< runThreadState st (inRepo Git.Branch.current) @@ -50,7 +50,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ sync (Just branch) = do diverged <- manualPull st (Just branch) gitremotes now <- getCurrentTime - ok <- pushToRemotes threadname now st Nothing gitremotes + ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes return (ok, diverged) {- No local branch exists yet, but we can try pulling. -} sync Nothing = do @@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ - them. While ugly, those branches are reserved for pushing by us, and - so our pushes will succeed. -} -pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool -pushToRemotes threadname now st mpushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool +pushToRemotes threadname now st mpushnotifier mpushmap remotes = do (g, branch, u) <- runThreadState st $ (,,) <$> gitRepo <*> inRepo Git.Branch.current @@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do updatemap succeeded [] let ok = null failed if ok - then return ok + then do + maybe noop notifyPush mpushnotifier + return ok else if shouldretry then retry branch g u failed else fallback branch g u failed @@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do ] (succeeded, failed) <- inParallel (pushfallback g u branch) rs updatemap succeeded failed + unless (null succeeded) $ + maybe noop notifyPush mpushnotifier return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g @@ -157,4 +161,4 @@ manualPull st currentbranch remotes = do syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote st dstatus scanremotes remote = do runThreadState st $ updateSyncRemotes dstatus - void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] + void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote] diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index 462f5843c..c36b544a7 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -38,20 +39,20 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -mountWatcherThread st handle scanremotes = thread $ +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +mountWatcherThread st handle scanremotes pushnotifier = thread $ #if WITH_DBUS - dbusThread st handle scanremotes + dbusThread st handle scanremotes pushnotifier #else - pollingThread st handle scanremotes + pollingThread st handle scanremotes pushnotifier #endif where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSession) onerr where go client = ifM (checkMountMonitor client) ( do @@ -64,7 +65,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -75,7 +76,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr runThreadState st $ warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus scanremotes + pollinstead = pollingThread st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} @@ -137,24 +138,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -pollingThread st dstatus scanremotes = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = + mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () -handleMount st dstatus scanremotes dir = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () +handleMount st dstatus scanremotes pushnotifier dir = do debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index a8daa9435..2c637f414 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -32,12 +33,12 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes = thread $ - dbusThread st dstatus scanremotes +netWatcherThread st dstatus scanremotes pushnotifier = thread $ + dbusThread st dstatus scanremotes pushnotifier #else -netWatcherThread _ _ _ = thread noop +netWatcherThread _ _ _ _ = thread noop #endif where thread = NamedThread thisThread @@ -47,17 +48,17 @@ netWatcherThread _ _ _ = thread noop - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -netWatcherFallbackThread st dstatus scanremotes = thread $ +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSystem) onerr where go client = ifM (checkNetMonitor client) ( do @@ -72,7 +73,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")" handle = do debug thisThread ["detected network connection"] - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor network connections. -} @@ -126,9 +127,9 @@ listenWicdConnections client callback = #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -handleConnection st dstatus scanremotes = - reconnectRemotes thisThread st dstatus scanremotes +handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +handleConnection st dstatus scanremotes pushnotifier = + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< networkRemotes st {- Finds network remotes. -} diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs new file mode 100644 index 000000000..cc5309712 --- /dev/null +++ b/Assistant/Threads/PushNotifier.hs @@ -0,0 +1,21 @@ +{- git-annex assistant push notification thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PushNotifier where + +import Assistant.Common +import Assistant.Pushes + +thisThread :: ThreadName +thisThread = "PushNotifier" + +pushNotifierThread :: PushNotifier -> NamedThread +pushNotifierThread pushnotifier = thread $ forever $ do + waitPush pushnotifier + -- TODO + where + thread = NamedThread thisThread diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 4f3a2dd09..295ceddc9 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread -pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread +pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do ] now <- getCurrentTime void $ alertWhile dstatus (pushRetryAlert topush) $ - pushToRemotes thisThread now st (Just pushmap) topush + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread -pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread +pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do <$> getDaemonStatus dstatus unless (null remotes) $ void $ alertWhile dstatus (pushAlert remotes) $ - pushToRemotes thisThread now st (Just pushmap) remotes + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do debug thisThread [ "delaying push of" |