aboutsummaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-03-06 21:33:08 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-03-06 21:38:01 -0400
commitef3ee84450798af5f9908e50e25c8d819594e971 (patch)
tree9e2d38264d8d815f4f39064299a4d1590ca54fa2 /Assistant
parent8f5a55803bc88582220bbbaca85e5025e9d2b053 (diff)
assistant: XMPP git pull and push requests are cached and sent when presence of a new client is detected.
Noticed that, At startup or network reconnect, git push messages were sent, often before presence info has been gathered, so were not sent to any buddies. To fix this, keep track of which buddies have seen such messages, and when new presence is received from a buddy that has not yet seen it, resend. This is done only for push initiation messages, so very little data needs to be stored.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/NetMessager.hs37
-rw-r--r--Assistant/Threads/XMPPClient.hs63
-rw-r--r--Assistant/Types/NetMessager.hs20
3 files changed, 104 insertions, 16 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 7098957b3..97d17af6e 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -15,6 +15,7 @@ import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
+import qualified Data.Map as M
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -30,6 +31,42 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
+{- Store an important NetMessage for a client, and if the same message was
+ - already sent, remove it from sentImportantNetMessages. -}
+storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
+storeImportantNetMessage m client matchingclient = go <<~ netMessager
+ where
+ go nm = atomically $ do
+ q <- takeTMVar $ importantNetMessages nm
+ sent <- takeTMVar $ sentImportantNetMessages nm
+ putTMVar (importantNetMessages nm) $
+ M.alter (Just . maybe (S.singleton m) (S.insert m)) client q
+ putTMVar (sentImportantNetMessages nm) $
+ M.mapWithKey removematching sent
+ removematching someclient s
+ | matchingclient someclient = S.delete m s
+ | otherwise = s
+
+{- Indicates that an important NetMessage has been sent to a client. -}
+sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
+sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
+ where
+ go v = atomically $ do
+ sent <- takeTMVar v
+ putTMVar v $
+ M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
+
+{- Checks for important NetMessages that have been stored for a client, and
+ - sent to a client. Typically the same client for both, although
+ - a modified or more specific client may need to be used. -}
+checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
+checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
+ where
+ go nm = atomically $ do
+ stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
+ sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
+ return (fromMaybe S.empty stored, fromMaybe S.empty sent)
+
{- Runs an action that runs either the send or receive side of a push.
-
- While the push is running, netMessagesPush will get messages put into it
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 68806ca4b..0b639cf60 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -35,7 +35,7 @@ import Data.Time.Clock
{- Whether to include verbose protocol dump in debug output. -}
protocolDebug :: Bool
-protocolDebug = True
+protocolDebug = False
xmppClientThread :: UrlRenderer -> NamedThread
xmppClientThread urlrenderer = namedThread "XMPPClient" $
@@ -97,10 +97,10 @@ xmppClient urlrenderer d creds =
inAssistant $ debug ["received:", show l]
mapM_ (handle selfjid) l
- handle _ (PresenceMessage p) = do
-
+ handle selfjid (PresenceMessage p) = do
void $ inAssistant $
updateBuddyList (updateBuddies p) <<~ buddyList
+ resendImportantMessages selfjid p
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
@@ -114,6 +114,16 @@ xmppClient urlrenderer d creds =
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
+ resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do
+ let c = formatJID jid
+ (stored, sent) <- inAssistant $
+ checkImportantNetMessages (formatJID (baseJID jid), c)
+ forM_ (S.toList $ S.difference stored sent) $ \msg -> do
+ inAssistant $ debug ["sending to new client:", show c, show msg]
+ a <- inAssistant $ convertNetMsg (readdressNetMessage msg c) selfjid
+ a
+ inAssistant $ sentImportantNetMessage msg c
+ resendImportantMessages _ _ = noop
data XMPPEvent
= GotNetMessage NetMessage
@@ -151,21 +161,27 @@ decodeStanza _ s = [Unknown s]
- Chat messages must be directed to specific clients, not a base
- account JID, due to git-annex clients using a negative presence priority.
- PairingNotification messages are always directed at specific
- - clients, but Pushing messages are sometimes not, and need to be exploded.
+ - clients, but Pushing messages are sometimes not, and need to be exploded
+ - out to specific clients.
+ -
+ - Important messages, not directed at any specific client,
+ - are cached to be sent later when additional clients connect.
-}
relayNetMessage :: JID -> Assistant (XMPP ())
relayNetMessage selfjid = do
msg <- waitNetMessage
when protocolDebug $
debug ["sending:", show msg]
+ handleImportant msg
convert msg
where
- convert (NotifyPush us) = return $ putStanza $ pushNotification us
- convert QueryPresence = return $ putStanza presenceQuery
- convert (PairingNotification stage c u) = withclient c $ \tojid -> do
- changeBuddyPairing tojid True
- return $ putStanza $ pairingNotification stage u tojid selfjid
- convert (Pushing c pushstage) = withclient c $ \tojid -> do
+ handleImportant msg = case parseJID =<< isImportantNetMessage msg of
+ Just tojid
+ | tojid == baseJID tojid ->
+ storeImportantNetMessage msg (formatJID tojid) $
+ \c -> (baseJID <$> parseJID c) == Just tojid
+ _ -> noop
+ convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> do
if tojid == baseJID tojid
then do
clients <- maybe [] (S.toList . buddyAssistants)
@@ -175,12 +191,29 @@ relayNetMessage selfjid = do
return $ forM_ (clients) $ \(Client jid) ->
putStanza $ pushMessage pushstage jid selfjid
else return $ putStanza $ pushMessage pushstage tojid selfjid
+ convert msg = convertNetMsg msg selfjid
- withclient c a = case parseJID c of
- Nothing -> return noop
- Just tojid
- | tojid == selfjid -> return noop
- | otherwise -> a tojid
+{- Converts a NetMessage to an XMPP action. -}
+convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ())
+convertNetMsg msg selfjid = convert msg
+ where
+ convert (NotifyPush us) = return $ putStanza $ pushNotification us
+ convert QueryPresence = return $ putStanza presenceQuery
+ convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do
+ changeBuddyPairing tojid True
+ return $ putStanza $ pairingNotification stage u tojid selfjid
+ convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid ->
+ return $ putStanza $ pushMessage pushstage tojid selfjid
+
+withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> (Assistant (XMPP ()))
+withOtherClient selfjid c a = case parseJID c of
+ Nothing -> return noop
+ Just tojid
+ | tojid == selfjid -> return noop
+ | otherwise -> a tojid
+
+withClient :: ClientID -> (JID -> XMPP ()) -> XMPP ()
+withClient c a = maybe noop a $ parseJID c
{- Runs a XMPP action in a separate thread, using a session to allow it
- to access the same XMPP client. -}
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index c036d624a..55bf896bd 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -15,6 +15,7 @@ import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
import qualified Data.Set as S
+import qualified Data.Map as M
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -47,6 +48,18 @@ data PushStage
| ReceivePackDone ExitCode
deriving (Show, Eq, Ord)
+{- NetMessages that are important (and small), and should be stored to be
+ - resent when new clients are seen. -}
+isImportantNetMessage :: NetMessage -> Maybe ClientID
+isImportantNetMessage (Pushing c CanPush) = Just c
+isImportantNetMessage (Pushing c PushRequest) = Just c
+isImportantNetMessage _ = Nothing
+
+readdressNetMessage :: NetMessage -> ClientID -> NetMessage
+readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid
+readdressNetMessage (Pushing _ stage) c = Pushing c stage
+readdressNetMessage m _ = m
+
{- Things that initiate either side of a push, but do not actually send data. -}
isPushInitiation :: PushStage -> Bool
isPushInitiation CanPush = True
@@ -81,6 +94,10 @@ getSide side m = m side
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan (NetMessage)
+ -- important messages for each client
+ , importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
+ -- important messages that are believed to have been sent to a client
+ , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time
@@ -94,8 +111,9 @@ data NetMessager = NetMessager
newNetMessager :: IO NetMessager
newNetMessager = NetMessager
<$> atomically newTChan
+ <*> atomically (newTMVar M.empty)
+ <*> atomically (newTMVar M.empty)
<*> newEmptySV
<*> mkSideMap (newTMVar Nothing)
<*> mkSideMap newTChan
<*> mkSideMap (newTMVar S.empty)
- where