diff options
author | Joey Hess <joey@kitenet.net> | 2012-11-11 15:42:03 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-11-11 15:42:03 -0400 |
commit | 0d21e323e0d095232e347859adaaf2cc2cd71592 (patch) | |
tree | 6edb70e3e315926f1a226d30e6e12755c94d3d84 /Assistant/NetMessager.hs | |
parent | 2aa6505378c3789da0cf78c784467c67fd9d593c (diff) |
allow both one push and one receive-pack to run at the same time
Noticed that when pairing, sometimes both sides start to push, and the other
side sends a PushRequest, and the two deadlock, neither doing anything.
(Timeout eventually breaks this.) So, let both run at the same time.
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r-- | Assistant/NetMessager.hs | 73 |
1 files changed, 36 insertions, 37 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index c3bd73c57..e3ef89b04 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -33,69 +33,68 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) -getPushRunning :: Assistant PushRunning -getPushRunning = - (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) - -{- Runs an action that runs either the send or receive end of a push. +{- Runs an action that runs either the send or receive side of a push. - - While the push is running, netMessagesPush will get messages put into it - relating to this push, while any messages relating to other pushes - - go to netMessagesDeferred. Once the push finishes, those deferred - - messages will be fed to handledeferred for processing. + - on the same side go to netMessagesDeferred. Once the push finishes, + - those deferred messages will be fed to handledeferred for processing. -} -runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a -runPush v handledeferred a = do +runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush side clientid handledeferred a = do nm <- getAssistant netMessager - let pr = netMessagerPushRunning nm - let setup = void $ atomically $ swapTMVar pr v + let runningv = getSide side $ netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar runningv $ Just clientid let cleanup = atomically $ do - void $ swapTMVar pr NoPushRunning - emptytchan (netMessagesPush nm) + void $ swapTMVar runningv Nothing + emptytchan (getSide side $ netMessagesPush nm) r <- E.bracket_ setup cleanup <~> a (void . forkIO) <~> processdeferred nm return r where emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c processdeferred nm = do - s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty mapM_ rundeferred (S.toList s) rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) <~> handledeferred m {- While a push is running, matching push messages are put into - - netMessagesPush, while others go to netMessagesDeferredPush. + - netMessagesPush, while others that involve the same side go to + - netMessagesDeferredPush. + - + - When no push is running involving the same side, returns False. + - - To avoid bloating memory, only messages that initiate pushes are - deferred. - - - - When no push is running, returns False. -} queueNetPushMessage :: NetMessage -> Assistant Bool -queueNetPushMessage m = do +queueNetPushMessage m@(Pushing clientid stage) = do nm <- getAssistant netMessager liftIO $ atomically $ do - running <- readTMVar (netMessagerPushRunning nm) - case running of - NoPushRunning -> return False - SendPushRunning runningcid -> do - go nm m runningcid - return True - ReceivePushRunning runningcid -> do - go nm m runningcid - return True + v <- readTMVar (getSide side $ netMessagerPushRunning nm) + case v of + Nothing -> return False + (Just runningclientid) + | runningclientid == clientid -> queue nm + | isPushInitiation stage -> defer nm + | otherwise -> discard where - go nm (Pushing cid stage) runningcid - | cid == runningcid = writeTChan (netMessagesPush nm) m - | isPushInitiation stage = defer nm - | otherwise = noop - go _ _ _ = noop - + side = pushDestinationSide stage + queue nm = do + writeTChan (getSide side $ netMessagesPush nm) m + return True defer nm = do - s <- takeTMVar (netMessagesDeferredPush nm) - putTMVar (netMessagesDeferredPush nm) $ S.insert m s + let mv = getSide side $ netMessagesPushDeferred nm + s <- takeTMVar mv + putTMVar mv $ S.insert m s + return True + discard = return True +queueNetPushMessage _ = return False -waitNetPushMessage :: Assistant (NetMessage) -waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) +waitNetPushMessage :: PushSide -> Assistant (NetMessage) +waitNetPushMessage side = (atomically . readTChan) + <<~ (getSide side . netMessagesPush . netMessager) {- Remotes using the XMPP transport have urls like xmpp::user@host -} isXMPPRemote :: Remote -> Bool |