diff options
author | Joey Hess <joey@kitenet.net> | 2013-03-06 21:33:08 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2013-03-06 21:38:01 -0400 |
commit | ef3ee84450798af5f9908e50e25c8d819594e971 (patch) | |
tree | 9e2d38264d8d815f4f39064299a4d1590ca54fa2 /Assistant | |
parent | 8f5a55803bc88582220bbbaca85e5025e9d2b053 (diff) |
assistant: XMPP git pull and push requests are cached and sent when presence of a new client is detected.
Noticed that, At startup or network reconnect, git push messages were sent,
often before presence info has been gathered, so were not sent to any
buddies.
To fix this, keep track of which buddies have seen such messages,
and when new presence is received from a buddy that has not yet seen it,
resend.
This is done only for push initiation messages, so very little data needs
to be stored.
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/NetMessager.hs | 37 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 63 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 20 |
3 files changed, 104 insertions, 16 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 7098957b3..97d17af6e 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -15,6 +15,7 @@ import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Control.Exception as E import qualified Data.Set as S +import qualified Data.Map as M sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -30,6 +31,42 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) +{- Store an important NetMessage for a client, and if the same message was + - already sent, remove it from sentImportantNetMessages. -} +storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant () +storeImportantNetMessage m client matchingclient = go <<~ netMessager + where + go nm = atomically $ do + q <- takeTMVar $ importantNetMessages nm + sent <- takeTMVar $ sentImportantNetMessages nm + putTMVar (importantNetMessages nm) $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client q + putTMVar (sentImportantNetMessages nm) $ + M.mapWithKey removematching sent + removematching someclient s + | matchingclient someclient = S.delete m s + | otherwise = s + +{- Indicates that an important NetMessage has been sent to a client. -} +sentImportantNetMessage :: NetMessage -> ClientID -> Assistant () +sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager) + where + go v = atomically $ do + sent <- takeTMVar v + putTMVar v $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent + +{- Checks for important NetMessages that have been stored for a client, and + - sent to a client. Typically the same client for both, although + - a modified or more specific client may need to be used. -} +checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage) +checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager + where + go nm = atomically $ do + stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm) + sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) + return (fromMaybe S.empty stored, fromMaybe S.empty sent) + {- Runs an action that runs either the send or receive side of a push. - - While the push is running, netMessagesPush will get messages put into it diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 68806ca4b..0b639cf60 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -35,7 +35,7 @@ import Data.Time.Clock {- Whether to include verbose protocol dump in debug output. -} protocolDebug :: Bool -protocolDebug = True +protocolDebug = False xmppClientThread :: UrlRenderer -> NamedThread xmppClientThread urlrenderer = namedThread "XMPPClient" $ @@ -97,10 +97,10 @@ xmppClient urlrenderer d creds = inAssistant $ debug ["received:", show l] mapM_ (handle selfjid) l - handle _ (PresenceMessage p) = do - + handle selfjid (PresenceMessage p) = do void $ inAssistant $ updateBuddyList (updateBuddies p) <<~ buddyList + resendImportantMessages selfjid p handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us handle selfjid (GotNetMessage (PairingNotification stage c u)) = @@ -114,6 +114,16 @@ xmppClient urlrenderer d creds = handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop + resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do + let c = formatJID jid + (stored, sent) <- inAssistant $ + checkImportantNetMessages (formatJID (baseJID jid), c) + forM_ (S.toList $ S.difference stored sent) $ \msg -> do + inAssistant $ debug ["sending to new client:", show c, show msg] + a <- inAssistant $ convertNetMsg (readdressNetMessage msg c) selfjid + a + inAssistant $ sentImportantNetMessage msg c + resendImportantMessages _ _ = noop data XMPPEvent = GotNetMessage NetMessage @@ -151,21 +161,27 @@ decodeStanza _ s = [Unknown s] - Chat messages must be directed to specific clients, not a base - account JID, due to git-annex clients using a negative presence priority. - PairingNotification messages are always directed at specific - - clients, but Pushing messages are sometimes not, and need to be exploded. + - clients, but Pushing messages are sometimes not, and need to be exploded + - out to specific clients. + - + - Important messages, not directed at any specific client, + - are cached to be sent later when additional clients connect. -} relayNetMessage :: JID -> Assistant (XMPP ()) relayNetMessage selfjid = do msg <- waitNetMessage when protocolDebug $ debug ["sending:", show msg] + handleImportant msg convert msg where - convert (NotifyPush us) = return $ putStanza $ pushNotification us - convert QueryPresence = return $ putStanza presenceQuery - convert (PairingNotification stage c u) = withclient c $ \tojid -> do - changeBuddyPairing tojid True - return $ putStanza $ pairingNotification stage u tojid selfjid - convert (Pushing c pushstage) = withclient c $ \tojid -> do + handleImportant msg = case parseJID =<< isImportantNetMessage msg of + Just tojid + | tojid == baseJID tojid -> + storeImportantNetMessage msg (formatJID tojid) $ + \c -> (baseJID <$> parseJID c) == Just tojid + _ -> noop + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> do if tojid == baseJID tojid then do clients <- maybe [] (S.toList . buddyAssistants) @@ -175,12 +191,29 @@ relayNetMessage selfjid = do return $ forM_ (clients) $ \(Client jid) -> putStanza $ pushMessage pushstage jid selfjid else return $ putStanza $ pushMessage pushstage tojid selfjid + convert msg = convertNetMsg msg selfjid - withclient c a = case parseJID c of - Nothing -> return noop - Just tojid - | tojid == selfjid -> return noop - | otherwise -> a tojid +{- Converts a NetMessage to an XMPP action. -} +convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ()) +convertNetMsg msg selfjid = convert msg + where + convert (NotifyPush us) = return $ putStanza $ pushNotification us + convert QueryPresence = return $ putStanza presenceQuery + convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do + changeBuddyPairing tojid True + return $ putStanza $ pairingNotification stage u tojid selfjid + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> + return $ putStanza $ pushMessage pushstage tojid selfjid + +withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> (Assistant (XMPP ())) +withOtherClient selfjid c a = case parseJID c of + Nothing -> return noop + Just tojid + | tojid == selfjid -> return noop + | otherwise -> a tojid + +withClient :: ClientID -> (JID -> XMPP ()) -> XMPP () +withClient c a = maybe noop a $ parseJID c {- Runs a XMPP action in a separate thread, using a session to allow it - to access the same XMPP client. -} diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index c036d624a..55bf896bd 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -15,6 +15,7 @@ import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) import qualified Data.Set as S +import qualified Data.Map as M {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -47,6 +48,18 @@ data PushStage | ReceivePackDone ExitCode deriving (Show, Eq, Ord) +{- NetMessages that are important (and small), and should be stored to be + - resent when new clients are seen. -} +isImportantNetMessage :: NetMessage -> Maybe ClientID +isImportantNetMessage (Pushing c CanPush) = Just c +isImportantNetMessage (Pushing c PushRequest) = Just c +isImportantNetMessage _ = Nothing + +readdressNetMessage :: NetMessage -> ClientID -> NetMessage +readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid +readdressNetMessage (Pushing _ stage) c = Pushing c stage +readdressNetMessage m _ = m + {- Things that initiate either side of a push, but do not actually send data. -} isPushInitiation :: PushStage -> Bool isPushInitiation CanPush = True @@ -81,6 +94,10 @@ getSide side m = m side data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan (NetMessage) + -- important messages for each client + , importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) + -- important messages that are believed to have been sent to a client + , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () -- only one side of a push can be running at a time @@ -94,8 +111,9 @@ data NetMessager = NetMessager newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan + <*> atomically (newTMVar M.empty) + <*> atomically (newTMVar M.empty) <*> newEmptySV <*> mkSideMap (newTMVar Nothing) <*> mkSideMap newTChan <*> mkSideMap (newTMVar S.empty) - where |