diff options
-rw-r--r-- | Assistant/NetMessager.hs | 73 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 4 |
2 files changed, 59 insertions, 18 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 7ee4d464b..fe96df56f 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -70,35 +70,72 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager return (fromMaybe S.empty stored, fromMaybe S.empty sent) {- Runs an action that runs either the send or receive side of a push. + - Only one such action per side can run at a time. Other pushes, for + - the same, or other clients, need to wait their turn. + - + - Only one push is allowed to wait per client. + - There is no point in building up more. - - While the push is running, netMessagesPush will get messages put into it - relating to this push, while any messages relating to other pushes - on the same side go to netMessagesDeferred. Once the push finishes, - those deferred messages will be fed to handledeferred for processing. - - - - If this is called when a push of the same side is running, it will - - block until that push completes, and then run. -} -runPush :: PushSide -> ClientID -> Assistant a -> Assistant a +runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool runPush side clientid a = do + debugmsg "preparing to run" nm <- getAssistant netMessager - let v = getSide side $ netMessagerPushRunning nm - debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side] let setup = do - debugmsg "preparing to run" - atomically $ ifM (isNothing <$> tryReadTMVar v) - ( putTMVar v clientid + (canrun, release) <- atomically $ checkcanrun nm + if canrun + then atomically $ waittorun nm release + else return (False, noop) + let cleanup (_, release) = atomically release + go <- asIO1 $ \(run, _) -> + if run + then do + debugmsg "started running" + r <- a + debugmsg "finished running" + {- Empty the inbox, because stuff may have + - been left in it if the push failed. -} + emptyInbox clientid side + return r + else do + debugmsg "skipping running" + return False + r <- liftIO $ E.bracket setup cleanup go + return r + where + debugmsg s = netMessagerDebug clientid [s, show side] + -- check that this is one of the two threads allowed + -- to run at the same time, pushing to the same client + -- on the same side + checkcanrun nm = do + let v = getSide side $ netMessagerPushThreadCount nm + m <- readTVar v + case M.lookup clientid m of + Just count + | count > 2 -> return (False, noop) + _ -> do + writeTVar v $ + M.insertWith' (+) clientid 1 m + let release = modifyTVar' v $ + M.insertWith' (-) clientid 1 + return (True, release) + -- block until this is the only thread performing + -- a push on this side, to any client + waittorun nm release = do + let v = getSide side $ netMessagerPushRunning nm + ifM (isNothing <$> tryReadTMVar v) + ( do + putTMVar v clientid + let release' = do + void $ takeTMVar v + release + return (True, release') , retry ) - debugmsg "started running" - let cleanup = do - debugmsg "finished running" - atomically $ takeTMVar v - r <- E.bracket_ setup cleanup <~> a - {- Empty the inbox, because stuff may have been left in it - - if the push failed. -} - emptyInbox clientid side - return r {- Stores messages for a push into the appropriate inbox. - diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index cfcbe2aa3..525ff29f2 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -133,6 +133,9 @@ data NetMessager = NetMessager -- only one side of a push can be running at a time -- the TMVars are empty when nothing is running , netMessagerPushRunning :: SideMap (TMVar ClientID) + -- number of threads trying to push to the same client + -- at the same time (either running, or waiting to run) + , netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int)) -- incoming messages containing data for a push, -- on a per-client and per-side basis , netMessagesInboxes :: SideMap Inboxes @@ -146,3 +149,4 @@ newNetMessager = NetMessager <*> newEmptySV <*> mkSideMap newEmptyTMVar <*> mkSideMap (newTVar M.empty) + <*> mkSideMap (newTVar M.empty) |