summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-05-22 15:13:31 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-05-22 15:13:31 -0400
commit186434797dc41c815a07825072a63c9de1b47a25 (patch)
tree94ada4e3a7333a5e87557d5bc6ed51bb977f8e74 /Assistant/NetMessager.hs
parentdcbb9c33d5e82beb32a1068924f467d968ce9611 (diff)
add two long-running XMPP push threads, no more inversion of control
I hope this will be easier to reason about, and less buggy. It was certianly easier to write! An immediate benefit is that with a traversable queue of push requests to select from, the threads can be a lot fairer about choosing which client to service next.
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r--Assistant/NetMessager.hs105
1 files changed, 37 insertions, 68 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index fe96df56f..2e786717d 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -14,7 +14,6 @@ import Assistant.Types.NetMessager
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
-import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D
@@ -69,73 +68,39 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
-{- Runs an action that runs either the send or receive side of a push.
- - Only one such action per side can run at a time. Other pushes, for
- - the same, or other clients, need to wait their turn.
- -
- - Only one push is allowed to wait per client.
- - There is no point in building up more.
- -
- - While the push is running, netMessagesPush will get messages put into it
- - relating to this push, while any messages relating to other pushes
- - on the same side go to netMessagesDeferred. Once the push finishes,
- - those deferred messages will be fed to handledeferred for processing.
- -}
-runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool
-runPush side clientid a = do
- debugmsg "preparing to run"
- nm <- getAssistant netMessager
- let setup = do
- (canrun, release) <- atomically $ checkcanrun nm
- if canrun
- then atomically $ waittorun nm release
- else return (False, noop)
- let cleanup (_, release) = atomically release
- go <- asIO1 $ \(run, _) ->
- if run
- then do
- debugmsg "started running"
- r <- a
- debugmsg "finished running"
- {- Empty the inbox, because stuff may have
- - been left in it if the push failed. -}
- emptyInbox clientid side
- return r
- else do
- debugmsg "skipping running"
- return False
- r <- liftIO $ E.bracket setup cleanup go
- return r
+{- Queues a push initiation message in the queue for the appropriate
+ - side of the push but only if there is not already an initiation message
+ - from the same client in the queue. -}
+queuePushInitiation :: NetMessage -> Assistant ()
+queuePushInitiation msg@(Pushing clientid stage) = do
+ tv <- getPushInitiationQueue side
+ liftIO $ atomically $ do
+ r <- tryTakeTMVar tv
+ case r of
+ Nothing -> putTMVar tv [msg]
+ Just l -> do
+ let !l' = msg : filter differentclient l
+ putTMVar tv l'
where
- debugmsg s = netMessagerDebug clientid [s, show side]
- -- check that this is one of the two threads allowed
- -- to run at the same time, pushing to the same client
- -- on the same side
- checkcanrun nm = do
- let v = getSide side $ netMessagerPushThreadCount nm
- m <- readTVar v
- case M.lookup clientid m of
- Just count
- | count > 2 -> return (False, noop)
- _ -> do
- writeTVar v $
- M.insertWith' (+) clientid 1 m
- let release = modifyTVar' v $
- M.insertWith' (-) clientid 1
- return (True, release)
- -- block until this is the only thread performing
- -- a push on this side, to any client
- waittorun nm release = do
- let v = getSide side $ netMessagerPushRunning nm
- ifM (isNothing <$> tryReadTMVar v)
- ( do
- putTMVar v clientid
- let release' = do
- void $ takeTMVar v
- release
- return (True, release')
- , retry
- )
+ side = pushDestinationSide stage
+ differentclient (Pushing cid _) = cid /= clientid
+ differentclient _ = True
+queuePushInitiation _ = noop
+
+{- Waits for a push inititation message to be received, and runs
+ - function to select a message from the queue. -}
+waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage
+waitPushInitiation side selector = do
+ tv <- getPushInitiationQueue side
+ liftIO $ atomically $ do
+ q <- takeTMVar tv
+ if null q
+ then retry
+ else do
+ let (msg, !q') = selector q
+ unless (null q') $
+ putTMVar tv q'
+ return msg
{- Stores messages for a push into the appropriate inbox.
-
@@ -198,7 +163,11 @@ emptyInbox clientid side = do
getInboxes :: PushSide -> Assistant Inboxes
getInboxes side =
- getSide side . netMessagesInboxes <$> getAssistant netMessager
+ getSide side . netMessagerInboxes <$> getAssistant netMessager
+
+getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage])
+getPushInitiationQueue side =
+ getSide side . netMessagerPushInitiations <$> getAssistant netMessager
netMessagerDebug :: ClientID -> [String] -> Assistant ()
netMessagerDebug clientid l = debug $