diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/NetMessager.hs | 37 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 63 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 20 |
3 files changed, 104 insertions, 16 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 7098957b3..97d17af6e 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -15,6 +15,7 @@ import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Control.Exception as E import qualified Data.Set as S +import qualified Data.Map as M sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -30,6 +31,42 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) +{- Store an important NetMessage for a client, and if the same message was + - already sent, remove it from sentImportantNetMessages. -} +storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant () +storeImportantNetMessage m client matchingclient = go <<~ netMessager + where + go nm = atomically $ do + q <- takeTMVar $ importantNetMessages nm + sent <- takeTMVar $ sentImportantNetMessages nm + putTMVar (importantNetMessages nm) $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client q + putTMVar (sentImportantNetMessages nm) $ + M.mapWithKey removematching sent + removematching someclient s + | matchingclient someclient = S.delete m s + | otherwise = s + +{- Indicates that an important NetMessage has been sent to a client. -} +sentImportantNetMessage :: NetMessage -> ClientID -> Assistant () +sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager) + where + go v = atomically $ do + sent <- takeTMVar v + putTMVar v $ + M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent + +{- Checks for important NetMessages that have been stored for a client, and + - sent to a client. Typically the same client for both, although + - a modified or more specific client may need to be used. -} +checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage) +checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager + where + go nm = atomically $ do + stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm) + sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) + return (fromMaybe S.empty stored, fromMaybe S.empty sent) + {- Runs an action that runs either the send or receive side of a push. - - While the push is running, netMessagesPush will get messages put into it diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 68806ca4b..0b639cf60 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -35,7 +35,7 @@ import Data.Time.Clock {- Whether to include verbose protocol dump in debug output. -} protocolDebug :: Bool -protocolDebug = True +protocolDebug = False xmppClientThread :: UrlRenderer -> NamedThread xmppClientThread urlrenderer = namedThread "XMPPClient" $ @@ -97,10 +97,10 @@ xmppClient urlrenderer d creds = inAssistant $ debug ["received:", show l] mapM_ (handle selfjid) l - handle _ (PresenceMessage p) = do - + handle selfjid (PresenceMessage p) = do void $ inAssistant $ updateBuddyList (updateBuddies p) <<~ buddyList + resendImportantMessages selfjid p handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us handle selfjid (GotNetMessage (PairingNotification stage c u)) = @@ -114,6 +114,16 @@ xmppClient urlrenderer d creds = handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop + resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do + let c = formatJID jid + (stored, sent) <- inAssistant $ + checkImportantNetMessages (formatJID (baseJID jid), c) + forM_ (S.toList $ S.difference stored sent) $ \msg -> do + inAssistant $ debug ["sending to new client:", show c, show msg] + a <- inAssistant $ convertNetMsg (readdressNetMessage msg c) selfjid + a + inAssistant $ sentImportantNetMessage msg c + resendImportantMessages _ _ = noop data XMPPEvent = GotNetMessage NetMessage @@ -151,21 +161,27 @@ decodeStanza _ s = [Unknown s] - Chat messages must be directed to specific clients, not a base - account JID, due to git-annex clients using a negative presence priority. - PairingNotification messages are always directed at specific - - clients, but Pushing messages are sometimes not, and need to be exploded. + - clients, but Pushing messages are sometimes not, and need to be exploded + - out to specific clients. + - + - Important messages, not directed at any specific client, + - are cached to be sent later when additional clients connect. -} relayNetMessage :: JID -> Assistant (XMPP ()) relayNetMessage selfjid = do msg <- waitNetMessage when protocolDebug $ debug ["sending:", show msg] + handleImportant msg convert msg where - convert (NotifyPush us) = return $ putStanza $ pushNotification us - convert QueryPresence = return $ putStanza presenceQuery - convert (PairingNotification stage c u) = withclient c $ \tojid -> do - changeBuddyPairing tojid True - return $ putStanza $ pairingNotification stage u tojid selfjid - convert (Pushing c pushstage) = withclient c $ \tojid -> do + handleImportant msg = case parseJID =<< isImportantNetMessage msg of + Just tojid + | tojid == baseJID tojid -> + storeImportantNetMessage msg (formatJID tojid) $ + \c -> (baseJID <$> parseJID c) == Just tojid + _ -> noop + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> do if tojid == baseJID tojid then do clients <- maybe [] (S.toList . buddyAssistants) @@ -175,12 +191,29 @@ relayNetMessage selfjid = do return $ forM_ (clients) $ \(Client jid) -> putStanza $ pushMessage pushstage jid selfjid else return $ putStanza $ pushMessage pushstage tojid selfjid + convert msg = convertNetMsg msg selfjid - withclient c a = case parseJID c of - Nothing -> return noop - Just tojid - | tojid == selfjid -> return noop - | otherwise -> a tojid +{- Converts a NetMessage to an XMPP action. -} +convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ()) +convertNetMsg msg selfjid = convert msg + where + convert (NotifyPush us) = return $ putStanza $ pushNotification us + convert QueryPresence = return $ putStanza presenceQuery + convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do + changeBuddyPairing tojid True + return $ putStanza $ pairingNotification stage u tojid selfjid + convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> + return $ putStanza $ pushMessage pushstage tojid selfjid + +withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> (Assistant (XMPP ())) +withOtherClient selfjid c a = case parseJID c of + Nothing -> return noop + Just tojid + | tojid == selfjid -> return noop + | otherwise -> a tojid + +withClient :: ClientID -> (JID -> XMPP ()) -> XMPP () +withClient c a = maybe noop a $ parseJID c {- Runs a XMPP action in a separate thread, using a session to allow it - to access the same XMPP client. -} diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index c036d624a..55bf896bd 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -15,6 +15,7 @@ import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) import qualified Data.Set as S +import qualified Data.Map as M {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -47,6 +48,18 @@ data PushStage | ReceivePackDone ExitCode deriving (Show, Eq, Ord) +{- NetMessages that are important (and small), and should be stored to be + - resent when new clients are seen. -} +isImportantNetMessage :: NetMessage -> Maybe ClientID +isImportantNetMessage (Pushing c CanPush) = Just c +isImportantNetMessage (Pushing c PushRequest) = Just c +isImportantNetMessage _ = Nothing + +readdressNetMessage :: NetMessage -> ClientID -> NetMessage +readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid +readdressNetMessage (Pushing _ stage) c = Pushing c stage +readdressNetMessage m _ = m + {- Things that initiate either side of a push, but do not actually send data. -} isPushInitiation :: PushStage -> Bool isPushInitiation CanPush = True @@ -81,6 +94,10 @@ getSide side m = m side data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan (NetMessage) + -- important messages for each client + , importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) + -- important messages that are believed to have been sent to a client + , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () -- only one side of a push can be running at a time @@ -94,8 +111,9 @@ data NetMessager = NetMessager newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan + <*> atomically (newTMVar M.empty) + <*> atomically (newTMVar M.empty) <*> newEmptySV <*> mkSideMap (newTMVar Nothing) <*> mkSideMap newTChan <*> mkSideMap (newTMVar S.empty) - where |