summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/MountWatcher.hs36
-rw-r--r--Assistant/Threads/NetWatcher.hs28
-rw-r--r--Assistant/Threads/PushNotifier.hs87
-rw-r--r--Assistant/Threads/Pusher.hs12
-rw-r--r--Assistant/Threads/WebApp.hs1
5 files changed, 128 insertions, 36 deletions
diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs
index a5200adfe..afd1c223c 100644
--- a/Assistant/Threads/MountWatcher.hs
+++ b/Assistant/Threads/MountWatcher.hs
@@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync
+import Assistant.Pushes
import qualified Annex
import qualified Git
import Utility.ThreadScheduler
@@ -38,20 +39,21 @@ import qualified Control.Exception as E
thisThread :: ThreadName
thisThread = "MountWatcher"
-mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
-mountWatcherThread st handle scanremotes = thread $
+mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
+mountWatcherThread st handle scanremotes pushnotifier = thread $
#if WITH_DBUS
- dbusThread st handle scanremotes
+ dbusThread st handle scanremotes pushnotifier
#else
- pollingThread st handle scanremotes
+ pollingThread st handle scanremotes pushnotifier
#endif
where
thread = NamedThread thisThread
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) onerr
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+dbusThread st dstatus scanremotes pushnotifier =
+ E.catch (runClient getSessionAddress go) onerr
where
go client = ifM (checkMountMonitor client)
( do
@@ -64,7 +66,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one
listen client matcher $ \_event -> do
nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted
- handleMounts st dstatus scanremotes wasmounted nowmounted
+ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
, do
runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling"
@@ -80,7 +82,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one
runThreadState st $
warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")"
pollinstead
- pollinstead = pollingThread st dstatus scanremotes
+ pollinstead = pollingThread st dstatus scanremotes pushnotifier
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor mounts. If not, will attempt to start one. -}
@@ -142,24 +144,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback]
#endif
-pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-pollingThread st dstatus scanremotes = go =<< currentMountPoints
+pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints
where
go wasmounted = do
threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints
- handleMounts st dstatus scanremotes wasmounted nowmounted
+ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
go nowmounted
-handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
-handleMounts st dstatus scanremotes wasmounted nowmounted =
- mapM_ (handleMount st dstatus scanremotes . mnt_dir) $
+handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO ()
+handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted =
+ mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $
S.toList $ newMountPoints wasmounted nowmounted
-handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO ()
-handleMount st dstatus scanremotes dir = do
+handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO ()
+handleMount st dstatus scanremotes pushnotifier dir = do
debug thisThread ["detected mount of", dir]
- reconnectRemotes thisThread st dstatus scanremotes
+ reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< filter (Git.repoIsLocal . Remote.repo)
<$> remotesUnder st dstatus dir
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index 96b8007cc..883a7bef5 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync
+import Assistant.Pushes
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
@@ -31,12 +32,12 @@ import Data.Word (Word32)
thisThread :: ThreadName
thisThread = "NetWatcher"
-netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
+netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
#if WITH_DBUS
-netWatcherThread st dstatus scanremotes = thread $
- dbusThread st dstatus scanremotes
+netWatcherThread st dstatus scanremotes pushnotifier = thread $
+ dbusThread st dstatus scanremotes pushnotifier
#else
-netWatcherThread _ _ _ = thread noop
+netWatcherThread _ _ _ _ = thread noop
#endif
where
thread = NamedThread thisThread
@@ -46,17 +47,18 @@ netWatcherThread _ _ _ = thread noop
- any networked remotes that may have not been routable for a
- while (despite the local network staying up), are synced with
- periodically. -}
-netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
-netWatcherFallbackThread st dstatus scanremotes = thread $
+netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
+netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $
runEvery (Seconds 3600) $
- handleConnection st dstatus scanremotes
+ handleConnection st dstatus scanremotes pushnotifier
where
thread = NamedThread thisThread
#if WITH_DBUS
-dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr go
+dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+dbusThread st dstatus scanremotes pushnotifier =
+ persistentClient getSystemAddress () onerr go
where
go client = ifM (checkNetMonitor client)
( do
@@ -68,7 +70,7 @@ dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr g
)
handleconn = do
debug thisThread ["detected network connection"]
- handleConnection st dstatus scanremotes
+ handleConnection st dstatus scanremotes pushnotifier
onerr e _ = do
runThreadState st $
warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")"
@@ -127,9 +129,9 @@ listenWicdConnections client callback =
#endif
-handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
-handleConnection st dstatus scanremotes =
- reconnectRemotes thisThread st dstatus scanremotes
+handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
+handleConnection st dstatus scanremotes pushnotifier =
+ reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< networkRemotes st
{- Finds network remotes. -}
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
new file mode 100644
index 000000000..8c71138d7
--- /dev/null
+++ b/Assistant/Threads/PushNotifier.hs
@@ -0,0 +1,87 @@
+{- git-annex assistant push notification thread, using XMPP
+ -
+ - This handles both sending outgoing push notifications, and receiving
+ - incoming push notifications.
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.PushNotifier where
+
+import Assistant.Common
+import Assistant.XMPP
+import Assistant.ThreadedMonad
+import Assistant.DaemonStatus
+import Assistant.Pushes
+import Assistant.Sync
+import qualified Remote
+
+import Network.Protocol.XMPP
+import Control.Concurrent
+import qualified Data.Set as S
+import qualified Git.Branch
+
+thisThread :: ThreadName
+thisThread = "PushNotifier"
+
+pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
+pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do
+ v <- runThreadState st $ getXMPPCreds
+ case v of
+ Nothing -> return () -- no creds? exit thread
+ Just c -> void $ connectXMPP c $ \jid -> do
+ fulljid <- bindJID jid
+ liftIO $ debug thisThread ["XMPP connected", show fulljid]
+ s <- getSession
+ _ <- liftIO $ forkOS $ void $ runXMPP s $
+ receivenotifications
+ sendnotifications
+ where
+ sendnotifications = forever $ do
+ us <- liftIO $ waitPush pushnotifier
+ let payload = [extendedAway, encodePushNotification us]
+ let notification = (emptyPresence PresenceAvailable)
+ { presencePayloads = payload }
+ putStanza notification
+
+ receivenotifications = forever $ do
+ s <- getStanza
+ liftIO $ debug thisThread ["received XMPP:", show s]
+ case s of
+ ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
+ liftIO $ pull st dstatus $
+ concat $ catMaybes $
+ map decodePushNotification $
+ presencePayloads p
+ _ -> noop
+
+{- We only pull from one remote out of the set listed in the push
+ - notification, as an optimisation.
+ -
+ - Note that it might be possible (though very unlikely) for the push
+ - notification to take a while to be sent, and multiple pushes happen
+ - before it is sent, so it includes multiple remotes that were pushed
+ - to at different times.
+ -
+ - It could then be the case that the remote we choose had the earlier
+ - push sent to it, but then failed to get the later push, and so is not
+ - fully up-to-date. If that happens, the pushRetryThread will come along
+ - and retry the push, and we'll get another notification once it succeeds,
+ - and pull again. -}
+pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO ()
+pull _ _ [] = noop
+pull st dstatus us = do
+ rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
+ debug thisThread $ "push notification for" :
+ map (fromUUID . Remote.uuid ) rs
+ pullone rs =<< runThreadState st (inRepo Git.Branch.current)
+ where
+ matching r = Remote.uuid r `S.member` s
+ s = S.fromList us
+
+ pullone [] _ = noop
+ pullone (r:rs) branch =
+ unlessM (all id . fst <$> manualPull st branch [r]) $
+ pullone rs branch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 4f3a2dd09..295ceddc9 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -24,8 +24,8 @@ thisThread :: ThreadName
thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
-pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread
-pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
+pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
+pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
@@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
]
now <- getCurrentTime
void $ alertWhile dstatus (pushRetryAlert topush) $
- pushToRemotes thisThread now st (Just pushmap) topush
+ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
where
halfhour = 1800
thread = NamedThread thisThread
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread
-pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do
+pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
+pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
@@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do
<$> getDaemonStatus dstatus
unless (null remotes) $
void $ alertWhile dstatus (pushAlert remotes) $
- pushToRemotes thisThread now st (Just pushmap) remotes
+ pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
debug thisThread
[ "delaying push of"
diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs
index c33dc2103..5eda88d36 100644
--- a/Assistant/Threads/WebApp.hs
+++ b/Assistant/Threads/WebApp.hs
@@ -24,6 +24,7 @@ import Assistant.WebApp.Configurators.Pairing
#ifdef WITH_S3
import Assistant.WebApp.Configurators.S3
#endif
+import Assistant.WebApp.Configurators.XMPP
import Assistant.WebApp.Documentation
import Assistant.WebApp.OtherRepos
import Assistant.ThreadedMonad