summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r--Assistant/NetMessager.hs67
1 files changed, 67 insertions, 0 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 8895cb7db..8fac55c8a 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -10,8 +10,11 @@ module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
+import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
+import Control.Exception as E
+import qualified Data.Set as S
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -26,3 +29,67 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
+
+getPushRunning :: Assistant PushRunning
+getPushRunning =
+ (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
+
+{- Runs an action that runs either the send or receive end of a push.
+ -
+ - While the push is running, netMessagesPush will get messages put into it
+ - relating to this push, while any messages relating to other pushes
+ - go to netMessagesDeferred. Once the push finishes, those deferred
+ - messages will be fed to handledeferred for processing.
+ -}
+runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
+runPush v handledeferred a = do
+ nm <- getAssistant netMessager
+ let pr = netMessagerPushRunning nm
+ let setup = void $ atomically $ swapTMVar pr v
+ let cleanup = atomically $ do
+ void $ swapTMVar pr NoPushRunning
+ emptytchan (netMessagesPush nm)
+ r <- E.bracket_ setup cleanup <~> a
+ (void . forkIO) <~> processdeferred nm
+ return r
+ where
+ emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
+ processdeferred nm = do
+ s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
+ mapM_ rundeferred (S.toList s)
+ rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
+ <~> handledeferred m
+
+{- While a push is running, matching push messages are put into
+ - netMessagesPush, while others go to netMessagesDeferredPush. To avoid
+ - bloating memory, only PushRequest and StartingPush messages are
+ - deferred.
+ -
+ - When no push is running, returns False.
+ -}
+queueNetPushMessage :: NetMessage -> Assistant Bool
+queueNetPushMessage m = do
+ nm <- getAssistant netMessager
+ liftIO $ atomically $ do
+ running <- readTMVar (netMessagerPushRunning nm)
+ case running of
+ NoPushRunning -> return False
+ SendPushRunning cid -> go nm cid
+ ReceivePushRunning cid -> go nm cid
+ where
+ go nm cid
+ | getClientID m == Just cid = do
+ writeTChan (netMessagesPush nm) m
+ return True
+ | otherwise = do
+ case m of
+ PushRequest _ -> defer nm
+ StartingPush _ -> defer nm
+ _ -> noop
+ return True
+ defer nm = do
+ s <- takeTMVar (netMessagesDeferredPush nm)
+ putTMVar (netMessagesDeferredPush nm) $ S.insert m s
+
+waitNetPushMessage :: Assistant (NetMessage)
+waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)