From 1279d72b4e4fe77abb983954dc937021559d4169 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sat, 3 Nov 2012 14:16:17 -0400 Subject: refactor XMPP client --- Assistant.hs | 14 ++-- Assistant/Monad.hs | 5 +- Assistant/NetMessager.hs | 28 +++++++ Assistant/Pushes.hs | 14 ---- Assistant/Sync.hs | 8 +- Assistant/Threads/NetWatcher.hs | 4 +- Assistant/Threads/PushNotifier.hs | 124 ------------------------------ Assistant/Threads/XMPPClient.hs | 155 ++++++++++++++++++++++++++++++++++++++ Assistant/Types/NetMessager.hs | 31 ++++++++ Assistant/Types/Pushes.hs | 18 ----- Assistant/XMPP.hs | 61 +++++++++++++-- 11 files changed, 289 insertions(+), 173 deletions(-) create mode 100644 Assistant/NetMessager.hs delete mode 100644 Assistant/Threads/PushNotifier.hs create mode 100644 Assistant/Threads/XMPPClient.hs create mode 100644 Assistant/Types/NetMessager.hs diff --git a/Assistant.hs b/Assistant.hs index 0f0b4c243..a58015c37 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -69,8 +69,8 @@ - Thread 18: ConfigMonitor - Triggered by changes to the git-annex branch, checks for changed - config files, and reloads configs. - - Thread 19: PushNotifier - - Notifies other repositories of pushes, using out of band signaling. + - Thread 19: XMPPClient + - Built-in XMPP client. - Thread 20: WebApp - Spawns more threads as necessary to handle clients. - Displays the DaemonStatus. @@ -105,8 +105,10 @@ - BranchChanged (STM SampleVar) - Changes to the git-annex branch are indicated by updating this - SampleVar. - - PushNotifier (STM TChan) - - After successful pushes, this SampleVar is updated. + - NetMessagerControl (STM TChan, SampleVar) + - Used to feed messages to the built-in XMPP client, and + - signal it when it needs to restart due to configuration or + - networking changes. - UrlRenderer (MVar) - A Yesod route rendering function is stored here. This allows - things that need to render Yesod routes to block until the webapp @@ -135,7 +137,7 @@ import Assistant.Threads.TransferScanner import Assistant.Threads.TransferPoller import Assistant.Threads.ConfigMonitor #ifdef WITH_XMPP -import Assistant.Threads.PushNotifier +import Assistant.Threads.XMPPClient #endif #ifdef WITH_WEBAPP import Assistant.WebApp @@ -204,7 +206,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do , assist $ transferScannerThread , assist $ configMonitorThread #ifdef WITH_XMPP - , assist $ pushNotifierThread + , assist $ xmppClientThread #endif , watch $ watchThread ] diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index 64718a7a1..9f5c42aa3 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -35,6 +35,7 @@ import Assistant.Types.BranchChange import Assistant.Types.Commits import Assistant.Types.Changes import Assistant.Types.Buddies +import Assistant.Types.NetMessager newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a } deriving ( @@ -55,12 +56,12 @@ data AssistantData = AssistantData , scanRemoteMap :: ScanRemoteMap , transferQueue :: TransferQueue , transferSlots :: TransferSlots - , pushNotifier :: PushNotifier , failedPushMap :: FailedPushMap , commitChan :: CommitChan , changeChan :: ChangeChan , branchChangeHandle :: BranchChangeHandle , buddyList :: BuddyList + , netMessagerControl :: NetMessagerControl } newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData @@ -71,12 +72,12 @@ newAssistantData st dstatus = AssistantData <*> newScanRemoteMap <*> newTransferQueue <*> newTransferSlots - <*> newPushNotifier <*> newFailedPushMap <*> newCommitChan <*> newChangeChan <*> newBranchChangeHandle <*> newBuddyList + <*> newNetMessagerControl runAssistant :: Assistant a -> AssistantData -> IO a runAssistant a = runReaderT (mkAssistant a) diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs new file mode 100644 index 000000000..08490e005 --- /dev/null +++ b/Assistant/NetMessager.hs @@ -0,0 +1,28 @@ +{- git-annex assistant out of band network messager interface + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.NetMessager where + +import Assistant.Common +import Assistant.Types.NetMessager + +import Control.Concurrent.STM +import Control.Concurrent.MSampleVar + +sendNetMessage :: NetMessage -> Assistant () +sendNetMessage m = + (atomically . flip writeTChan m) <<~ (netMessages . netMessagerControl) + +waitNetMessage :: Assistant (NetMessage) +waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessagerControl) + +notifyNetMessagerRestart :: Assistant () +notifyNetMessagerRestart = + flip writeSV () <<~ (netMessagerRestart . netMessagerControl) + +waitNetMessagerRestart :: Assistant () +waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessagerControl) diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index 6ac19405a..9765b6a42 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -9,10 +9,8 @@ module Assistant.Pushes where import Assistant.Common import Assistant.Types.Pushes -import Utility.TSet import Control.Concurrent.STM -import Control.Concurrent.MSampleVar import Data.Time.Clock import qualified Data.Map as M @@ -40,15 +38,3 @@ changeFailedPushMap a = do store v m | m == M.empty = noop | otherwise = putTMVar v $! m - -notifyPush :: [UUID] -> Assistant () -notifyPush us = flip putTSet us <<~ (pushNotifierSuccesses . pushNotifier) - -waitPush :: Assistant [UUID] -waitPush = getTSet <<~ (pushNotifierSuccesses . pushNotifier) - -notifyRestart :: Assistant () -notifyRestart = flip writeSV () <<~ (pushNotifierWaiter . pushNotifier) - -waitRestart :: Assistant () -waitRestart = readSV <<~ (pushNotifierWaiter . pushNotifier) diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index c2c81c57d..1ded407fe 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -9,6 +9,8 @@ module Assistant.Sync where import Assistant.Common import Assistant.Pushes +import Assistant.NetMessager +import Assistant.Types.NetMessager import Assistant.Alert import Assistant.DaemonStatus import Assistant.ScanRemotes @@ -102,7 +104,8 @@ pushToRemotes now notifypushes remotes = do if null failed then do when notifypushes $ - notifyPush (map Remote.uuid succeeded) + sendNetMessage $ NotifyPush $ + map Remote.uuid succeeded return True else if shouldretry then retry branch g u failed @@ -124,7 +127,8 @@ pushToRemotes now notifypushes remotes = do inParallel (pushfallback g u branch) rs updatemap succeeded failed when (notifypushes && (not $ null succeeded)) $ - notifyPush (map Remote.uuid succeeded) + sendNetMessage $ NotifyPush $ + map Remote.uuid succeeded return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 4396b2632..3b1545543 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -12,7 +12,7 @@ module Assistant.Threads.NetWatcher where import Assistant.Common import Assistant.Sync -import Assistant.Pushes +import Assistant.NetMessager import Utility.ThreadScheduler import Remote.List import qualified Types.Remote as Remote @@ -62,7 +62,7 @@ dbusThread = do ) handleconn = do debug ["detected network connection"] - notifyRestart + notifyNetMessagerRestart handleConnection onerr e _ = do liftAnnex $ diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs deleted file mode 100644 index bdb9e1e12..000000000 --- a/Assistant/Threads/PushNotifier.hs +++ /dev/null @@ -1,124 +0,0 @@ -{- git-annex assistant push notification thread, using XMPP - - - - This handles both sending outgoing push notifications, and receiving - - incoming push notifications. - - - - Copyright 2012 Joey Hess - - - - Licensed under the GNU GPL version 3 or higher. - -} - -module Assistant.Threads.PushNotifier where - -import Assistant.Common -import Assistant.XMPP -import Assistant.XMPP.Client -import Assistant.Pushes -import Assistant.Types.Buddies -import Assistant.XMPP.Buddies -import Assistant.Sync -import Assistant.DaemonStatus -import qualified Remote -import Utility.ThreadScheduler - -import Network.Protocol.XMPP -import Control.Concurrent -import qualified Data.Set as S -import qualified Git.Branch -import Data.Time.Clock - -pushNotifierThread :: NamedThread -pushNotifierThread = NamedThread "PushNotifier" $ do - iodebug <- asIO1 debug - iopull <- asIO1 pull - iowaitpush <- asIO waitPush - ioupdatebuddies <- asIO1 $ \p -> do - updateBuddyList (updateBuddies p) <<~ buddyList - debug =<< map show <$> getBuddyList <<~ buddyList - ioemptybuddies <- asIO $ - updateBuddyList (const noBuddies) <<~ buddyList - ioclient <- asIO $ - xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies - forever $ do - tid <- liftIO $ forkIO ioclient - waitRestart - liftIO $ killThread tid - -xmppClient - :: (IO [UUID]) - -> ([String] -> IO ()) - -> ([UUID] -> IO ()) - -> (Presence -> IO ()) - -> IO () - -> Assistant () -xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies = do - v <- liftAnnex getXMPPCreds - case v of - Nothing -> noop - Just c -> liftIO $ loop c =<< getCurrentTime - where - loop c starttime = do - void $ connectXMPP c $ \jid -> do - fulljid <- bindJID jid - liftIO $ iodebug ["XMPP connected", show fulljid] - {- The buddy list starts empty each time the client - - connects, so that stale info is not retained. -} - liftIO ioemptybuddies - putStanza $ gitAnnexPresence gitAnnexSignature - s <- getSession - _ <- liftIO $ forkIO $ void $ runXMPP s $ - receivenotifications fulljid - sendnotifications - now <- getCurrentTime - if diffUTCTime now starttime > 300 - then do - iodebug ["XMPP connection lost; reconnecting"] - loop c now - else do - iodebug ["XMPP connection failed; will retry"] - threadDelaySeconds (Seconds 300) - loop c =<< getCurrentTime - sendnotifications = forever $ do - us <- liftIO iowaitpush - putStanza $ gitAnnexPresence $ encodePushNotification us - receivenotifications fulljid = forever $ do - s <- getStanza - liftIO $ iodebug ["received XMPP:", show s] - case s of - ReceivedPresence p@(Presence { presenceFrom = from }) - | from == Just fulljid -> noop - | otherwise -> do - liftIO $ ioupdatebuddies p - when (isGitAnnexPresence p) $ - liftIO $ iopull $ concat $ catMaybes $ - map decodePushNotification $ - presencePayloads p - _ -> noop - -{- We only pull from one remote out of the set listed in the push - - notification, as an optimisation. - - - - Note that it might be possible (though very unlikely) for the push - - notification to take a while to be sent, and multiple pushes happen - - before it is sent, so it includes multiple remotes that were pushed - - to at different times. - - - - It could then be the case that the remote we choose had the earlier - - push sent to it, but then failed to get the later push, and so is not - - fully up-to-date. If that happens, the pushRetryThread will come along - - and retry the push, and we'll get another notification once it succeeds, - - and pull again. -} -pull :: [UUID] -> Assistant () -pull [] = noop -pull us = do - rs <- filter matching . syncRemotes <$> getDaemonStatus - debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs - pullone rs =<< liftAnnex (inRepo Git.Branch.current) - where - matching r = Remote.uuid r `S.member` s - s = S.fromList us - - pullone [] _ = noop - pullone (r:rs) branch = - unlessM (all id . fst <$> manualPull branch [r]) $ - pullone rs branch diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs new file mode 100644 index 000000000..7fb3cc874 --- /dev/null +++ b/Assistant/Threads/XMPPClient.hs @@ -0,0 +1,155 @@ +{- git-annex XMPP client + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.XMPPClient where + +import Assistant.Common +import Assistant.XMPP +import Assistant.XMPP.Client +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.Types.Buddies +import Assistant.XMPP.Buddies +import Assistant.Sync +import Assistant.DaemonStatus +import qualified Remote +import Utility.ThreadScheduler + +import Network.Protocol.XMPP +import Control.Concurrent +import qualified Data.Set as S +import qualified Git.Branch +import Data.Time.Clock + +xmppClientThread :: NamedThread +xmppClientThread = NamedThread "XMPPClient" $ do + iodebug <- asIO1 debug + iopull <- asIO1 pull + ioupdatebuddies <- asIO1 $ \p -> + updateBuddyList (updateBuddies p) <<~ buddyList + ioemptybuddies <- asIO $ + updateBuddyList (const noBuddies) <<~ buddyList + iorelay <- asIO relayNetMessage + ioclientthread <- asIO $ + go iorelay iodebug iopull ioupdatebuddies ioemptybuddies + restartableClient ioclientthread + where + go iorelay iodebug iopull ioupdatebuddies ioemptybuddies = do + v <- liftAnnex getXMPPCreds + case v of + Nothing -> noop + Just c -> liftIO $ loop c =<< getCurrentTime + where + debug' = void . liftIO . iodebug + {- When the client exits, it's restarted; + - if it keeps failing, back off to wait 5 minutes before + - trying it again. -} + loop c starttime = do + runclient c + now <- getCurrentTime + if diffUTCTime now starttime > 300 + then do + void $ iodebug ["connection lost; reconnecting"] + loop c now + else do + void $ iodebug ["connection failed; will retry"] + threadDelaySeconds (Seconds 300) + loop c =<< getCurrentTime + runclient c = void $ connectXMPP c $ \jid -> do + fulljid <- bindJID jid + debug' ["connected", show fulljid] + {- The buddy list starts empty each time + - the client connects, so that stale info + - is not retained. -} + void $ liftIO ioemptybuddies + putStanza $ gitAnnexPresence gitAnnexSignature + xmppThread $ receivenotifications fulljid + forever $ do + a <- liftIO iorelay + a + receivenotifications fulljid = forever $ do + s <- getStanza + let v = decodeStanza fulljid s + debug' ["received:", show v] + case v of + PresenceMessage p -> void $ liftIO $ ioupdatebuddies p + PresenceQuery p -> do + void $ liftIO $ ioupdatebuddies p + putStanza $ gitAnnexPresence gitAnnexSignature + PushNotification us -> void $ liftIO $ iopull us + Ignorable _ -> noop + Unknown _ -> noop + +{- Waits for a NetMessager message to be sent, and relays it to XMPP. -} +relayNetMessage :: Assistant (XMPP ()) +relayNetMessage = convert <$> waitNetMessage + where + convert (NotifyPush us) = + putStanza $ gitAnnexPresence $ encodePushNotification us + +data DecodedStanza + = PresenceMessage Presence + | PresenceQuery Presence + | PushNotification [UUID] + | Ignorable Presence + | Unknown ReceivedStanza + deriving Show + +decodeStanza :: JID -> ReceivedStanza -> DecodedStanza +decodeStanza fulljid (ReceivedPresence p) + | presenceFrom p == Nothing = Ignorable p + | presenceFrom p == Just fulljid = Ignorable p + | isPresenceQuery p = PresenceQuery p + | null pushed = Ignorable p + | otherwise = PushNotification pushed + where + pushed = concat $ catMaybes $ map decodePushNotification $ + presencePayloads p +decodeStanza _ s = Unknown s + +{- Runs the client, handing restart events. -} +restartableClient :: IO () -> Assistant () +restartableClient a = forever $ do + tid <- liftIO $ forkIO a + waitNetMessagerRestart + liftIO $ killThread tid + +{- Runs a XMPP action in a separate thread, using a session to allow it + - to access the same XMPP client. -} +xmppThread :: XMPP () -> XMPP () +xmppThread a = do + s <- getSession + void $ liftIO $ forkIO $ + void $ runXMPP s a + +{- We only pull from one remote out of the set listed in the push + - notification, as an optimisation. + - + - Note that it might be possible (though very unlikely) for the push + - notification to take a while to be sent, and multiple pushes happen + - before it is sent, so it includes multiple remotes that were pushed + - to at different times. + - + - It could then be the case that the remote we choose had the earlier + - push sent to it, but then failed to get the later push, and so is not + - fully up-to-date. If that happens, the pushRetryThread will come along + - and retry the push, and we'll get another notification once it succeeds, + - and pull again. -} +pull :: [UUID] -> Assistant () +pull [] = noop +pull us = do + rs <- filter matching . syncRemotes <$> getDaemonStatus + debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs + pullone rs =<< liftAnnex (inRepo Git.Branch.current) + where + matching r = Remote.uuid r `S.member` s + s = S.fromList us + + pullone [] _ = noop + pullone (r:rs) branch = + unlessM (all id . fst <$> manualPull branch [r]) $ + pullone rs branch diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs new file mode 100644 index 000000000..fea88a53a --- /dev/null +++ b/Assistant/Types/NetMessager.hs @@ -0,0 +1,31 @@ +{- git-annex assistant out of band network messager types + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.NetMessager where + +import Common.Annex + +import Control.Concurrent.STM +import Control.Concurrent.MSampleVar + +{- Messages that can be sent out of band by a network messager. -} +data NetMessage = NotifyPush [UUID] + +{- Controls for the XMPP client. + - + - It can be fed XMPP messages to send. + - + - It can also be sent a signal when it should restart for some reason. -} +data NetMessagerControl = NetMessagerControl + { netMessages :: TChan (NetMessage) + , netMessagerRestart :: MSampleVar () + } + +newNetMessagerControl :: IO NetMessagerControl +newNetMessagerControl = NetMessagerControl + <$> atomically newTChan + <*> newEmptySV diff --git a/Assistant/Types/Pushes.hs b/Assistant/Types/Pushes.hs index 85362860a..99e0ee162 100644 --- a/Assistant/Types/Pushes.hs +++ b/Assistant/Types/Pushes.hs @@ -8,10 +8,8 @@ module Assistant.Types.Pushes where import Common.Annex -import Utility.TSet import Control.Concurrent.STM -import Control.Concurrent.MSampleVar import Data.Time.Clock import qualified Data.Map as M @@ -19,24 +17,8 @@ import qualified Data.Map as M type PushMap = M.Map Remote UTCTime type FailedPushMap = TMVar PushMap -{- The TSet is recent, successful pushes that other remotes should be - - notified about. - - - - The MSampleVar is written to when the PushNotifier thread should be - - restarted for some reason. - -} -data PushNotifier = PushNotifier - { pushNotifierSuccesses :: TSet UUID - , pushNotifierWaiter :: MSampleVar () - } - {- The TMVar starts empty, and is left empty when there are no - failed pushes. This way we can block until there are some failed pushes. -} newFailedPushMap :: IO FailedPushMap newFailedPushMap = atomically newEmptyTMVar - -newPushNotifier :: IO PushNotifier -newPushNotifier = PushNotifier - <$> newTSet - <*> newEmptySV diff --git a/Assistant/XMPP.hs b/Assistant/XMPP.hs index 43bf4ac75..05bc94fa3 100644 --- a/Assistant/XMPP.hs +++ b/Assistant/XMPP.hs @@ -7,7 +7,9 @@ module Assistant.XMPP where -import Common.Annex +import Assistant.Common +import Annex.UUID +import Assistant.Pairing import Network.Protocol.XMPP import qualified Data.Text as T @@ -21,24 +23,44 @@ gitAnnexPresence tag = (emptyPresence PresenceAvailable) extendedAway = Element (Name (T.pack "show") Nothing Nothing) [] [NodeContent $ ContentText $ T.pack "xa"] -{- Does a presence contain a gitp-annex tag? -} +{- Does a presence contain a git-annex tag? -} isGitAnnexPresence :: Presence -> Bool -isGitAnnexPresence p = any matchingtag (presencePayloads p) - where - matchingtag t = elementName t == gitAnnexTagName +isGitAnnexPresence p = any isGitAnnexTag (presencePayloads p) {- Name of a git-annex tag, in our own XML namespace. - (Not using a namespace URL to avoid unnecessary bloat.) -} gitAnnexTagName :: Name gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing +isGitAnnexTag :: Element -> Bool +isGitAnnexTag t = elementName t == gitAnnexTagName + {- A git-annex tag, to let other clients know we're a git-annex client too. -} gitAnnexSignature :: Element gitAnnexSignature = Element gitAnnexTagName [] [] +queryAttr :: Name +queryAttr = Name (T.pack "query") Nothing Nothing + pushAttr :: Name pushAttr = Name (T.pack "push") Nothing Nothing +pairingAttr :: Name +pairingAttr = Name (T.pack "pairing") Nothing Nothing + +isAttr :: Name -> (Name, [Content]) -> Bool +isAttr attr (k, _) = k == attr + +getAttr :: Name -> [(Name, [Content])] -> Maybe String +getAttr wantattr attrs = content <$> headMaybe (filter (isAttr wantattr) attrs) + where + content (_name, cs) = T.unpack $ T.concat $ map unpack cs + unpack (ContentText t) = t + unpack (ContentEntity t) = t + +uuidAttr :: Name +uuidAttr = Name (T.pack "uuid") Nothing Nothing + uuidSep :: T.Text uuidSep = T.pack "," @@ -61,3 +83,32 @@ decodePushNotification (Element name attrs _nodes) ispush (k, _) = k == pushAttr fromContent (ContentText t) = t fromContent (ContentEntity t) = t + +{- A request for other git-annex clients to send presence. -} +presenceQuery :: Presence +presenceQuery = gitAnnexPresence $ Element gitAnnexTagName + [ (queryAttr, [ContentText T.empty]) ] + [] + +isPresenceQuery :: Presence -> Bool +isPresenceQuery p = case filter isGitAnnexTag (presencePayloads p) of + [] -> False + ((Element _name attrs _nodes):_) -> any (isAttr queryAttr) attrs + +{- A notification about a stage of pairing. -} +pairingNotification :: PairStage -> Annex Presence +pairingNotification pairstage = do + u <- getUUID + return $ gitAnnexPresence $ Element gitAnnexTagName + [ (pairingAttr, [ContentText $ T.pack $ show pairstage]) + , (uuidAttr, [ContentText $ T.pack $ fromUUID u]) + ] + [] + +isPairingNotification :: Presence -> Maybe (PairStage, UUID) +isPairingNotification p = case filter isGitAnnexTag (presencePayloads p) of + [] -> Nothing + ((Element _name attrs _nodes):_) -> + (,) + <$> (readish =<< getAttr pairingAttr attrs) + <*> (toUUID <$> getAttr uuidAttr attrs) -- cgit v1.2.3