diff options
-rw-r--r-- | Annex.hs | 4 | ||||
-rw-r--r-- | Assistant.hs | 29 | ||||
-rw-r--r-- | Assistant/BranchChange.hs | 9 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 27 | ||||
-rw-r--r-- | Assistant/Sync.hs | 30 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 36 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 29 | ||||
-rw-r--r-- | Assistant/Threads/PushNotifier.hs | 108 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 12 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 6 | ||||
-rw-r--r-- | Assistant/XMPP.hs | 121 | ||||
-rw-r--r-- | Build/Configure.hs | 1 | ||||
-rw-r--r-- | Command/Status.hs | 4 | ||||
-rw-r--r-- | Command/WebApp.hs | 4 | ||||
-rw-r--r-- | Common.hs | 4 | ||||
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | Utility/DBus.hs | 9 | ||||
-rw-r--r-- | Utility/Exception.hs | 17 | ||||
-rw-r--r-- | Utility/SRV.hs | 82 | ||||
-rw-r--r-- | Utility/State.hs | 4 | ||||
-rw-r--r-- | debian/control | 1 | ||||
-rwxr-xr-x | debian/rules | 4 | ||||
-rw-r--r-- | doc/design/assistant/xmpp.mdwn | 15 | ||||
-rw-r--r-- | doc/install/fromscratch.mdwn | 1 | ||||
-rw-r--r-- | git-annex.cabal | 14 |
25 files changed, 487 insertions, 89 deletions
@@ -5,7 +5,7 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE GeneralizedNewtypeDeriving, TypeFamilies, MultiParamTypeClasses #-} +{-# LANGUAGE PackageImports, GeneralizedNewtypeDeriving, TypeFamilies, MultiParamTypeClasses #-} module Annex ( Annex, @@ -30,7 +30,7 @@ module Annex ( fromRepo, ) where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict import Control.Monad.Trans.Control (StM, MonadBaseControl, liftBaseWith, restoreM) import Control.Monad.Base (liftBase, MonadBase) import System.Posix.Types (Fd) diff --git a/Assistant.hs b/Assistant.hs index cb94ca462..6da565b5e 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -69,7 +69,9 @@ - Thread 18: ConfigMonitor - Triggered by changes to the git-annex branch, checks for changed - config files, and reloads configs. - - Thread 19: WebApp + - Thread 19: PushNotifier + - Notifies other repositories of pushes, using out of band signaling. + - Thread 20: WebApp - Spawns more threads as necessary to handle clients. - Displays the DaemonStatus. - @@ -100,6 +102,11 @@ - ScanRemotes (STM TMVar) - Remotes that have been disconnected, and should be scanned - are indicated by writing to this TMVar. + - BranchChanged (STM SampleVar) + - Changes to the git-annex branch are indicated by updating this + - SampleVar. + - PushNotifier (STM TChan) + - After successful pushes, this SampleVar is updated. - UrlRenderer (MVar) - A Yesod route rendering function is stored here. This allows - things that need to render Yesod routes to block until the webapp @@ -133,6 +140,9 @@ import Assistant.Threads.NetWatcher import Assistant.Threads.TransferScanner import Assistant.Threads.TransferPoller import Assistant.Threads.ConfigMonitor +#ifdef WITH_XMPP +import Assistant.Threads.PushNotifier +#endif #ifdef WITH_WEBAPP import Assistant.WebApp import Assistant.Threads.WebApp @@ -180,33 +190,38 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do transferslots <- newTransferSlots scanremotes <- newScanRemoteMap branchhandle <- newBranchChangeHandle + pushnotifier <- newPushNotifier #ifdef WITH_WEBAPP urlrenderer <- newUrlRenderer #endif mapM_ (startthread dstatus) [ watch $ commitThread st changechan commitchan transferqueue dstatus #ifdef WITH_WEBAPP - , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots urlrenderer Nothing webappwaiter + , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots pushnotifier urlrenderer Nothing webappwaiter #ifdef WITH_PAIRING , assist $ pairListenerThread st dstatus scanremotes urlrenderer #endif #endif - , assist $ pushThread st dstatus commitchan pushmap - , assist $ pushRetryThread st dstatus pushmap + , assist $ pushThread st dstatus commitchan pushmap pushnotifier + , assist $ pushRetryThread st dstatus pushmap pushnotifier , assist $ mergeThread st dstatus transferqueue branchhandle , assist $ transferWatcherThread st dstatus transferqueue , assist $ transferPollerThread st dstatus , assist $ transfererThread st dstatus transferqueue transferslots , assist $ daemonStatusThread st dstatus , assist $ sanityCheckerThread st dstatus transferqueue changechan - , assist $ mountWatcherThread st dstatus scanremotes - , assist $ netWatcherThread st dstatus scanremotes - , assist $ netWatcherFallbackThread st dstatus scanremotes + , assist $ mountWatcherThread st dstatus scanremotes pushnotifier + , assist $ netWatcherThread st dstatus scanremotes pushnotifier + , assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier , assist $ transferScannerThread st dstatus scanremotes transferqueue , assist $ configMonitorThread st dstatus branchhandle commitchan +#ifdef WITH_XMPP + , assist $ pushNotifierThread st dstatus pushnotifier +#endif , watch $ watchThread st dstatus transferqueue changechan ] waitForTermination + watch a = (True, a) assist a = (False, a) startthread dstatus (watcher, t) diff --git a/Assistant/BranchChange.hs b/Assistant/BranchChange.hs index b166c8777..d1d1c20df 100644 --- a/Assistant/BranchChange.hs +++ b/Assistant/BranchChange.hs @@ -8,14 +8,15 @@ module Assistant.BranchChange where import Control.Concurrent.MSampleVar +import Assistant.Common -type BranchChangeHandle = MSampleVar () +newtype BranchChangeHandle = BranchChangeHandle (MSampleVar ()) newBranchChangeHandle :: IO BranchChangeHandle -newBranchChangeHandle = newEmptySV +newBranchChangeHandle = BranchChangeHandle <$> newEmptySV branchChanged :: BranchChangeHandle -> IO () -branchChanged = flip writeSV () +branchChanged (BranchChangeHandle h) = writeSV h () waitBranchChange :: BranchChangeHandle -> IO () -waitBranchChange = readSV +waitBranchChange (BranchChangeHandle h) = readSV h diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index f411dda07..49772d56a 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -8,8 +8,10 @@ module Assistant.Pushes where import Common.Annex +import Utility.TSet import Control.Concurrent.STM +import Control.Concurrent.MSampleVar import Data.Time.Clock import qualified Data.Map as M @@ -17,6 +19,14 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap +{- The TSet is recent, successful pushes that other remotes should be + - notified about. + - + - The MSampleVar is written to when the PushNotifier thread should be + - restarted for some reason. + -} +data PushNotifier = PushNotifier (TSet UUID) (MSampleVar ()) + {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} @@ -44,3 +54,20 @@ changeFailedPushMap v a = atomically $ store m | m == M.empty = noop | otherwise = putTMVar v $! m + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier + <$> newTSet + <*> newEmptySV + +notifyPush :: [UUID] -> PushNotifier -> IO () +notifyPush us (PushNotifier s _) = putTSet s us + +waitPush :: PushNotifier -> IO [UUID] +waitPush (PushNotifier s _) = getTSet s + +notifyRestart :: PushNotifier -> IO () +notifyRestart (PushNotifier _ sv) = writeSV sv () + +waitRestart :: PushNotifier -> IO () +waitRestart (PushNotifier _ sv) = readSV sv diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6c167e2ea..e332d7856 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -36,9 +36,9 @@ import Control.Concurrent - the remotes have diverged from the local git-annex branch. Otherwise, - it's sufficient to requeue failed transfers. -} -reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () -reconnectRemotes _ _ _ _ [] = noop -reconnectRemotes threadname st dstatus scanremotes rs = void $ +reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO () +reconnectRemotes _ _ _ _ _ [] = noop +reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $ alertWhile dstatus (syncAlert rs) $ do (ok, diverged) <- sync =<< runThreadState st (inRepo Git.Branch.current) @@ -48,13 +48,13 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ (gitremotes, _specialremotes) = partition (Git.repoIsUrl . Remote.repo) rs sync (Just branch) = do - diverged <- manualPull st (Just branch) gitremotes + diverged <- snd <$> manualPull st (Just branch) gitremotes now <- getCurrentTime - ok <- pushToRemotes threadname now st Nothing gitremotes + ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes return (ok, diverged) {- No local branch exists yet, but we can try pulling. -} sync Nothing = do - diverged <- manualPull st Nothing gitremotes + diverged <- snd <$> manualPull st Nothing gitremotes return (True, diverged) {- Updates the local sync branch, then pushes it to all remotes, in @@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ - them. While ugly, those branches are reserved for pushing by us, and - so our pushes will succeed. -} -pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool -pushToRemotes threadname now st mpushmap remotes = do +pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool +pushToRemotes threadname now st mpushnotifier mpushmap remotes = do (g, branch, u) <- runThreadState st $ (,,) <$> gitRepo <*> inRepo Git.Branch.current @@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do updatemap succeeded [] let ok = null failed if ok - then return ok + then do + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier + return ok else if shouldretry then retry branch g u failed else fallback branch g u failed @@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do ] (succeeded, failed) <- inParallel (pushfallback g u branch) rs updatemap succeeded failed + unless (null succeeded) $ + maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g @@ -143,18 +147,18 @@ pushToRemotes threadname now st mpushmap remotes = do where s = show $ Git.Ref.base b {- Manually pull from remotes and merge their branches. -} -manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO Bool +manualPull :: ThreadState -> Maybe Git.Ref -> [Remote] -> IO ([Bool], Bool) manualPull st currentbranch remotes = do g <- runThreadState st gitRepo - forM_ remotes $ \r -> + results <- forM remotes $ \r -> Git.Command.runBool "fetch" [Param $ Remote.name r] g haddiverged <- runThreadState st Annex.Branch.forceUpdate forM_ remotes $ \r -> runThreadState st $ Command.Sync.mergeRemote r currentbranch - return haddiverged + return (results, haddiverged) {- Start syncing a newly added remote, using a background thread. -} syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote st dstatus scanremotes remote = do runThreadState st $ updateSyncRemotes dstatus - void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] + void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote] diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index a5200adfe..afd1c223c 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -38,20 +39,21 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -mountWatcherThread st handle scanremotes = thread $ +mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +mountWatcherThread st handle scanremotes pushnotifier = thread $ #if WITH_DBUS - dbusThread st handle scanremotes + dbusThread st handle scanremotes pushnotifier #else - pollingThread st handle scanremotes + pollingThread st handle scanremotes pushnotifier #endif where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) onerr +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = + E.catch (runClient getSessionAddress go) onerr where go client = ifM (checkMountMonitor client) ( do @@ -64,7 +66,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one listen client matcher $ \_event -> do nowmounted <- currentMountPoints wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted , do runThreadState st $ warning "No known volume monitor available through dbus; falling back to mtab polling" @@ -80,7 +82,7 @@ dbusThread st dstatus scanremotes = E.catch (runClient getSessionAddress go) one runThreadState st $ warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" pollinstead - pollinstead = pollingThread st dstatus scanremotes + pollinstead = pollingThread st dstatus scanremotes pushnotifier {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} @@ -142,24 +144,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -pollingThread st dstatus scanremotes = go =<< currentMountPoints +pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints where go wasmounted = do threadDelaySeconds (Seconds 10) nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes wasmounted nowmounted + handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ +handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () +handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = + mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () -handleMount st dstatus scanremotes dir = do +handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () +handleMount st dstatus scanremotes pushnotifier dir = do debug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder st dstatus dir diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 96b8007cc..ed64541c3 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -15,6 +15,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.Sync +import Assistant.Pushes import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -31,12 +32,12 @@ import Data.Word (Word32) thisThread :: ThreadName thisThread = "NetWatcher" -netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread +netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread #if WITH_DBUS -netWatcherThread st dstatus scanremotes = thread $ - dbusThread st dstatus scanremotes +netWatcherThread st dstatus scanremotes pushnotifier = thread $ + dbusThread st dstatus scanremotes pushnotifier #else -netWatcherThread _ _ _ = thread noop +netWatcherThread _ _ _ _ = thread noop #endif where thread = NamedThread thisThread @@ -46,17 +47,18 @@ netWatcherThread _ _ _ = thread noop - any networked remotes that may have not been routable for a - while (despite the local network staying up), are synced with - periodically. -} -netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread -netWatcherFallbackThread st dstatus scanremotes = thread $ +netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread +netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $ runEvery (Seconds 3600) $ - handleConnection st dstatus scanremotes + handleConnection st dstatus scanremotes pushnotifier where thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr go +dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +dbusThread st dstatus scanremotes pushnotifier = + persistentClient getSystemAddress () onerr go where go client = ifM (checkNetMonitor client) ( do @@ -68,7 +70,8 @@ dbusThread st dstatus scanremotes = persistentClient getSystemAddress () onerr g ) handleconn = do debug thisThread ["detected network connection"] - handleConnection st dstatus scanremotes + notifyRestart pushnotifier + handleConnection st dstatus scanremotes pushnotifier onerr e _ = do runThreadState st $ warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")" @@ -127,9 +130,9 @@ listenWicdConnections client callback = #endif -handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () -handleConnection st dstatus scanremotes = - reconnectRemotes thisThread st dstatus scanremotes +handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () +handleConnection st dstatus scanremotes pushnotifier = + reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) =<< networkRemotes st {- Finds network remotes. -} diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs new file mode 100644 index 000000000..f6058b465 --- /dev/null +++ b/Assistant/Threads/PushNotifier.hs @@ -0,0 +1,108 @@ +{- git-annex assistant push notification thread, using XMPP + - + - This handles both sending outgoing push notifications, and receiving + - incoming push notifications. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.PushNotifier where + +import Assistant.Common +import Assistant.XMPP +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Pushes +import Assistant.Sync +import qualified Remote +import Utility.ThreadScheduler + +import Network.Protocol.XMPP +import Control.Concurrent +import qualified Data.Set as S +import qualified Git.Branch +import Data.Time.Clock + +thisThread :: ThreadName +thisThread = "PushNotifier" + +controllerThread :: PushNotifier -> IO () -> IO () +controllerThread pushnotifier a = forever $ do + tid <- forkIO a + waitRestart pushnotifier + killThread tid + +pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread +pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ + controllerThread pushnotifier $ do + v <- runThreadState st $ getXMPPCreds + case v of + Nothing -> noop + Just c -> loop c =<< getCurrentTime + where + loop c starttime = do + void $ connectXMPP c $ \jid -> do + fulljid <- bindJID jid + liftIO $ debug thisThread ["XMPP connected", show fulljid] + s <- getSession + _ <- liftIO $ forkIO $ void $ runXMPP s $ + receivenotifications + sendnotifications + now <- getCurrentTime + if diffUTCTime now starttime > 300 + then do + debug thisThread ["XMPP connection lost; reconnecting"] + loop c now + else do + debug thisThread ["XMPP connection failed; will retry"] + threadDelaySeconds (Seconds 300) + loop c =<< getCurrentTime + + sendnotifications = forever $ do + us <- liftIO $ waitPush pushnotifier + let payload = [extendedAway, encodePushNotification us] + let notification = (emptyPresence PresenceAvailable) + { presencePayloads = payload } + putStanza notification + + receivenotifications = forever $ do + s <- getStanza + liftIO $ debug thisThread ["received XMPP:", show s] + case s of + ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) -> + liftIO $ pull st dstatus $ + concat $ catMaybes $ + map decodePushNotification $ + presencePayloads p + _ -> noop + +{- We only pull from one remote out of the set listed in the push + - notification, as an optimisation. + - + - Note that it might be possible (though very unlikely) for the push + - notification to take a while to be sent, and multiple pushes happen + - before it is sent, so it includes multiple remotes that were pushed + - to at different times. + - + - It could then be the case that the remote we choose had the earlier + - push sent to it, but then failed to get the later push, and so is not + - fully up-to-date. If that happens, the pushRetryThread will come along + - and retry the push, and we'll get another notification once it succeeds, + - and pull again. -} +pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO () +pull _ _ [] = noop +pull st dstatus us = do + rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus + debug thisThread $ "push notification for" : + map (fromUUID . Remote.uuid ) rs + pullone rs =<< runThreadState st (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us + + pullone [] _ = noop + pullone (r:rs) branch = + unlessM (all id . fst <$> manualPull st branch [r]) $ + pullone rs branch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 4f3a2dd09..295ceddc9 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,8 +24,8 @@ thisThread :: ThreadName thisThread = "Pusher" {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread -pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread +pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) @@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do ] now <- getCurrentTime void $ alertWhile dstatus (pushRetryAlert topush) $ - pushToRemotes thisThread now st (Just pushmap) topush + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush where halfhour = 1800 thread = NamedThread thisThread {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread -pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do +pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread +pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan @@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do <$> getDaemonStatus dstatus unless (null remotes) $ void $ alertWhile dstatus (pushAlert remotes) $ - pushToRemotes thisThread now st (Just pushmap) remotes + pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes else do debug thisThread [ "delaying push of" diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index c33dc2103..7657fc7b8 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -24,6 +24,7 @@ import Assistant.WebApp.Configurators.Pairing #ifdef WITH_S3 import Assistant.WebApp.Configurators.S3 #endif +import Assistant.WebApp.Configurators.XMPP import Assistant.WebApp.Documentation import Assistant.WebApp.OtherRepos import Assistant.ThreadedMonad @@ -31,6 +32,7 @@ import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.TransferSlots +import Assistant.Pushes import Utility.WebApp import Utility.FileMode import Utility.TempFile @@ -54,17 +56,19 @@ webAppThread -> ScanRemoteMap -> TransferQueue -> TransferSlots + -> PushNotifier -> UrlRenderer -> Maybe (IO String) -> Maybe (Url -> FilePath -> IO ()) -> NamedThread -webAppThread mst dstatus scanremotes transferqueue transferslots urlrenderer postfirstrun onstartup = thread $ do +webAppThread mst dstatus scanremotes transferqueue transferslots pushnotifier urlrenderer postfirstrun onstartup = thread $ do webapp <- WebApp <$> pure mst <*> pure dstatus <*> pure scanremotes <*> pure transferqueue <*> pure transferslots + <*> pure pushnotifier <*> (pack <$> genRandomToken) <*> getreldir mst <*> pure $(embed "static") diff --git a/Assistant/XMPP.hs b/Assistant/XMPP.hs new file mode 100644 index 000000000..2e38189ea --- /dev/null +++ b/Assistant/XMPP.hs @@ -0,0 +1,121 @@ +{- xmpp support + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.XMPP where + +import Assistant.Common +import Utility.FileMode +import Utility.SRV + +import Network.Protocol.XMPP +import Network +import Control.Concurrent +import qualified Data.Text as T +import Data.XML.Types +import Control.Exception (SomeException) + +{- Everything we need to know to connect to an XMPP server. -} +data XMPPCreds = XMPPCreds + { xmppUsername :: T.Text + , xmppPassword :: T.Text + , xmppHostname :: HostName + , xmppPort :: Int + , xmppJID :: T.Text + } + deriving (Read, Show) + +connectXMPP :: XMPPCreds -> (JID -> XMPP a) -> IO (Either SomeException ()) +connectXMPP c a = case parseJID (xmppJID c) of + Nothing -> error "bad JID" + Just jid -> connectXMPP' jid c a + +{- Do a SRV lookup, but if it fails, fall back to the cached xmppHostname. -} +connectXMPP' :: JID -> XMPPCreds -> (JID -> XMPP a) -> IO (Either SomeException ()) +connectXMPP' jid c a = go =<< lookupSRV srvrecord + where + srvrecord = mkSRVTcp "xmpp-client" $ + T.unpack $ strDomain $ jidDomain jid + serverjid = JID Nothing (jidDomain jid) Nothing + + go [] = run (xmppHostname c) + (PortNumber $ fromIntegral $ xmppPort c) + (a jid) + go ((h,p):rest) = do + {- Try each SRV record in turn, until one connects, + - at which point the MVar will be full. -} + mv <- newEmptyMVar + r <- run h p $ do + liftIO $ putMVar mv () + a jid + ifM (isEmptyMVar mv) (go rest, return r) + + {- Async exceptions are let through so the XMPP thread can + - be killed. -} + run h p a' = tryNonAsync $ + runClientError (Server serverjid h p) jid + (xmppUsername c) (xmppPassword c) (void a') + +{- XMPP runClient, that throws errors rather than returning an Either -} +runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a +runClientError s j u p x = either (error . show) return =<< runClient s j u p x + +getXMPPCreds :: Annex (Maybe XMPPCreds) +getXMPPCreds = do + f <- xmppCredsFile + s <- liftIO $ catchMaybeIO $ readFile f + return $ readish =<< s + +setXMPPCreds :: XMPPCreds -> Annex () +setXMPPCreds creds = do + f <- xmppCredsFile + liftIO $ do + h <- openFile f WriteMode + modifyFileMode f $ removeModes + [groupReadMode, otherReadMode] + hPutStr h (show creds) + hClose h + +xmppCredsFile :: Annex FilePath +xmppCredsFile = do + dir <- fromRepo gitAnnexCredsDir + return $ dir </> "notify-xmpp" + +{- Marks the client as extended away. -} +extendedAway :: Element +extendedAway = Element (Name (T.pack "show") Nothing Nothing) [] + [NodeContent $ ContentText $ T.pack "xa"] + +{- Name of a git-annex tag, in our own XML namespace. + - (Not using a namespace URL to avoid unnecessary bloat.) -} +gitAnnexTagName :: Name +gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing + +pushAttr :: Name +pushAttr = Name (T.pack "push") Nothing Nothing + +uuidSep :: T.Text +uuidSep = T.pack "," + +{- git-annex tag with one push attribute per UUID pushed to. -} +encodePushNotification :: [UUID] -> Element +encodePushNotification us = Element gitAnnexTagName + [(pushAttr, [ContentText pushvalue])] [] + where + pushvalue = T.intercalate uuidSep $ + map (T.pack . fromUUID) us + +decodePushNotification :: Element -> Maybe [UUID] +decodePushNotification (Element name attrs _nodes) + | name == gitAnnexTagName && not (null us) = Just us + | otherwise = Nothing + where + us = map (toUUID . T.unpack) $ + concatMap (T.splitOn uuidSep . T.concat . map fromContent . snd) $ + filter ispush attrs + ispush (k, _) = k == pushAttr + fromContent (ContentText t) = t + fromContent (ContentEntity t) = t diff --git a/Build/Configure.hs b/Build/Configure.hs index 96582f923..894feb409 100644 --- a/Build/Configure.hs +++ b/Build/Configure.hs @@ -27,6 +27,7 @@ tests = , TestCase "bup" $ testCmd "bup" "bup --version >/dev/null" , TestCase "gpg" $ testCmd "gpg" "gpg --version >/dev/null" , TestCase "lsof" $ testCmd "lsof" "lsof -v >/dev/null 2>&1" + , TestCase "host" $ testCmd "host" "host localhost >/dev/null 2>&1" , TestCase "ssh connection caching" getSshConnectionCaching ] ++ shaTestCases [ (1, "da39a3ee5e6b4b0d3255bfef95601890afd80709") diff --git a/Command/Status.hs b/Command/Status.hs index ab7dbb007..a16e14317 100644 --- a/Command/Status.hs +++ b/Command/Status.hs @@ -5,11 +5,11 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE PackageImports, BangPatterns #-} module Command.Status where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict import qualified Data.Map as M import Text.JSON import Data.Tuple diff --git a/Command/WebApp.hs b/Command/WebApp.hs index f87ea983a..99f94cd2f 100644 --- a/Command/WebApp.hs +++ b/Command/WebApp.hs @@ -15,6 +15,7 @@ import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.TransferSlots +import Assistant.Pushes import Assistant.Threads.WebApp import Assistant.WebApp import Assistant.Install @@ -104,11 +105,12 @@ firstRun = do transferqueue <- newTransferQueue transferslots <- newTransferSlots urlrenderer <- newUrlRenderer + pushnotifier <- newPushNotifier v <- newEmptyMVar let callback a = Just $ a v void $ runNamedThread dstatus $ webAppThread Nothing dstatus scanremotes - transferqueue transferslots urlrenderer + transferqueue transferslots pushnotifier urlrenderer (callback signaler) (callback mainthread) where signaler v = do @@ -1,9 +1,11 @@ +{-# LANGUAGE PackageImports #-} + module Common (module X) where import Control.Monad as X hiding (join) import Control.Monad.IfElse as X import Control.Applicative as X -import Control.Monad.State.Strict as X (liftIO) +import "mtl" Control.Monad.State.Strict as X (liftIO) import Control.Exception.Extensible as X (IOException) import Data.Maybe as X @@ -1,13 +1,12 @@ CFLAGS=-Wall GIT_ANNEX_TMP_BUILD_DIR?=tmp -IGNORE=-ignore-package monads-fd -ignore-package monads-tf -BASEFLAGS=-Wall $(IGNORE) -outputdir $(GIT_ANNEX_TMP_BUILD_DIR) -IUtility +BASEFLAGS=-Wall -outputdir $(GIT_ANNEX_TMP_BUILD_DIR) -IUtility # If you get build failures due to missing haskell libraries, # you can turn off some of these features. # # If you're using an old version of yesod, enable -DWITH_OLD_YESOD -FEATURES?=$(GIT_ANNEX_LOCAL_FEATURES) -DWITH_ASSISTANT -DWITH_S3 -DWITH_WEBAPP -DWITH_PAIRING +FEATURES?=$(GIT_ANNEX_LOCAL_FEATURES) -DWITH_ASSISTANT -DWITH_S3 -DWITH_WEBAPP -DWITH_PAIRING -DWITH_XMPP bins=git-annex mans=git-annex.1 git-annex-shell.1 diff --git a/Utility/DBus.hs b/Utility/DBus.hs index a1a4c4804..d31c20d54 100644 --- a/Utility/DBus.hs +++ b/Utility/DBus.hs @@ -9,6 +9,8 @@ module Utility.DBus where +import Utility.Exception + import DBus.Client import DBus import Data.Maybe @@ -70,10 +72,7 @@ persistentClient :: IO (Maybe Address) -> v -> (SomeException -> v -> IO v) -> ( persistentClient getaddr v onretry clientaction = {- runClient can fail with not just ClientError, but also other - things, if dbus is not running. Let async exceptions through. -} - runClient getaddr clientaction `E.catches` - [ Handler (\ (e :: AsyncException) -> E.throw e) - , Handler (\ (e :: SomeException) -> retry e) - ] + runClient getaddr clientaction `catchNonAsync` retry where retry e = do v' <- onretry e v @@ -81,5 +80,5 @@ persistentClient getaddr v onretry clientaction = {- Catches only ClientError -} catchClientError :: IO () -> (ClientError -> IO ()) -> IO () -catchClientError io handler = do +catchClientError io handler = either handler return =<< (E.try io :: IO (Either ClientError ())) diff --git a/Utility/Exception.hs b/Utility/Exception.hs index 8b6077743..45f2aecec 100644 --- a/Utility/Exception.hs +++ b/Utility/Exception.hs @@ -1,10 +1,12 @@ -{- Simple IO exception handling +{- Simple IO exception handling (and some more) - - Copyright 2011-2012 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE ScopedTypeVariables #-} + module Utility.Exception where import Prelude hiding (catch) @@ -34,3 +36,16 @@ catchIO = catch {- try specialized for IO errors only -} tryIO :: IO a -> IO (Either IOException a) tryIO = try + +{- Catches all exceptions except for async exceptions. + - This is often better to use than catching them all, so that + - ThreadKilled and UserInterrupt get through. + -} +catchNonAsync :: IO a -> (SomeException -> IO a) -> IO a +catchNonAsync a onerr = a `catches` + [ Handler (\ (e :: AsyncException) -> throw e) + , Handler (\ (e :: SomeException) -> onerr e) + ] + +tryNonAsync :: IO a -> IO (Either SomeException a) +tryNonAsync a = (Right <$> a) `catchNonAsync` (return . Left) diff --git a/Utility/SRV.hs b/Utility/SRV.hs new file mode 100644 index 000000000..4f2db680b --- /dev/null +++ b/Utility/SRV.hs @@ -0,0 +1,82 @@ +{- SRV record lookup + - + - Uses either the ADNS Haskell library, or if it's not installed, + - the host command. + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Utility.SRV ( + mkSRVTcp, + mkSRV, + lookupSRV, +) where + +import qualified Build.SysConfig +import Utility.Process +import Utility.Exception +import Utility.PartialPrelude + +import Network +import Data.Function +import Data.List +import Control.Applicative +import Data.Maybe + +#ifdef WITH_ADNS +import ADNS.Resolver +import Data.Either +#endif + +newtype SRV = SRV String + deriving (Show, Eq) + +type HostPort = (HostName, PortID) + +mkSRV :: String -> String -> HostName -> SRV +mkSRV transport protocol host = SRV $ concat + ["_", protocol, "._", transport, ".", host] + +mkSRVTcp :: String -> HostName -> SRV +mkSRVTcp = mkSRV "tcp" + +{- Returns an ordered list, with highest priority hosts first. + - + - On error, returns an empty list. -} +lookupSRV :: SRV -> IO [HostPort] +#ifdef WITH_ADNS +lookupSRV srv = initResolver [] $ \resolver -> do + r <- catchDefaultIO (Right []) $ + resolveSRV resolver srv + return $ either (\_ -> []) id r +#else +lookupSRV = lookupSRVHost +#endif + +lookupSRVHost :: SRV -> IO [HostPort] +lookupSRVHost (SRV srv) + | Build.SysConfig.host = catchDefaultIO [] $ + parseSrvHost <$> readProcessEnv "host" ["-t", "SRV", "--", srv] + -- clear environment, to avoid LANG affecting output + (Just []) + | otherwise = return [] + +parseSrvHost :: String -> [HostPort] +parseSrvHost = map snd . reverse . sortBy cost . catMaybes . map parse . lines + where + cost = compare `on` fst + parse l = case words l of + [_, _, _, _, priority, weight, sport, hostname] -> do + let v = readish sport :: Maybe Int + case v of + Nothing -> Nothing + Just port -> Just + ( (priority, weight) + , (hostname, PortNumber $ fromIntegral port) + ) + _ -> Nothing + diff --git a/Utility/State.hs b/Utility/State.hs index c27f3c261..7f8919082 100644 --- a/Utility/State.hs +++ b/Utility/State.hs @@ -5,9 +5,11 @@ - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE PackageImports #-} + module Utility.State where -import Control.Monad.State.Strict +import "mtl" Control.Monad.State.Strict {- Modifies Control.Monad.State's state, forcing a strict update. - This avoids building thunks in the state and leaking. diff --git a/debian/control b/debian/control index 4de0223f3..eb20d425b 100644 --- a/debian/control +++ b/debian/control @@ -40,6 +40,7 @@ Build-Depends: libghc-network-multicast-dev, libghc-network-info-dev, libghc-safesemaphore-dev, + libghc-network-protocol-xmpp-dev (>= 0.4.3-2), ikiwiki, perlmagick, git, diff --git a/debian/rules b/debian/rules index c0fbd9aa4..4a5532027 100755 --- a/debian/rules +++ b/debian/rules @@ -2,9 +2,9 @@ ARCH = $(shell dpkg-architecture -qDEB_BUILD_ARCH) ifeq (install ok installed,$(shell dpkg-query -W -f '$${Status}' libghc-yesod-dev 2>/dev/null)) -export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_OLD_YESOD -DWITH_WEBAPP -DWITH_PAIRING +export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_OLD_YESOD -DWITH_WEBAPP -DWITH_PAIRING -DWITH_XMPP else -export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_PAIRING +export FEATURES=-DWITH_ASSISTANT -DWITH_S3 -DWITH_PAIRING -DWITH_XMPP endif %: diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn index 4a05afab9..19938dfae 100644 --- a/doc/design/assistant/xmpp.mdwn +++ b/doc/design/assistant/xmpp.mdwn @@ -4,20 +4,14 @@ who share a repository, that is stored in the [[cloud]]. ### TODO -* Track down segfault when the XMPP library is starting up a client connection. -* test with big servers, eg google chat * Prevent idle disconnection. Probably means sending or receiving pings, but would prefer to avoid eg pinging every 60 seconds as some clients do. -* Make the git-annex clients invisible, so a user can use their regular - account without always seeming to be present when git-annex is logged in. - See <http://xmpp.org/extensions/xep-0126.html> -* webapp configuration * After pulling from a remote, may need to scan for transfers, which could involve other remotes (ie, S3). Since the remote client is not able to talk to us directly, it won't be able to upload any new files to us. Need a fast way to find new files, and get them transferring. The expensive transfer scan may be needed to get fully in sync, but is too expensive to - run every time this happens. + run every time this happens. Send transfer notifications using XMPP? ## design goals @@ -43,11 +37,12 @@ using presence messages. These always mark it as extended away. To this, it adds its own tag as [extended content](http://xmpp.org/rfcs/rfc6121.html#presence-extended). The xml namespace is "git-annex" (not an URL because I hate wasting bandwidth). -To indicate it's pushed changes to a git repo, a client uses: +To indicate it's pushed changes to a git repo with a given UUID, a client uses: - <git-annex xmlns='git-annex' push="uuid" /> + <git-annex xmlns='git-annex' push="uuid[,uuid...]" /> -The push attribute can be repeated when the push was sent to multiple repos. +Multiple UUIDs can be listed when multiple clients were pushed. If the +git repo does not have a git-annex UUID, an empty string is used. ### security diff --git a/doc/install/fromscratch.mdwn b/doc/install/fromscratch.mdwn index 4410a59b9..f79ae7dc7 100644 --- a/doc/install/fromscratch.mdwn +++ b/doc/install/fromscratch.mdwn @@ -42,6 +42,7 @@ quite a lot. * [clientsession](http://hackage.haskell.org/package/clientsession) * [network-multicast](http://hackage.haskell.org/package/network-multicast) * [network-info](http://hackage.haskell.org/package/network-info) + * [network-protocol-xmpp](http://hackage.haskell.org/package/network-protocol-xmpp) * Shell commands * [git](http://git-scm.com/) * [uuid](http://www.ossp.org/pkg/lib/uuid/) diff --git a/git-annex.cabal b/git-annex.cabal index 5b7e78654..4e910183c 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -43,6 +43,12 @@ Flag Webapp Flag Pairing Description: Enable pairing +Flag XMPP + Description: Enable notifications using XMPP + +Flag Adns + Description: Enable the ADNS library for DNS lookup + Executable git-annex Main-Is: git-annex.hs Build-Depends: MissingH, hslogger, directory, filepath, @@ -91,6 +97,14 @@ Executable git-annex Build-Depends: network-multicast, network-info CPP-Options: -DWITH_PAIRING + if flag(XMPP) && flag(Assistant) + Build-Depends: network-protocol-xmpp, gnutls (>= 0.1.4) + CPP-Options: -DWITH_XMPP + + if flag(XMPP) && flag(Assistant) && flag(Adns) + Build-Depends: hsdns + CPP-Options: -DWITH_ADNS + Test-Suite test Type: exitcode-stdio-1.0 Main-Is: test.hs |