diff options
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r-- | Assistant/NetMessager.hs | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 8895cb7db..8fac55c8a 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -10,8 +10,11 @@ module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar +import Control.Exception as E +import qualified Data.Set as S sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -26,3 +29,67 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +getPushRunning :: Assistant PushRunning +getPushRunning = + (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) + +{- Runs an action that runs either the send or receive end of a push. + - + - While the push is running, netMessagesPush will get messages put into it + - relating to this push, while any messages relating to other pushes + - go to netMessagesDeferred. Once the push finishes, those deferred + - messages will be fed to handledeferred for processing. + -} +runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush v handledeferred a = do + nm <- getAssistant netMessager + let pr = netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar pr v + let cleanup = atomically $ do + void $ swapTMVar pr NoPushRunning + emptytchan (netMessagesPush nm) + r <- E.bracket_ setup cleanup <~> a + (void . forkIO) <~> processdeferred nm + return r + where + emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c + processdeferred nm = do + s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + mapM_ rundeferred (S.toList s) + rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) + <~> handledeferred m + +{- While a push is running, matching push messages are put into + - netMessagesPush, while others go to netMessagesDeferredPush. To avoid + - bloating memory, only PushRequest and StartingPush messages are + - deferred. + - + - When no push is running, returns False. + -} +queueNetPushMessage :: NetMessage -> Assistant Bool +queueNetPushMessage m = do + nm <- getAssistant netMessager + liftIO $ atomically $ do + running <- readTMVar (netMessagerPushRunning nm) + case running of + NoPushRunning -> return False + SendPushRunning cid -> go nm cid + ReceivePushRunning cid -> go nm cid + where + go nm cid + | getClientID m == Just cid = do + writeTChan (netMessagesPush nm) m + return True + | otherwise = do + case m of + PushRequest _ -> defer nm + StartingPush _ -> defer nm + _ -> noop + return True + defer nm = do + s <- takeTMVar (netMessagesDeferredPush nm) + putTMVar (netMessagesDeferredPush nm) $ S.insert m s + +waitNetPushMessage :: Assistant (NetMessage) +waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) |