summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-11-03 14:16:17 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-11-03 14:16:17 -0400
commit1279d72b4e4fe77abb983954dc937021559d4169 (patch)
tree6c7d718be97634ddaaa2a9dd90637363cc0ebeb0 /Assistant/Threads
parent85eb13a57a7c0c4f2df46ab4c01c434585370999 (diff)
refactor XMPP client
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/NetWatcher.hs4
-rw-r--r--Assistant/Threads/PushNotifier.hs124
-rw-r--r--Assistant/Threads/XMPPClient.hs155
3 files changed, 157 insertions, 126 deletions
diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs
index 4396b2632..3b1545543 100644
--- a/Assistant/Threads/NetWatcher.hs
+++ b/Assistant/Threads/NetWatcher.hs
@@ -12,7 +12,7 @@ module Assistant.Threads.NetWatcher where
import Assistant.Common
import Assistant.Sync
-import Assistant.Pushes
+import Assistant.NetMessager
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
@@ -62,7 +62,7 @@ dbusThread = do
)
handleconn = do
debug ["detected network connection"]
- notifyRestart
+ notifyNetMessagerRestart
handleConnection
onerr e _ = do
liftAnnex $
diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs
deleted file mode 100644
index bdb9e1e12..000000000
--- a/Assistant/Threads/PushNotifier.hs
+++ /dev/null
@@ -1,124 +0,0 @@
-{- git-annex assistant push notification thread, using XMPP
- -
- - This handles both sending outgoing push notifications, and receiving
- - incoming push notifications.
- -
- - Copyright 2012 Joey Hess <joey@kitenet.net>
- -
- - Licensed under the GNU GPL version 3 or higher.
- -}
-
-module Assistant.Threads.PushNotifier where
-
-import Assistant.Common
-import Assistant.XMPP
-import Assistant.XMPP.Client
-import Assistant.Pushes
-import Assistant.Types.Buddies
-import Assistant.XMPP.Buddies
-import Assistant.Sync
-import Assistant.DaemonStatus
-import qualified Remote
-import Utility.ThreadScheduler
-
-import Network.Protocol.XMPP
-import Control.Concurrent
-import qualified Data.Set as S
-import qualified Git.Branch
-import Data.Time.Clock
-
-pushNotifierThread :: NamedThread
-pushNotifierThread = NamedThread "PushNotifier" $ do
- iodebug <- asIO1 debug
- iopull <- asIO1 pull
- iowaitpush <- asIO waitPush
- ioupdatebuddies <- asIO1 $ \p -> do
- updateBuddyList (updateBuddies p) <<~ buddyList
- debug =<< map show <$> getBuddyList <<~ buddyList
- ioemptybuddies <- asIO $
- updateBuddyList (const noBuddies) <<~ buddyList
- ioclient <- asIO $
- xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies
- forever $ do
- tid <- liftIO $ forkIO ioclient
- waitRestart
- liftIO $ killThread tid
-
-xmppClient
- :: (IO [UUID])
- -> ([String] -> IO ())
- -> ([UUID] -> IO ())
- -> (Presence -> IO ())
- -> IO ()
- -> Assistant ()
-xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies = do
- v <- liftAnnex getXMPPCreds
- case v of
- Nothing -> noop
- Just c -> liftIO $ loop c =<< getCurrentTime
- where
- loop c starttime = do
- void $ connectXMPP c $ \jid -> do
- fulljid <- bindJID jid
- liftIO $ iodebug ["XMPP connected", show fulljid]
- {- The buddy list starts empty each time the client
- - connects, so that stale info is not retained. -}
- liftIO ioemptybuddies
- putStanza $ gitAnnexPresence gitAnnexSignature
- s <- getSession
- _ <- liftIO $ forkIO $ void $ runXMPP s $
- receivenotifications fulljid
- sendnotifications
- now <- getCurrentTime
- if diffUTCTime now starttime > 300
- then do
- iodebug ["XMPP connection lost; reconnecting"]
- loop c now
- else do
- iodebug ["XMPP connection failed; will retry"]
- threadDelaySeconds (Seconds 300)
- loop c =<< getCurrentTime
- sendnotifications = forever $ do
- us <- liftIO iowaitpush
- putStanza $ gitAnnexPresence $ encodePushNotification us
- receivenotifications fulljid = forever $ do
- s <- getStanza
- liftIO $ iodebug ["received XMPP:", show s]
- case s of
- ReceivedPresence p@(Presence { presenceFrom = from })
- | from == Just fulljid -> noop
- | otherwise -> do
- liftIO $ ioupdatebuddies p
- when (isGitAnnexPresence p) $
- liftIO $ iopull $ concat $ catMaybes $
- map decodePushNotification $
- presencePayloads p
- _ -> noop
-
-{- We only pull from one remote out of the set listed in the push
- - notification, as an optimisation.
- -
- - Note that it might be possible (though very unlikely) for the push
- - notification to take a while to be sent, and multiple pushes happen
- - before it is sent, so it includes multiple remotes that were pushed
- - to at different times.
- -
- - It could then be the case that the remote we choose had the earlier
- - push sent to it, but then failed to get the later push, and so is not
- - fully up-to-date. If that happens, the pushRetryThread will come along
- - and retry the push, and we'll get another notification once it succeeds,
- - and pull again. -}
-pull :: [UUID] -> Assistant ()
-pull [] = noop
-pull us = do
- rs <- filter matching . syncRemotes <$> getDaemonStatus
- debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
- pullone rs =<< liftAnnex (inRepo Git.Branch.current)
- where
- matching r = Remote.uuid r `S.member` s
- s = S.fromList us
-
- pullone [] _ = noop
- pullone (r:rs) branch =
- unlessM (all id . fst <$> manualPull branch [r]) $
- pullone rs branch
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
new file mode 100644
index 000000000..7fb3cc874
--- /dev/null
+++ b/Assistant/Threads/XMPPClient.hs
@@ -0,0 +1,155 @@
+{- git-annex XMPP client
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.XMPPClient where
+
+import Assistant.Common
+import Assistant.XMPP
+import Assistant.XMPP.Client
+import Assistant.NetMessager
+import Assistant.Types.NetMessager
+import Assistant.Types.Buddies
+import Assistant.XMPP.Buddies
+import Assistant.Sync
+import Assistant.DaemonStatus
+import qualified Remote
+import Utility.ThreadScheduler
+
+import Network.Protocol.XMPP
+import Control.Concurrent
+import qualified Data.Set as S
+import qualified Git.Branch
+import Data.Time.Clock
+
+xmppClientThread :: NamedThread
+xmppClientThread = NamedThread "XMPPClient" $ do
+ iodebug <- asIO1 debug
+ iopull <- asIO1 pull
+ ioupdatebuddies <- asIO1 $ \p ->
+ updateBuddyList (updateBuddies p) <<~ buddyList
+ ioemptybuddies <- asIO $
+ updateBuddyList (const noBuddies) <<~ buddyList
+ iorelay <- asIO relayNetMessage
+ ioclientthread <- asIO $
+ go iorelay iodebug iopull ioupdatebuddies ioemptybuddies
+ restartableClient ioclientthread
+ where
+ go iorelay iodebug iopull ioupdatebuddies ioemptybuddies = do
+ v <- liftAnnex getXMPPCreds
+ case v of
+ Nothing -> noop
+ Just c -> liftIO $ loop c =<< getCurrentTime
+ where
+ debug' = void . liftIO . iodebug
+ {- When the client exits, it's restarted;
+ - if it keeps failing, back off to wait 5 minutes before
+ - trying it again. -}
+ loop c starttime = do
+ runclient c
+ now <- getCurrentTime
+ if diffUTCTime now starttime > 300
+ then do
+ void $ iodebug ["connection lost; reconnecting"]
+ loop c now
+ else do
+ void $ iodebug ["connection failed; will retry"]
+ threadDelaySeconds (Seconds 300)
+ loop c =<< getCurrentTime
+ runclient c = void $ connectXMPP c $ \jid -> do
+ fulljid <- bindJID jid
+ debug' ["connected", show fulljid]
+ {- The buddy list starts empty each time
+ - the client connects, so that stale info
+ - is not retained. -}
+ void $ liftIO ioemptybuddies
+ putStanza $ gitAnnexPresence gitAnnexSignature
+ xmppThread $ receivenotifications fulljid
+ forever $ do
+ a <- liftIO iorelay
+ a
+ receivenotifications fulljid = forever $ do
+ s <- getStanza
+ let v = decodeStanza fulljid s
+ debug' ["received:", show v]
+ case v of
+ PresenceMessage p -> void $ liftIO $ ioupdatebuddies p
+ PresenceQuery p -> do
+ void $ liftIO $ ioupdatebuddies p
+ putStanza $ gitAnnexPresence gitAnnexSignature
+ PushNotification us -> void $ liftIO $ iopull us
+ Ignorable _ -> noop
+ Unknown _ -> noop
+
+{- Waits for a NetMessager message to be sent, and relays it to XMPP. -}
+relayNetMessage :: Assistant (XMPP ())
+relayNetMessage = convert <$> waitNetMessage
+ where
+ convert (NotifyPush us) =
+ putStanza $ gitAnnexPresence $ encodePushNotification us
+
+data DecodedStanza
+ = PresenceMessage Presence
+ | PresenceQuery Presence
+ | PushNotification [UUID]
+ | Ignorable Presence
+ | Unknown ReceivedStanza
+ deriving Show
+
+decodeStanza :: JID -> ReceivedStanza -> DecodedStanza
+decodeStanza fulljid (ReceivedPresence p)
+ | presenceFrom p == Nothing = Ignorable p
+ | presenceFrom p == Just fulljid = Ignorable p
+ | isPresenceQuery p = PresenceQuery p
+ | null pushed = Ignorable p
+ | otherwise = PushNotification pushed
+ where
+ pushed = concat $ catMaybes $ map decodePushNotification $
+ presencePayloads p
+decodeStanza _ s = Unknown s
+
+{- Runs the client, handing restart events. -}
+restartableClient :: IO () -> Assistant ()
+restartableClient a = forever $ do
+ tid <- liftIO $ forkIO a
+ waitNetMessagerRestart
+ liftIO $ killThread tid
+
+{- Runs a XMPP action in a separate thread, using a session to allow it
+ - to access the same XMPP client. -}
+xmppThread :: XMPP () -> XMPP ()
+xmppThread a = do
+ s <- getSession
+ void $ liftIO $ forkIO $
+ void $ runXMPP s a
+
+{- We only pull from one remote out of the set listed in the push
+ - notification, as an optimisation.
+ -
+ - Note that it might be possible (though very unlikely) for the push
+ - notification to take a while to be sent, and multiple pushes happen
+ - before it is sent, so it includes multiple remotes that were pushed
+ - to at different times.
+ -
+ - It could then be the case that the remote we choose had the earlier
+ - push sent to it, but then failed to get the later push, and so is not
+ - fully up-to-date. If that happens, the pushRetryThread will come along
+ - and retry the push, and we'll get another notification once it succeeds,
+ - and pull again. -}
+pull :: [UUID] -> Assistant ()
+pull [] = noop
+pull us = do
+ rs <- filter matching . syncRemotes <$> getDaemonStatus
+ debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
+ pullone rs =<< liftAnnex (inRepo Git.Branch.current)
+ where
+ matching r = Remote.uuid r `S.member` s
+ s = S.fromList us
+
+ pullone [] _ = noop
+ pullone (r:rs) branch =
+ unlessM (all id . fst <$> manualPull branch [r]) $
+ pullone rs branch