diff options
author | 2013-05-22 15:13:31 -0400 | |
---|---|---|
committer | 2013-05-22 15:13:31 -0400 | |
commit | 186434797dc41c815a07825072a63c9de1b47a25 (patch) | |
tree | 94ada4e3a7333a5e87557d5bc6ed51bb977f8e74 /Assistant/NetMessager.hs | |
parent | dcbb9c33d5e82beb32a1068924f467d968ce9611 (diff) |
add two long-running XMPP push threads, no more inversion of control
I hope this will be easier to reason about, and less buggy. It was
certianly easier to write!
An immediate benefit is that with a traversable queue of push requests to
select from, the threads can be a lot fairer about choosing which client to
service next.
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r-- | Assistant/NetMessager.hs | 105 |
1 files changed, 37 insertions, 68 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index fe96df56f..2e786717d 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -14,7 +14,6 @@ import Assistant.Types.NetMessager import Control.Concurrent.STM import Control.Concurrent.MSampleVar -import Control.Exception as E import qualified Data.Set as S import qualified Data.Map as M import qualified Data.DList as D @@ -69,73 +68,39 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) return (fromMaybe S.empty stored, fromMaybe S.empty sent) -{- Runs an action that runs either the send or receive side of a push. - - Only one such action per side can run at a time. Other pushes, for - - the same, or other clients, need to wait their turn. - - - - Only one push is allowed to wait per client. - - There is no point in building up more. - - - - While the push is running, netMessagesPush will get messages put into it - - relating to this push, while any messages relating to other pushes - - on the same side go to netMessagesDeferred. Once the push finishes, - - those deferred messages will be fed to handledeferred for processing. - -} -runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool -runPush side clientid a = do - debugmsg "preparing to run" - nm <- getAssistant netMessager - let setup = do - (canrun, release) <- atomically $ checkcanrun nm - if canrun - then atomically $ waittorun nm release - else return (False, noop) - let cleanup (_, release) = atomically release - go <- asIO1 $ \(run, _) -> - if run - then do - debugmsg "started running" - r <- a - debugmsg "finished running" - {- Empty the inbox, because stuff may have - - been left in it if the push failed. -} - emptyInbox clientid side - return r - else do - debugmsg "skipping running" - return False - r <- liftIO $ E.bracket setup cleanup go - return r +{- Queues a push initiation message in the queue for the appropriate + - side of the push but only if there is not already an initiation message + - from the same client in the queue. -} +queuePushInitiation :: NetMessage -> Assistant () +queuePushInitiation msg@(Pushing clientid stage) = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + r <- tryTakeTMVar tv + case r of + Nothing -> putTMVar tv [msg] + Just l -> do + let !l' = msg : filter differentclient l + putTMVar tv l' where - debugmsg s = netMessagerDebug clientid [s, show side] - -- check that this is one of the two threads allowed - -- to run at the same time, pushing to the same client - -- on the same side - checkcanrun nm = do - let v = getSide side $ netMessagerPushThreadCount nm - m <- readTVar v - case M.lookup clientid m of - Just count - | count > 2 -> return (False, noop) - _ -> do - writeTVar v $ - M.insertWith' (+) clientid 1 m - let release = modifyTVar' v $ - M.insertWith' (-) clientid 1 - return (True, release) - -- block until this is the only thread performing - -- a push on this side, to any client - waittorun nm release = do - let v = getSide side $ netMessagerPushRunning nm - ifM (isNothing <$> tryReadTMVar v) - ( do - putTMVar v clientid - let release' = do - void $ takeTMVar v - release - return (True, release') - , retry - ) + side = pushDestinationSide stage + differentclient (Pushing cid _) = cid /= clientid + differentclient _ = True +queuePushInitiation _ = noop + +{- Waits for a push inititation message to be received, and runs + - function to select a message from the queue. -} +waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage +waitPushInitiation side selector = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + q <- takeTMVar tv + if null q + then retry + else do + let (msg, !q') = selector q + unless (null q') $ + putTMVar tv q' + return msg {- Stores messages for a push into the appropriate inbox. - @@ -198,7 +163,11 @@ emptyInbox clientid side = do getInboxes :: PushSide -> Assistant Inboxes getInboxes side = - getSide side . netMessagesInboxes <$> getAssistant netMessager + getSide side . netMessagerInboxes <$> getAssistant netMessager + +getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage]) +getPushInitiationQueue side = + getSide side . netMessagerPushInitiations <$> getAssistant netMessager netMessagerDebug :: ClientID -> [String] -> Assistant () netMessagerDebug clientid l = debug $ |