path: root/Assistant
diff options
authorGravatar Joey Hess <>2012-11-03 14:16:17 -0400
committerGravatar Joey Hess <>2012-11-03 14:16:17 -0400
commit1279d72b4e4fe77abb983954dc937021559d4169 (patch)
tree6c7d718be97634ddaaa2a9dd90637363cc0ebeb0 /Assistant
parent85eb13a57a7c0c4f2df46ab4c01c434585370999 (diff)
refactor XMPP client
Diffstat (limited to 'Assistant')
10 files changed, 281 insertions, 167 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index 64718a7a1..9f5c42aa3 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -35,6 +35,7 @@ import Assistant.Types.BranchChange
import Assistant.Types.Commits
import Assistant.Types.Changes
import Assistant.Types.Buddies
+import Assistant.Types.NetMessager
newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a }
deriving (
@@ -55,12 +56,12 @@ data AssistantData = AssistantData
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
- , pushNotifier :: PushNotifier
, failedPushMap :: FailedPushMap
, commitChan :: CommitChan
, changeChan :: ChangeChan
, branchChangeHandle :: BranchChangeHandle
, buddyList :: BuddyList
+ , netMessagerControl :: NetMessagerControl
newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData
@@ -71,12 +72,12 @@ newAssistantData st dstatus = AssistantData
<*> newScanRemoteMap
<*> newTransferQueue
<*> newTransferSlots
- <*> newPushNotifier
<*> newFailedPushMap
<*> newCommitChan
<*> newChangeChan
<*> newBranchChangeHandle
<*> newBuddyList
+ <*> newNetMessagerControl
runAssistant :: Assistant a -> AssistantData -> IO a
runAssistant a = runReaderT (mkAssistant a)
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
new file mode 100644
index 000000000..08490e005
--- /dev/null
+++ b/Assistant/NetMessager.hs
@@ -0,0 +1,28 @@
+{- git-annex assistant out of band network messager interface
+ -
+ - Copyright 2012 Joey Hess <>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+module Assistant.NetMessager where
+import Assistant.Common
+import Assistant.Types.NetMessager
+import Control.Concurrent.STM
+import Control.Concurrent.MSampleVar
+sendNetMessage :: NetMessage -> Assistant ()
+sendNetMessage m =
+ (atomically . flip writeTChan m) <<~ (netMessages . netMessagerControl)
+waitNetMessage :: Assistant (NetMessage)
+waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessagerControl)
+notifyNetMessagerRestart :: Assistant ()
+notifyNetMessagerRestart =
+ flip writeSV () <<~ (netMessagerRestart . netMessagerControl)
+waitNetMessagerRestart :: Assistant ()
+waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessagerControl)
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
index 6ac19405a..9765b6a42 100644
--- a/Assistant/Pushes.hs
+++ b/Assistant/Pushes.hs
@@ -9,10 +9,8 @@ module Assistant.Pushes where
import Assistant.Common
import Assistant.Types.Pushes
-import Utility.TSet
import Control.Concurrent.STM
-import Control.Concurrent.MSampleVar
import Data.Time.Clock
import qualified Data.Map as M
@@ -40,15 +38,3 @@ changeFailedPushMap a = do
store v m
| m == M.empty = noop
| otherwise = putTMVar v $! m
-notifyPush :: [UUID] -> Assistant ()
-notifyPush us = flip putTSet us <<~ (pushNotifierSuccesses . pushNotifier)
-waitPush :: Assistant [UUID]
-waitPush = getTSet <<~ (pushNotifierSuccesses . pushNotifier)
-notifyRestart :: Assistant ()
-notifyRestart = flip writeSV () <<~ (pushNotifierWaiter . pushNotifier)
-waitRestart :: Assistant ()
-waitRestart = readSV <<~ (pushNotifierWaiter . pushNotifier)
diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs
index c2c81c57d..1ded407fe 100644
--- a/Assistant/Sync.hs
+++ b/Assistant/Sync.hs
@@ -9,6 +9,8 @@ module Assistant.Sync where
import Assistant.Common
import Assistant.Pushes
+import Assistant.NetMessager
+import Assistant.Types.NetMessager
import Assistant.Alert
import Assistant.DaemonStatus
import Assistant.ScanRemotes
@@ -102,7 +104,8 @@ pushToRemotes now notifypushes remotes = do
if null failed
then do
when notifypushes $
- notifyPush (map Remote.uuid succeeded)
+ sendNetMessage $ NotifyPush $
+ map Remote.uuid succeeded
return True
else if shouldretry
then retry branch g u failed
@@ -124,7 +127,8 @@ pushToRemotes now notifypushes remotes = do
inParallel (pushfallback g u branch) rs
updatemap succeeded failed
when (notifypushes && (not $ null succeeded)) $
- notifyPush (map Remote.uuid succeeded)
+ sendNetMessage $ NotifyPush $
+ map Remote.uuid succeeded
return $ null failed
push g branch remote = Command.Sync.pushBranch remote branch g
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index 4396b2632..3b1545543 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -12,7 +12,7 @@ module Assistant.Threads.NetWatcher where
import Assistant.Common
import Assistant.Sync
-import Assistant.Pushes
+import Assistant.NetMessager
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
@@ -62,7 +62,7 @@ dbusThread = do
handleconn = do
debug ["detected network connection"]
- notifyRestart
+ notifyNetMessagerRestart
onerr e _ = do
liftAnnex $
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
deleted file mode 100644
index bdb9e1e12..000000000
--- a/Assistant/Threads/PushNotifier.hs
+++ /dev/null
@@ -1,124 +0,0 @@
-{- git-annex assistant push notification thread, using XMPP
- -
- - This handles both sending outgoing push notifications, and receiving
- - incoming push notifications.
- -
- - Copyright 2012 Joey Hess <>
- -
- - Licensed under the GNU GPL version 3 or higher.
- -}
-module Assistant.Threads.PushNotifier where
-import Assistant.Common
-import Assistant.XMPP
-import Assistant.XMPP.Client
-import Assistant.Pushes
-import Assistant.Types.Buddies
-import Assistant.XMPP.Buddies
-import Assistant.Sync
-import Assistant.DaemonStatus
-import qualified Remote
-import Utility.ThreadScheduler
-import Network.Protocol.XMPP
-import Control.Concurrent
-import qualified Data.Set as S
-import qualified Git.Branch
-import Data.Time.Clock
-pushNotifierThread :: NamedThread
-pushNotifierThread = NamedThread "PushNotifier" $ do
- iodebug <- asIO1 debug
- iopull <- asIO1 pull
- iowaitpush <- asIO waitPush
- ioupdatebuddies <- asIO1 $ \p -> do
- updateBuddyList (updateBuddies p) <<~ buddyList
- debug =<< map show <$> getBuddyList <<~ buddyList
- ioemptybuddies <- asIO $
- updateBuddyList (const noBuddies) <<~ buddyList
- ioclient <- asIO $
- xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies
- forever $ do
- tid <- liftIO $ forkIO ioclient
- waitRestart
- liftIO $ killThread tid
- :: (IO [UUID])
- -> ([String] -> IO ())
- -> ([UUID] -> IO ())
- -> (Presence -> IO ())
- -> IO ()
- -> Assistant ()
-xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies = do
- v <- liftAnnex getXMPPCreds
- case v of
- Nothing -> noop
- Just c -> liftIO $ loop c =<< getCurrentTime
- where
- loop c starttime = do
- void $ connectXMPP c $ \jid -> do
- fulljid <- bindJID jid
- liftIO $ iodebug ["XMPP connected", show fulljid]
- {- The buddy list starts empty each time the client
- - connects, so that stale info is not retained. -}
- liftIO ioemptybuddies
- putStanza $ gitAnnexPresence gitAnnexSignature
- s <- getSession
- _ <- liftIO $ forkIO $ void $ runXMPP s $
- receivenotifications fulljid
- sendnotifications
- now <- getCurrentTime
- if diffUTCTime now starttime > 300
- then do
- iodebug ["XMPP connection lost; reconnecting"]
- loop c now
- else do
- iodebug ["XMPP connection failed; will retry"]
- threadDelaySeconds (Seconds 300)
- loop c =<< getCurrentTime
- sendnotifications = forever $ do
- us <- liftIO iowaitpush
- putStanza $ gitAnnexPresence $ encodePushNotification us
- receivenotifications fulljid = forever $ do
- s <- getStanza
- liftIO $ iodebug ["received XMPP:", show s]
- case s of
- ReceivedPresence p@(Presence { presenceFrom = from })
- | from == Just fulljid -> noop
- | otherwise -> do
- liftIO $ ioupdatebuddies p
- when (isGitAnnexPresence p) $
- liftIO $ iopull $ concat $ catMaybes $
- map decodePushNotification $
- presencePayloads p
- _ -> noop
-{- We only pull from one remote out of the set listed in the push
- - notification, as an optimisation.
- -
- - Note that it might be possible (though very unlikely) for the push
- - notification to take a while to be sent, and multiple pushes happen
- - before it is sent, so it includes multiple remotes that were pushed
- - to at different times.
- -
- - It could then be the case that the remote we choose had the earlier
- - push sent to it, but then failed to get the later push, and so is not
- - fully up-to-date. If that happens, the pushRetryThread will come along
- - and retry the push, and we'll get another notification once it succeeds,
- - and pull again. -}
-pull :: [UUID] -> Assistant ()
-pull [] = noop
-pull us = do
- rs <- filter matching . syncRemotes <$> getDaemonStatus
- debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
- pullone rs =<< liftAnnex (inRepo Git.Branch.current)
- where
- matching r = Remote.uuid r `S.member` s
- s = S.fromList us
- pullone [] _ = noop
- pullone (r:rs) branch =
- unlessM (all id . fst <$> manualPull branch [r]) $
- pullone rs branch
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
new file mode 100644
index 000000000..7fb3cc874
--- /dev/null
+++ b/Assistant/Threads/XMPPClient.hs
@@ -0,0 +1,155 @@
+{- git-annex XMPP client
+ -
+ - Copyright 2012 Joey Hess <>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+module Assistant.Threads.XMPPClient where
+import Assistant.Common
+import Assistant.XMPP
+import Assistant.XMPP.Client
+import Assistant.NetMessager
+import Assistant.Types.NetMessager
+import Assistant.Types.Buddies
+import Assistant.XMPP.Buddies
+import Assistant.Sync
+import Assistant.DaemonStatus
+import qualified Remote
+import Utility.ThreadScheduler
+import Network.Protocol.XMPP
+import Control.Concurrent
+import qualified Data.Set as S
+import qualified Git.Branch
+import Data.Time.Clock
+xmppClientThread :: NamedThread
+xmppClientThread = NamedThread "XMPPClient" $ do
+ iodebug <- asIO1 debug
+ iopull <- asIO1 pull
+ ioupdatebuddies <- asIO1 $ \p ->
+ updateBuddyList (updateBuddies p) <<~ buddyList
+ ioemptybuddies <- asIO $
+ updateBuddyList (const noBuddies) <<~ buddyList
+ iorelay <- asIO relayNetMessage
+ ioclientthread <- asIO $
+ go iorelay iodebug iopull ioupdatebuddies ioemptybuddies
+ restartableClient ioclientthread
+ where
+ go iorelay iodebug iopull ioupdatebuddies ioemptybuddies = do
+ v <- liftAnnex getXMPPCreds
+ case v of
+ Nothing -> noop
+ Just c -> liftIO $ loop c =<< getCurrentTime
+ where
+ debug' = void . liftIO . iodebug
+ {- When the client exits, it's restarted;
+ - if it keeps failing, back off to wait 5 minutes before
+ - trying it again. -}
+ loop c starttime = do
+ runclient c
+ now <- getCurrentTime
+ if diffUTCTime now starttime > 300
+ then do
+ void $ iodebug ["connection lost; reconnecting"]
+ loop c now
+ else do
+ void $ iodebug ["connection failed; will retry"]
+ threadDelaySeconds (Seconds 300)
+ loop c =<< getCurrentTime
+ runclient c = void $ connectXMPP c $ \jid -> do
+ fulljid <- bindJID jid
+ debug' ["connected", show fulljid]
+ {- The buddy list starts empty each time
+ - the client connects, so that stale info
+ - is not retained. -}
+ void $ liftIO ioemptybuddies
+ putStanza $ gitAnnexPresence gitAnnexSignature
+ xmppThread $ receivenotifications fulljid
+ forever $ do
+ a <- liftIO iorelay
+ a
+ receivenotifications fulljid = forever $ do
+ s <- getStanza
+ let v = decodeStanza fulljid s
+ debug' ["received:", show v]
+ case v of
+ PresenceMessage p -> void $ liftIO $ ioupdatebuddies p
+ PresenceQuery p -> do
+ void $ liftIO $ ioupdatebuddies p
+ putStanza $ gitAnnexPresence gitAnnexSignature
+ PushNotification us -> void $ liftIO $ iopull us
+ Ignorable _ -> noop
+ Unknown _ -> noop
+{- Waits for a NetMessager message to be sent, and relays it to XMPP. -}
+relayNetMessage :: Assistant (XMPP ())
+relayNetMessage = convert <$> waitNetMessage
+ where
+ convert (NotifyPush us) =
+ putStanza $ gitAnnexPresence $ encodePushNotification us
+data DecodedStanza
+ = PresenceMessage Presence
+ | PresenceQuery Presence
+ | PushNotification [UUID]
+ | Ignorable Presence
+ | Unknown ReceivedStanza
+ deriving Show
+decodeStanza :: JID -> ReceivedStanza -> DecodedStanza
+decodeStanza fulljid (ReceivedPresence p)
+ | presenceFrom p == Nothing = Ignorable p
+ | presenceFrom p == Just fulljid = Ignorable p
+ | isPresenceQuery p = PresenceQuery p
+ | null pushed = Ignorable p
+ | otherwise = PushNotification pushed
+ where
+ pushed = concat $ catMaybes $ map decodePushNotification $
+ presencePayloads p
+decodeStanza _ s = Unknown s
+{- Runs the client, handing restart events. -}
+restartableClient :: IO () -> Assistant ()
+restartableClient a = forever $ do
+ tid <- liftIO $ forkIO a
+ waitNetMessagerRestart
+ liftIO $ killThread tid
+{- Runs a XMPP action in a separate thread, using a session to allow it
+ - to access the same XMPP client. -}
+xmppThread :: XMPP () -> XMPP ()
+xmppThread a = do
+ s <- getSession
+ void $ liftIO $ forkIO $
+ void $ runXMPP s a
+{- We only pull from one remote out of the set listed in the push
+ - notification, as an optimisation.
+ -
+ - Note that it might be possible (though very unlikely) for the push
+ - notification to take a while to be sent, and multiple pushes happen
+ - before it is sent, so it includes multiple remotes that were pushed
+ - to at different times.
+ -
+ - It could then be the case that the remote we choose had the earlier
+ - push sent to it, but then failed to get the later push, and so is not
+ - fully up-to-date. If that happens, the pushRetryThread will come along
+ - and retry the push, and we'll get another notification once it succeeds,
+ - and pull again. -}
+pull :: [UUID] -> Assistant ()
+pull [] = noop
+pull us = do
+ rs <- filter matching . syncRemotes <$> getDaemonStatus
+ debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
+ pullone rs =<< liftAnnex (inRepo Git.Branch.current)
+ where
+ matching r = Remote.uuid r `S.member` s
+ s = S.fromList us
+ pullone [] _ = noop
+ pullone (r:rs) branch =
+ unlessM (all id . fst <$> manualPull branch [r]) $
+ pullone rs branch
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
new file mode 100644
index 000000000..fea88a53a
--- /dev/null
+++ b/Assistant/Types/NetMessager.hs
@@ -0,0 +1,31 @@
+{- git-annex assistant out of band network messager types
+ -
+ - Copyright 2012 Joey Hess <>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+module Assistant.Types.NetMessager where
+import Common.Annex
+import Control.Concurrent.STM
+import Control.Concurrent.MSampleVar
+{- Messages that can be sent out of band by a network messager. -}
+data NetMessage = NotifyPush [UUID]
+{- Controls for the XMPP client.
+ -
+ - It can be fed XMPP messages to send.
+ -
+ - It can also be sent a signal when it should restart for some reason. -}
+data NetMessagerControl = NetMessagerControl
+ { netMessages :: TChan (NetMessage)
+ , netMessagerRestart :: MSampleVar ()
+ }
+newNetMessagerControl :: IO NetMessagerControl
+newNetMessagerControl = NetMessagerControl
+ <$> atomically newTChan
+ <*> newEmptySV
diff --git a/Assistant/Types/Pushes.hs b/Assistant/Types/Pushes.hs
index 85362860a..99e0ee162 100644
--- a/Assistant/Types/Pushes.hs
+++ b/Assistant/Types/Pushes.hs
@@ -8,10 +8,8 @@
module Assistant.Types.Pushes where
import Common.Annex
-import Utility.TSet
import Control.Concurrent.STM
-import Control.Concurrent.MSampleVar
import Data.Time.Clock
import qualified Data.Map as M
@@ -19,24 +17,8 @@ import qualified Data.Map as M
type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap
-{- The TSet is recent, successful pushes that other remotes should be
- - notified about.
- -
- - The MSampleVar is written to when the PushNotifier thread should be
- - restarted for some reason.
- -}
-data PushNotifier = PushNotifier
- { pushNotifierSuccesses :: TSet UUID
- , pushNotifierWaiter :: MSampleVar ()
- }
{- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes.
newFailedPushMap :: IO FailedPushMap
newFailedPushMap = atomically newEmptyTMVar
-newPushNotifier :: IO PushNotifier
-newPushNotifier = PushNotifier
- <$> newTSet
- <*> newEmptySV
diff --git a/Assistant/XMPP.hs b/Assistant/XMPP.hs
index 43bf4ac75..05bc94fa3 100644
--- a/Assistant/XMPP.hs
+++ b/Assistant/XMPP.hs
@@ -7,7 +7,9 @@
module Assistant.XMPP where
-import Common.Annex
+import Assistant.Common
+import Annex.UUID
+import Assistant.Pairing
import Network.Protocol.XMPP
import qualified Data.Text as T
@@ -21,24 +23,44 @@ gitAnnexPresence tag = (emptyPresence PresenceAvailable)
extendedAway = Element (Name (T.pack "show") Nothing Nothing) []
[NodeContent $ ContentText $ T.pack "xa"]
-{- Does a presence contain a gitp-annex tag? -}
+{- Does a presence contain a git-annex tag? -}
isGitAnnexPresence :: Presence -> Bool
-isGitAnnexPresence p = any matchingtag (presencePayloads p)
- where
- matchingtag t = elementName t == gitAnnexTagName
+isGitAnnexPresence p = any isGitAnnexTag (presencePayloads p)
{- Name of a git-annex tag, in our own XML namespace.
- (Not using a namespace URL to avoid unnecessary bloat.) -}
gitAnnexTagName :: Name
gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing
+isGitAnnexTag :: Element -> Bool
+isGitAnnexTag t = elementName t == gitAnnexTagName
{- A git-annex tag, to let other clients know we're a git-annex client too. -}
gitAnnexSignature :: Element
gitAnnexSignature = Element gitAnnexTagName [] []
+queryAttr :: Name
+queryAttr = Name (T.pack "query") Nothing Nothing
pushAttr :: Name
pushAttr = Name (T.pack "push") Nothing Nothing
+pairingAttr :: Name
+pairingAttr = Name (T.pack "pairing") Nothing Nothing
+isAttr :: Name -> (Name, [Content]) -> Bool
+isAttr attr (k, _) = k == attr
+getAttr :: Name -> [(Name, [Content])] -> Maybe String
+getAttr wantattr attrs = content <$> headMaybe (filter (isAttr wantattr) attrs)
+ where
+ content (_name, cs) = T.unpack $ T.concat $ map unpack cs
+ unpack (ContentText t) = t
+ unpack (ContentEntity t) = t
+uuidAttr :: Name
+uuidAttr = Name (T.pack "uuid") Nothing Nothing
uuidSep :: T.Text
uuidSep = T.pack ","
@@ -61,3 +83,32 @@ decodePushNotification (Element name attrs _nodes)
ispush (k, _) = k == pushAttr
fromContent (ContentText t) = t
fromContent (ContentEntity t) = t
+{- A request for other git-annex clients to send presence. -}
+presenceQuery :: Presence
+presenceQuery = gitAnnexPresence $ Element gitAnnexTagName
+ [ (queryAttr, [ContentText T.empty]) ]
+ []
+isPresenceQuery :: Presence -> Bool
+isPresenceQuery p = case filter isGitAnnexTag (presencePayloads p) of
+ [] -> False
+ ((Element _name attrs _nodes):_) -> any (isAttr queryAttr) attrs
+{- A notification about a stage of pairing. -}
+pairingNotification :: PairStage -> Annex Presence
+pairingNotification pairstage = do
+ u <- getUUID
+ return $ gitAnnexPresence $ Element gitAnnexTagName
+ [ (pairingAttr, [ContentText $ T.pack $ show pairstage])
+ , (uuidAttr, [ContentText $ T.pack $ fromUUID u])
+ ]
+ []
+isPairingNotification :: Presence -> Maybe (PairStage, UUID)
+isPairingNotification p = case filter isGitAnnexTag (presencePayloads p) of
+ [] -> Nothing
+ ((Element _name attrs _nodes):_) ->
+ (,)
+ <$> (readish =<< getAttr pairingAttr attrs)
+ <*> (toUUID <$> getAttr uuidAttr attrs)