diff options
author | Joey Hess <joey@kitenet.net> | 2012-11-08 16:44:23 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-11-08 16:46:29 -0400 |
commit | f9bf6fbcb9ef2d4afc51b60387d58db6b5cb401a (patch) | |
tree | 68a08e40f572520c24814d7bacc4271aca32b1dd /Assistant/NetMessager.hs | |
parent | e146cc372b8daa70fa093c9f27cedf7188ce72fc (diff) |
xmpp push control flow
It might even work, although nothing yet triggers XMPP pushes.
Also added a set of deferred push messages. Only one push can run at a
time, and unrelated push messages get deferred. The set will never grow
very large, because it only puts two types of messages in there, that
can only vary in the client doing the push.
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r-- | Assistant/NetMessager.hs | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 8895cb7db..8fac55c8a 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -10,8 +10,11 @@ module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar +import Control.Exception as E +import qualified Data.Set as S sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -26,3 +29,67 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +getPushRunning :: Assistant PushRunning +getPushRunning = + (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) + +{- Runs an action that runs either the send or receive end of a push. + - + - While the push is running, netMessagesPush will get messages put into it + - relating to this push, while any messages relating to other pushes + - go to netMessagesDeferred. Once the push finishes, those deferred + - messages will be fed to handledeferred for processing. + -} +runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush v handledeferred a = do + nm <- getAssistant netMessager + let pr = netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar pr v + let cleanup = atomically $ do + void $ swapTMVar pr NoPushRunning + emptytchan (netMessagesPush nm) + r <- E.bracket_ setup cleanup <~> a + (void . forkIO) <~> processdeferred nm + return r + where + emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c + processdeferred nm = do + s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + mapM_ rundeferred (S.toList s) + rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) + <~> handledeferred m + +{- While a push is running, matching push messages are put into + - netMessagesPush, while others go to netMessagesDeferredPush. To avoid + - bloating memory, only PushRequest and StartingPush messages are + - deferred. + - + - When no push is running, returns False. + -} +queueNetPushMessage :: NetMessage -> Assistant Bool +queueNetPushMessage m = do + nm <- getAssistant netMessager + liftIO $ atomically $ do + running <- readTMVar (netMessagerPushRunning nm) + case running of + NoPushRunning -> return False + SendPushRunning cid -> go nm cid + ReceivePushRunning cid -> go nm cid + where + go nm cid + | getClientID m == Just cid = do + writeTChan (netMessagesPush nm) m + return True + | otherwise = do + case m of + PushRequest _ -> defer nm + StartingPush _ -> defer nm + _ -> noop + return True + defer nm = do + s <- takeTMVar (netMessagesDeferredPush nm) + putTMVar (netMessagesDeferredPush nm) $ S.insert m s + +waitNetPushMessage :: Assistant (NetMessage) +waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) |