summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r--Assistant/NetMessager.hs73
1 files changed, 36 insertions, 37 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index c3bd73c57..e3ef89b04 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -33,69 +33,68 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
-getPushRunning :: Assistant PushRunning
-getPushRunning =
- (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
-
-{- Runs an action that runs either the send or receive end of a push.
+{- Runs an action that runs either the send or receive side of a push.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- - go to netMessagesDeferred. Once the push finishes, those deferred
- - messages will be fed to handledeferred for processing.
+ - on the same side go to netMessagesDeferred. Once the push finishes,
+ - those deferred messages will be fed to handledeferred for processing.
-}
-runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush v handledeferred a = do
+runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
+runPush side clientid handledeferred a = do
nm <- getAssistant netMessager
- let pr = netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar pr v
+ let runningv = getSide side $ netMessagerPushRunning nm
+ let setup = void $ atomically $ swapTMVar runningv $ Just clientid
let cleanup = atomically $ do
- void $ swapTMVar pr NoPushRunning
- emptytchan (netMessagesPush nm)
+ void $ swapTMVar runningv Nothing
+ emptytchan (getSide side $ netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm
return r
where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
+ s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m
{- While a push is running, matching push messages are put into
- - netMessagesPush, while others go to netMessagesDeferredPush.
+ - netMessagesPush, while others that involve the same side go to
+ - netMessagesDeferredPush.
+ -
+ - When no push is running involving the same side, returns False.
+ -
- To avoid bloating memory, only messages that initiate pushes are
- deferred.
- -
- - When no push is running, returns False.
-}
queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m = do
+queueNetPushMessage m@(Pushing clientid stage) = do
nm <- getAssistant netMessager
liftIO $ atomically $ do
- running <- readTMVar (netMessagerPushRunning nm)
- case running of
- NoPushRunning -> return False
- SendPushRunning runningcid -> do
- go nm m runningcid
- return True
- ReceivePushRunning runningcid -> do
- go nm m runningcid
- return True
+ v <- readTMVar (getSide side $ netMessagerPushRunning nm)
+ case v of
+ Nothing -> return False
+ (Just runningclientid)
+ | runningclientid == clientid -> queue nm
+ | isPushInitiation stage -> defer nm
+ | otherwise -> discard
where
- go nm (Pushing cid stage) runningcid
- | cid == runningcid = writeTChan (netMessagesPush nm) m
- | isPushInitiation stage = defer nm
- | otherwise = noop
- go _ _ _ = noop
-
+ side = pushDestinationSide stage
+ queue nm = do
+ writeTChan (getSide side $ netMessagesPush nm) m
+ return True
defer nm = do
- s <- takeTMVar (netMessagesDeferredPush nm)
- putTMVar (netMessagesDeferredPush nm) $ S.insert m s
+ let mv = getSide side $ netMessagesPushDeferred nm
+ s <- takeTMVar mv
+ putTMVar mv $ S.insert m s
+ return True
+ discard = return True
+queueNetPushMessage _ = return False
-waitNetPushMessage :: Assistant (NetMessage)
-waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)
+waitNetPushMessage :: PushSide -> Assistant (NetMessage)
+waitNetPushMessage side = (atomically . readTChan)
+ <<~ (getSide side . netMessagesPush . netMessager)
{- Remotes using the XMPP transport have urls like xmpp::user@host -}
isXMPPRemote :: Remote -> Bool