summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/NetMessager.hs73
-rw-r--r--Assistant/Types/NetMessager.hs4
2 files changed, 59 insertions, 18 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 7ee4d464b..fe96df56f 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -70,35 +70,72 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
{- Runs an action that runs either the send or receive side of a push.
+ - Only one such action per side can run at a time. Other pushes, for
+ - the same, or other clients, need to wait their turn.
+ -
+ - Only one push is allowed to wait per client.
+ - There is no point in building up more.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
- -
- - If this is called when a push of the same side is running, it will
- - block until that push completes, and then run.
-}
-runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
+runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool
runPush side clientid a = do
+ debugmsg "preparing to run"
nm <- getAssistant netMessager
- let v = getSide side $ netMessagerPushRunning nm
- debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
let setup = do
- debugmsg "preparing to run"
- atomically $ ifM (isNothing <$> tryReadTMVar v)
- ( putTMVar v clientid
+ (canrun, release) <- atomically $ checkcanrun nm
+ if canrun
+ then atomically $ waittorun nm release
+ else return (False, noop)
+ let cleanup (_, release) = atomically release
+ go <- asIO1 $ \(run, _) ->
+ if run
+ then do
+ debugmsg "started running"
+ r <- a
+ debugmsg "finished running"
+ {- Empty the inbox, because stuff may have
+ - been left in it if the push failed. -}
+ emptyInbox clientid side
+ return r
+ else do
+ debugmsg "skipping running"
+ return False
+ r <- liftIO $ E.bracket setup cleanup go
+ return r
+ where
+ debugmsg s = netMessagerDebug clientid [s, show side]
+ -- check that this is one of the two threads allowed
+ -- to run at the same time, pushing to the same client
+ -- on the same side
+ checkcanrun nm = do
+ let v = getSide side $ netMessagerPushThreadCount nm
+ m <- readTVar v
+ case M.lookup clientid m of
+ Just count
+ | count > 2 -> return (False, noop)
+ _ -> do
+ writeTVar v $
+ M.insertWith' (+) clientid 1 m
+ let release = modifyTVar' v $
+ M.insertWith' (-) clientid 1
+ return (True, release)
+ -- block until this is the only thread performing
+ -- a push on this side, to any client
+ waittorun nm release = do
+ let v = getSide side $ netMessagerPushRunning nm
+ ifM (isNothing <$> tryReadTMVar v)
+ ( do
+ putTMVar v clientid
+ let release' = do
+ void $ takeTMVar v
+ release
+ return (True, release')
, retry
)
- debugmsg "started running"
- let cleanup = do
- debugmsg "finished running"
- atomically $ takeTMVar v
- r <- E.bracket_ setup cleanup <~> a
- {- Empty the inbox, because stuff may have been left in it
- - if the push failed. -}
- emptyInbox clientid side
- return r
{- Stores messages for a push into the appropriate inbox.
-
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index cfcbe2aa3..525ff29f2 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -133,6 +133,9 @@ data NetMessager = NetMessager
-- only one side of a push can be running at a time
-- the TMVars are empty when nothing is running
, netMessagerPushRunning :: SideMap (TMVar ClientID)
+ -- number of threads trying to push to the same client
+ -- at the same time (either running, or waiting to run)
+ , netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int))
-- incoming messages containing data for a push,
-- on a per-client and per-side basis
, netMessagesInboxes :: SideMap Inboxes
@@ -146,3 +149,4 @@ newNetMessager = NetMessager
<*> newEmptySV
<*> mkSideMap newEmptyTMVar
<*> mkSideMap (newTVar M.empty)
+ <*> mkSideMap (newTVar M.empty)