diff options
-rw-r--r-- | Annex.hs | 4 | ||||
-rw-r--r-- | Assistant.hs | 37 | ||||
-rw-r--r-- | Assistant/BranchChange.hs | 9 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 13 | ||||
-rw-r--r-- | Assistant/Sync.hs | 30 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 36 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 28 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 87 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 1 | ||||
-rw-r--r-- | Assistant/XMPP.hs | 118 | ||||
-rw-r--r-- | Build/Configure.hs | 1 | ||||
-rw-r--r-- | Command/Status.hs | 4 | ||||
-rw-r--r-- | Common.hs | 4 | ||||
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | Utility/SRV.hs | 82 | ||||
-rw-r--r-- | Utility/State.hs | 4 | ||||
-rw-r--r-- | debian/control | 1 | ||||
-rwxr-xr-x | debian/rules | 4 | ||||
-rw-r--r-- | doc/design/assistant/xmpp.mdwn | 7 | ||||
-rw-r--r-- | doc/install/fromscratch.mdwn | 1 | ||||
-rw-r--r-- | git-annex.cabal | 14 |
22 files changed, 425 insertions, 77 deletions
@@ -5,7 +5,7 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE GeneralizedNewtypeDeriving, TypeFamilies, MultiParamTypeClasses #-} +{-# LANGUAGE PackageImports, GeneralizedNewtypeDeriving, TypeFamilies, MultiParamTypeClasses #-} module Annex ( Annex, @@ -30,7 +30,7 @@ module Annex ( fromRepo, ) where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict import Control.Monad.Trans.Control (StM, MonadBaseControl, liftBaseWith, restoreM) import Control.Monad.Base (liftBase, MonadBase) import System.Posix.Types (Fd) diff --git a/Assistant.hs b/Assistant.hs index cb94ca462..0fdda6997 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -69,7 +69,9 @@ - Thread 18: ConfigMonitor - Triggered by changes to the git-annex branch, checks for changed - config files, and reloads configs. - - Thread 19: WebApp + - Thread 19: PushNotifier + - Notifies other repositories of pushes, using out of band signaling. + - Thread 20: WebApp - Spawns more threads as necessary to handle clients. - Displays the DaemonStatus. - @@ -100,6 +102,11 @@ - ScanRemotes (STM TMVar) - Remotes that have been disconnected, and should be scanned - are indicated by writing to this TMVar. + - BranchChanged (STM SampleVar) + - Changes to the git-annex branch are indicated by updating this + - SampleVar. + - PushNotifier (STM TChan) + - After successful pushes, this SampleVar is updated. - UrlRenderer (MVar) - A Yesod route rendering function is stored here. This allows - things that need to render Yesod routes to block until the webapp @@ -133,6 +140,9 @@ import Assistant.Threads.NetWatcher import Assistant.Threads.TransferScanner import Assistant.Threads.TransferPoller import Assistant.Threads.ConfigMonitor +#ifdef WITH_XMPP +import Assistant.Threads.PushNotifier +#endif #ifdef WITH_WEBAPP import Assistant.WebApp import Assistant.Threads.WebApp @@ -180,6 +190,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do transferslots <- newTransferSlots scanremotes <- newScanRemoteMap branchhandle <- newBranchChangeHandle + pushnotifier <- newPushNotifier #ifdef WITH_WEBAPP urlrenderer <- newUrlRenderer #endif @@ -191,25 +202,31 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do , assist $ pairListenerThread st dstatus scanremotes urlrenderer #endif #endif - , assist $ pushThread st dstatus commitchan pushmap - , assist $ pushRetryThread st dstatus pushmap + , assist $ pushThread st dstatus commitchan pushmap pushnotifier + , assist $ pushRetryThread st dstatus pushmap pushnotifier , assist $ mergeThread st dstatus transferqueue branchhandle , assist $ transferWatcherThread st dstatus transferqueue , assist $ transferPollerThread st dstatus , assist $ transfererThread st dstatus transferqueue transferslots , assist $ daemonStatusThread st dstatus , assist $ sanityCheckerThread st dstatus transferqueue changechan - , assist $ mountWatcherThread st dstatus scanremotes - , assist $ netWatcherThread st dstatus scanremotes - , assist $ netWatcherFallbackThread st dstatus scanremotes + , assist $ mountWatcherThread st dstatus scanremotes pushnotifier + , assist $ netWatcherThread st dstatus scanremotes pushnotifier + , assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier , assist $ transferScannerThread st dstatus scanremotes transferqueue , assist $ configMonitorThread st dstatus branchhandle commitchan +#ifdef WITH_XMPP + {- Bound thread, because TLS needs it. -} + , bound $ assist $ pushNotifierThread st dstatus pushnotifier +#endif , watch $ watchThread st dstatus transferqueue changechan ] waitForTermination - watch a = (True, a) - assist a = (False, a) - startthread dstatus (watcher, t) - | watcher || assistant = void $ forkIO $ + + watch a = (forkIO, True, a) + assist a = (forkIO, False, a) + bound (_, watcher, t) = (forkOS, watcher, t) + startthread dstatus (runner, watcher, t) + | watcher || assistant = void $ runner $ runNamedThread dstatus t | otherwise = noop diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs index b166c8777..d1d1c20df 100644 --- a/Assistant/BranchChange.hs +++ b/Assistant/BranchChange.hs @@ -8,14 +8,15 @@ module Assistant.BranchChange where import Control.Concurrent.MSampleVar +import Assistant.Common -type BranchChangeHandle = MSampleVar () +newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) newBranchChangeHandle :: IO BranchChangeHandle -newBranchChangeHandle = newEmptySV +newBranchChangeHandle = BranchChangeHandle <$> newEmptySV branchChanged :: BranchChangeHandle -> IO () -branchChanged = flip writeSV () +branchChanged (BranchChangeHandle h) = writeSV h () waitBranchChange :: BranchChangeHandle -> IO () -waitBranchChange = readSV +waitBranchChange (BranchChangeHandle h) = readSV h diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index f411dda07..7842c1884 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -8,6 +8,7 @@ module Assistant.Pushes where import Common.Annex +import Utility.TSet import Control.Concurrent.STM import Data.Time.Clock @@ -17,6 +18,9 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap +{- Used to notify about successful pushes. -} +newtype PushNotifier = PushNotifier (TSet UUID) + {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} @@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $ store m | m == M.empty = noop | otherwise = putTMVar v $! m + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier <$> newTSet + +notifyPush :: [UUID] -> PushNotifier -> IO () +notifyPush us (PushNotifier s) = putTSet s us + +waitPush :: PushNotifier -> IO [UUID] +waitPush (PushNotifier s) = getTSet s diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6c167e2ea..e332d7856 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -36,9 +36,9 @@ import Control.Concurrent - the remotes have diverged from the local git-annex branch. Otherwise, - it's sufficient to requeue failed transfers. -} -reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () -reconnectRemotes _ _ _ _ [] = noop -reconnectRemotes threadname st dstatus scanremotes rs = void $ +reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO () +reconnectRemotes _ _ _ _ _ [] = noop +reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $ alertWhile dstatus (syncAlert rs) $ do (ok, diverged) <- sync =<< runThreadState st (inRepo Git.Branch.current) @@ -48,13 +48,13 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ (gitremotes, _specialremotes) = partition (Git.repoIsUrl . Remote.repo) rs sync (Just branch) = do - diverged <- manualPull st (Just branch) gitremotes + diverged <- snd <$> manualPull st (Just branch) gitremotes now <- getCurrentTime - ok <- pushToRemotes threadname now st Nothing gitremotes + ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes return (ok, diverged) {- No local branch exists yet, but we can try pulling. -} sync Nothing = do - diverged <- manualPull st Nothing gitremotes + diverged <- snd <$> manualPull st Nothing gitremotes return (True, diverged) {- Updates the local sync branch, then pushes it to all remotes, in @@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ - them. While ugly, those branches are reserved for pushing by us, and - so our pushes will succeed. -} -pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool -pushToRemotes threadname now st mpushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool +pushToRemotes threadname now st mpushnotifier mpushmap remotes = do (g, branch, u) <- runThreadState st $ (,,) <$> gitRepo <*> inRepo Git.Branch.current @@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do updatemap succeeded [] let ok = null failed if ok - then return ok + then do + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier + return ok else if shouldretry then retry branch g u failed else fallback branch g u failed @@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do ] (succeeded, failed) <- inParallel (pushfallback g u branch) rs updatemap succeeded failed + unless (null succeeded) $ + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g @@ -143,18 +147,18 @@ pushToRemotes threadname now st mpushmap remotes = do where s = show $ Git.Ref.base b {- Manually pull from remotes and merge their branches. -} -manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO Bool +manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO ([Bool], Bool) manualPull st currentbranch remotes = do g <- runThreadState st gitRepo - forM_ remotes $ \r -> + results <- forM remotes $ \r -> Git.Command.runBool "fetch" [Param $ Remote.name r] g haddiverged <- runThreadState st Annex.Branch.forceUpdate forM_ remotes $ \r -> runThreadState st $ Command.Sync.mergeRemote r currentbranch - return haddiverged + return (results, haddiverged) {- Start syncing a newly added remote, using a background thread. -} syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote st dstatus scanremotes remote = do runThreadState st $ updateSyncRemotes dstatus - void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] + void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote] diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index a5200adfe..afd1c223c 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -38,20 +39,21 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -mountWatcherThread st handle scanremotes = thread $ +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +mountWatcherThread st handle scanremotes pushnotifier = thread $ #if WITH_DBUS - dbusThread st handle scanremotes + dbusThread st handle scanremotes pushnotifier #else - pollingThread st handle scanremotes + pollingThread st handle scanremotes pushnotifier #endif where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = + E.catch (runClient getSessionAddress go) onerr where go client = ifM (checkMountMonitor client) ( do @@ -64,7 +66,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -80,7 +82,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one runThreadState st $ warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus scanremotes + pollinstead = pollingThread st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} @@ -142,24 +144,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -pollingThread st dstatus scanremotes = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = + mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () -handleMount st dstatus scanremotes dir = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () +handleMount st dstatus scanremotes pushnotifier dir = do debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 96b8007cc..883a7bef5 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -31,12 +32,12 @@ import Data.Word (Word32) thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes = thread $ - dbusThread st dstatus scanremotes +netWatcherThread st dstatus scanremotes pushnotifier = thread $ + dbusThread st dstatus scanremotes pushnotifier #else -netWatcherThread _ _ _ = thread noop +netWatcherThread _ _ _ _ = thread noop #endif where thread = NamedThread thisThread @@ -46,17 +47,18 @@ netWatcherThread _ _ _ = thread noop - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -netWatcherFallbackThread st dstatus scanremotes = thread $ +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr go +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = + persistentClient getSystemAddress () onerr go where go client = ifM (checkNetMonitor client) ( do @@ -68,7 +70,7 @@ dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr g ) handleconn = do debug thisThread ["detected network connection"] - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier onerr e _ = do runThreadState st $ warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" @@ -127,9 +129,9 @@ listenWicdConnections client callback = #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -handleConnection st dstatus scanremotes = - reconnectRemotes thisThread st dstatus scanremotes +handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +handleConnection st dstatus scanremotes pushnotifier = + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< networkRemotes st {- Finds network remotes. -} diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs new file mode 100644 index 000000000..8c71138d7 --- /dev/null +++ b/Assistant/Threads/PushNotifier.hs @@ -0,0 +1,87 @@ +{- git-annex assistant push notification thread, using XMPP + - + - This handles both sending outgoing push notifications, and receiving + - incoming push notifications. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PushNotifier where + +import Assistant.Common +import Assistant.XMPP +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Pushes +import Assistant.Sync +import qualified Remote + +import Network.Protocol.XMPP +import Control.Concurrent +import qualified Data.Set as S +import qualified Git.Branch + +thisThread :: ThreadName +thisThread = "PushNotifier" + +pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do + v <- runThreadState st $ getXMPPCreds + case v of + Nothing -> return () -- no creds? exit thread + Just c -> void $ connectXMPP c $ \jid -> do + fulljid <- bindJID jid + liftIO $ debug thisThread ["XMPP connected", show fulljid] + s <- getSession + _ <- liftIO $ forkOS $ void $ runXMPP s $ + receivenotifications + sendnotifications + where + sendnotifications = forever $ do + us <- liftIO $ waitPush pushnotifier + let payload = [extendedAway, encodePushNotification us] + let notification = (emptyPresence PresenceAvailable) + { presencePayloads = payload } + putStanza notification + + receivenotifications = forever $ do + s <- getStanza + liftIO $ debug thisThread ["received XMPP:", show s] + case s of + ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> + liftIO $ pull st dstatus $ + concat $ catMaybes $ + map decodePushNotification $ + presencePayloads p + _ -> noop + +{- We only pull from one remote out of the set listed in the push + - notification, as an optimisation. + - + - Note that it might be possible (though very unlikely) for the push + - notification to take a while to be sent, and multiple pushes happen + - before it is sent, so it includes multiple remotes that were pushed + - to at different times. + - + - It could then be the case that the remote we choose had the earlier + - push sent to it, but then failed to get the later push, and so is not + - fully up-to-date. If that happens, the pushRetryThread will come along + - and retry the push, and we'll get another notification once it succeeds, + - and pull again. -} +pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () +pull _ _ [] = noop +pull st dstatus us = do + rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus + debug thisThread $ "push notification for" : + map (fromUUID . Remote.uuid ) rs + pullone rs =<< runThreadState st (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us + + pullone [] _ = noop + pullone (r:rs) branch = + unlessM (all id . fst <$> manualPull st branch [r]) $ + pullone rs branch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 4f3a2dd09..295ceddc9 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread -pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread +pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do ] now <- getCurrentTime void $ alertWhile dstatus (pushRetryAlert topush) $ - pushToRemotes thisThread now st (Just pushmap) topush + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread -pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread +pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do <$> getDaemonStatus dstatus unless (null remotes) $ void $ alertWhile dstatus (pushAlert remotes) $ - pushToRemotes thisThread now st (Just pushmap) remotes + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do debug thisThread [ "delaying push of" diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index c33dc2103..5eda88d36 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -24,6 +24,7 @@ import Assistant.WebApp.Configurators.Pairing #ifdef WITH_S3 import Assistant.WebApp.Configurators.S3 #endif +import Assistant.WebApp.Configurators.XMPP import Assistant.WebApp.Documentation import Assistant.WebApp.OtherRepos import Assistant.ThreadedMonad diff --git a/Assistant/XMPP.hs b/Assistant/XMPP.hs new file mode 100644 index 000000000..d71bd7eaf --- /dev/null +++ b/Assistant/XMPP.hs @@ -0,0 +1,118 @@ +{- xmpp support + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.XMPP where + +import Assistant.Common +import Utility.FileMode +import Utility.SRV + +import Network.Protocol.XMPP +import Network +import Control.Concurrent +import qualified Data.Text as T +import Data.XML.Types +import Control.Exception as E + +{- Everything we need to know to connect to an XMPP server. -} +data XMPPCreds = XMPPCreds + { xmppUsername :: T.Text + , xmppPassword :: T.Text + , xmppHostname :: HostName + , xmppPort :: Int + , xmppJID :: T.Text + } + deriving (Read, Show) + +{- Note that this must be run in a bound thread; gnuTLS requires it. -} +connectXMPP :: XMPPCreds -> (JID -> XMPP a) -> IO (Either SomeException ()) +connectXMPP c a = case parseJID (xmppJID c) of + Nothing -> error "bad JID" + Just jid -> runInBoundThread $ connectXMPP' jid c a + +{- Do a SRV lookup, but if it fails, fall back to the cached xmppHostname. -} +connectXMPP' :: JID -> XMPPCreds -> (JID -> XMPP a) -> IO (Either SomeException ()) +connectXMPP' jid c a = go =<< lookupSRV srvrecord + where + srvrecord = mkSRVTcp "xmpp-client" $ + T.unpack $ strDomain $ jidDomain jid + serverjid = JID Nothing (jidDomain jid) Nothing + + go [] = run (xmppHostname c) + (PortNumber $ fromIntegral $ xmppPort c) + (a jid) + go ((h,p):rest) = do + {- Try each SRV record in turn, until one connects, + - at which point the MVar will be full. -} + mv <- newEmptyMVar + r <- run h p $ do + liftIO $ putMVar mv () + a jid + ifM (isEmptyMVar mv) (go rest, return r) + + run h p a' = E.try (runClientError (Server serverjid h p) jid (xmppUsername c) (xmppPassword c) (void a')) :: IO (Either SomeException ()) + +{- XMPP runClient, that throws errors rather than returning an Either -} +runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a +runClientError s j u p x = either (error . show) return =<< runClient s j u p x + +getXMPPCreds :: Annex (Maybe XMPPCreds) +getXMPPCreds = do + f <- xmppCredsFile + s <- liftIO $ catchMaybeIO $ readFile f + return $ readish =<< s + +setXMPPCreds :: XMPPCreds -> Annex () +setXMPPCreds creds = do + f <- xmppCredsFile + liftIO $ do + h <- openFile f WriteMode + modifyFileMode f $ removeModes + [groupReadMode, otherReadMode] + hPutStr h (show creds) + hClose h + +xmppCredsFile :: Annex FilePath +xmppCredsFile = do + dir <- fromRepo gitAnnexCredsDir + return $ dir </> "notify-xmpp" + +{- Marks the client as extended away. -} +extendedAway :: Element +extendedAway = Element (Name (T.pack "show") Nothing Nothing) [] + [NodeContent $ ContentText $ T.pack "xa"] + +{- Name of a git-annex tag, in our own XML namespace. + - (Not using a namespace URL to avoid unnecessary bloat.) -} +gitAnnexTagName :: Name +gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing + +pushAttr :: Name +pushAttr = Name (T.pack "push") Nothing Nothing + +uuidSep :: T.Text +uuidSep = T.pack "," + +{- git-annex tag with one push attribute per UUID pushed to. -} +encodePushNotification :: [UUID] -> Element +encodePushNotification us = Element gitAnnexTagName + [(pushAttr, [ContentText pushvalue])] [] + where + pushvalue = T.intercalate uuidSep $ + map (T.pack . fromUUID) us + +decodePushNotification :: Element -> Maybe [UUID] +decodePushNotification (Element name attrs _nodes) + | name == gitAnnexTagName && not (null us) = Just us + | otherwise = Nothing + where + us = map (toUUID . T.unpack) $ + concatMap (T.splitOn uuidSep . T.concat . map fromContent . snd) $ + filter ispush attrs + ispush (k, _) = k == pushAttr + fromContent (ContentText t) = t + fromContent (ContentEntity t) = t diff --git a/Build/Configure.hs b/Build/Configure.hs index 96582f923..894feb409 100644 --- a/Build/Configure.hs +++ b/Build/Configure.hs @@ -27,6 +27,7 @@ tests = , TestCase "bup" $ testCmd "bup" "bup --version >/dev/null" , TestCase "gpg" $ testCmd "gpg" "gpg --version >/dev/null" , TestCase "lsof" $ testCmd "lsof" "lsof -v >/dev/null 2>&1" + , TestCase "host" $ testCmd "host" "host localhost >/dev/null 2>&1" , TestCase "ssh connection caching" getSshConnectionCaching ] ++ shaTestCases [ (1, "da39a3ee5e6b4b0d3255bfef95601890afd80709") diff --git a/Command/Status.hs b/Command/Status.hs index ab7dbb007..a16e14317 100644 --- a/Command/Status.hs +++ b/Command/Status.hs @@ -5,11 +5,11 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE PackageImports, BangPatterns #-} module Command.Status where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict import qualified Data.Map as M import Text.JSON import Data.Tuple @@ -1,9 +1,11 @@ +{-# LANGUAGE PackageImports #-} + module Common (module X) where import Control.Monad as X hiding (join) import Control.Monad.IfElse as X import Control.Applicative as X -import Control.Monad.State.Strict as X (liftIO) +import "mtl" Control.Monad.State.Strict as X (liftIO) import Control.Exception.Extensible as X (IOException) import Data.Maybe as X @@ -1,13 +1,12 @@ CFLAGS=-Wall GIT_ANNEX_TMP_BUILD_DIR?=tmp -IGNORE=-ignore-package monads-fd -ignore-package monads-tf -BASEFLAGS=-Wall $(IGNORE) -outputdir $(GIT_ANNEX_TMP_BUILD_DIR) -IUtility +BASEFLAGS=-Wall -outputdir $(GIT_ANNEX_TMP_BUILD_DIR) -IUtility # If you get build failures due to missing haskell libraries, # you can turn off some of these features. # # If you're using an old version of yesod, enable -DWITH_OLD_YESOD -FEATURES?=$(GIT_ANNEX_LOCAL_FEATURES) -DWITH_ASSISTANT -DWITH_S3 -DWITH_WEBAPP -DWITH_PAIRING +FEATURES?=$(GIT_ANNEX_LOCAL_FEATURES) -DWITH_ASSISTANT -DWITH_S3 -DWITH_WEBAPP -DWITH_PAIRING -DWITH_XMPP bins=git-annex mans=git-annex.1 git-annex-shell.1 diff --git a/Utility/SRV.hs b/Utility/SRV.hs new file mode 100644 index 000000000..4f2db680b --- /dev/null +++ b/Utility/SRV.hs @@ -0,0 +1,82 @@ +{- SRV record lookup + - + - Uses either the ADNS Haskell library, or if it's not installed, + - the host command. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Utility.SRV ( + mkSRVTcp, + mkSRV, + lookupSRV, +) where + +import qualified Build.SysConfig +import Utility.Process +import Utility.Exception +import Utility.PartialPrelude + +import Network +import Data.Function +import Data.List +import Control.Applicative +import Data.Maybe + +#ifdef WITH_ADNS +import ADNS.Resolver +import Data.Either +#endif + +newtype SRV = SRV String + deriving (Show, Eq) + +type HostPort = (HostName, PortID) + +mkSRV :: String -> String -> HostName -> SRV +mkSRV transport protocol host = SRV $ concat + ["_", protocol, "._", transport, ".", host] + +mkSRVTcp :: String -> HostName -> SRV +mkSRVTcp = mkSRV "tcp" + +{- Returns an ordered list, with highest priority hosts first. + - + - On error, returns an empty list. -} +lookupSRV :: SRV -> IO [HostPort] +#ifdef WITH_ADNS +lookupSRV srv = initResolver [] $ \resolver -> do + r <- catchDefaultIO (Right []) $ + resolveSRV resolver srv + return $ either (\_ -> []) id r +#else +lookupSRV = lookupSRVHost +#endif + +lookupSRVHost :: SRV -> IO [HostPort] +lookupSRVHost (SRV srv) + | Build.SysConfig.host = catchDefaultIO [] $ + parseSrvHost <$> readProcessEnv "host" ["-t", "SRV", "--", srv] + -- clear environment, to avoid LANG affecting output + (Just []) + | otherwise = return [] + +parseSrvHost :: String -> [HostPort] +parseSrvHost = map snd . reverse . sortBy cost . catMaybes . map parse . lines + where + cost = compare `on` fst + parse l = case words l of + [_, _, _, _, priority, weight, sport, hostname] -> do + let v = readish sport :: Maybe Int + case v of + Nothing -> Nothing + Just port -> Just + ( (priority, weight) + , (hostname, PortNumber $ fromIntegral port) + ) + _ -> Nothing + diff --git a/Utility/State.hs b/Utility/State.hs index c27f3c261..7f8919082 100644 --- a/Utility/State.hs +++ b/Utility/State.hs @@ -5,9 +5,11 @@ - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE PackageImports #-} + module Utility.State where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict {- Modifies Control.Monad.State's state, forcing a strict update. - This avoids building thunks in the state and leaking. diff --git a/debian/control b/debian/control index ba8f571c8..edd5e4d61 100644 --- a/debian/control +++ b/debian/control @@ -40,6 +40,7 @@ Build-Depends: libghc-network-multicast-dev, libghc-network-info-dev, libghc-safesemaphore-dev, + libghc-network-protocol-xmpp-dev, ikiwiki, perlmagick, git, diff --git a/debian/rules b/debian/rules index c0fbd9aa4..4a5532027 100755 --- a/debian/rules +++ b/debian/rules @@ -2,9 +2,9 @@ ARCH = $(shell dpkg-architecture -qDEB_BUILD_ARCH) ifeq (install ok installed,$(shell dpkg-query -W -f '$${Status}' libghc-yesod-dev 2>/dev/null)) -export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_OLD_YESOD -DWITH_WEBAPP -DWITH_PAIRING +export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_OLD_YESOD -DWITH_WEBAPP -DWITH_PAIRING -DWITH_XMPP else -export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_PAIRING +export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_PAIRING -DWITH_XMPP endif %: diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn index 4a05afab9..c58d42ca9 100644 --- a/doc/design/assistant/xmpp.mdwn +++ b/doc/design/assistant/xmpp.mdwn @@ -43,11 +43,12 @@ using presence messages. These always mark it as extended away. To this, it adds its own tag as [extended content](http://xmpp.org/rfcs/rfc6121.html#presence-extended). The xml namespace is "git-annex" (not an URL because I hate wasting bandwidth). -To indicate it's pushed changes to a git repo, a client uses: +To indicate it's pushed changes to a git repo with a given UUID, a client uses: - <git-annex xmlns='git-annex' push="uuid" /> + <git-annex xmlns='git-annex' push="uuid[,uuid...]" /> -The push attribute can be repeated when the push was sent to multiple repos. +Multiple UUIDs can be listed when multiple clients were pushed. If the +git repo does not have a git-annex UUID, an empty string is used. ### security diff --git a/doc/install/fromscratch.mdwn b/doc/install/fromscratch.mdwn index 4410a59b9..f79ae7dc7 100644 --- a/doc/install/fromscratch.mdwn +++ b/doc/install/fromscratch.mdwn @@ -42,6 +42,7 @@ quite a lot. * [clientsession](http://hackage.haskell.org/package/clientsession) * [network-multicast](http://hackage.haskell.org/package/network-multicast) * [network-info](http://hackage.haskell.org/package/network-info) + * [network-protocol-xmpp](http://hackage.haskell.org/package/network-protocol-xmpp) * Shell commands * [git](http://git-scm.com/) * [uuid](http://www.ossp.org/pkg/lib/uuid/) diff --git a/git-annex.cabal b/git-annex.cabal index 5b7e78654..887cd9841 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -43,6 +43,12 @@ Flag Webapp Flag Pairing Description: Enable pairing +Flag XMPP + Description: Enable notifications using XMPP + +Flag Adns + Description: Enable the ADNS library for DNS lookup + Executable git-annex Main-Is: git-annex.hs Build-Depends: MissingH, hslogger, directory, filepath, @@ -91,6 +97,14 @@ Executable git-annex Build-Depends: network-multicast, network-info CPP-Options: -DWITH_PAIRING + if flag(XMPP) && flag(Assistant) + Build-Depends: network-protocol-xmpp + CPP-Options: -DWITH_XMPP + + if flag(XMPP) && flag(Assistant) && flag(Adns) + Build-Depends: hsdns + CPP-Options: -DWITH_ADNS + Test-Suite test Type: exitcode-stdio-1.0 Main-Is: test.hs |