From 0d21e323e0d095232e347859adaaf2cc2cd71592 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 11 Nov 2012 15:42:03 -0400 Subject: allow both one push and one receive-pack to run at the same time Noticed that when pairing, sometimes both sides start to push, and the other side sends a PushRequest, and the two deadlock, neither doing anything. (Timeout eventually breaks this.) So, let both run at the same time. --- Assistant/NetMessager.hs | 73 ++++++++++++++++++++--------------------- Assistant/Threads/XMPPClient.hs | 2 +- Assistant/Types/NetMessager.hs | 49 +++++++++++++++++++-------- Assistant/XMPP/Git.hs | 20 +++++------ 4 files changed, 83 insertions(+), 61 deletions(-) (limited to 'Assistant') diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index c3bd73c57..e3ef89b04 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -33,69 +33,68 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) -getPushRunning :: Assistant PushRunning -getPushRunning = - (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) - -{- Runs an action that runs either the send or receive end of a push. +{- Runs an action that runs either the send or receive side of a push. - - While the push is running, netMessagesPush will get messages put into it - relating to this push, while any messages relating to other pushes - - go to netMessagesDeferred. Once the push finishes, those deferred - - messages will be fed to handledeferred for processing. + - on the same side go to netMessagesDeferred. Once the push finishes, + - those deferred messages will be fed to handledeferred for processing. -} -runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a -runPush v handledeferred a = do +runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush side clientid handledeferred a = do nm <- getAssistant netMessager - let pr = netMessagerPushRunning nm - let setup = void $ atomically $ swapTMVar pr v + let runningv = getSide side $ netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar runningv $ Just clientid let cleanup = atomically $ do - void $ swapTMVar pr NoPushRunning - emptytchan (netMessagesPush nm) + void $ swapTMVar runningv Nothing + emptytchan (getSide side $ netMessagesPush nm) r <- E.bracket_ setup cleanup <~> a (void . forkIO) <~> processdeferred nm return r where emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c processdeferred nm = do - s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty mapM_ rundeferred (S.toList s) rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) <~> handledeferred m {- While a push is running, matching push messages are put into - - netMessagesPush, while others go to netMessagesDeferredPush. + - netMessagesPush, while others that involve the same side go to + - netMessagesDeferredPush. + - + - When no push is running involving the same side, returns False. + - - To avoid bloating memory, only messages that initiate pushes are - deferred. - - - - When no push is running, returns False. -} queueNetPushMessage :: NetMessage -> Assistant Bool -queueNetPushMessage m = do +queueNetPushMessage m@(Pushing clientid stage) = do nm <- getAssistant netMessager liftIO $ atomically $ do - running <- readTMVar (netMessagerPushRunning nm) - case running of - NoPushRunning -> return False - SendPushRunning runningcid -> do - go nm m runningcid - return True - ReceivePushRunning runningcid -> do - go nm m runningcid - return True + v <- readTMVar (getSide side $ netMessagerPushRunning nm) + case v of + Nothing -> return False + (Just runningclientid) + | runningclientid == clientid -> queue nm + | isPushInitiation stage -> defer nm + | otherwise -> discard where - go nm (Pushing cid stage) runningcid - | cid == runningcid = writeTChan (netMessagesPush nm) m - | isPushInitiation stage = defer nm - | otherwise = noop - go _ _ _ = noop - + side = pushDestinationSide stage + queue nm = do + writeTChan (getSide side $ netMessagesPush nm) m + return True defer nm = do - s <- takeTMVar (netMessagesDeferredPush nm) - putTMVar (netMessagesDeferredPush nm) $ S.insert m s + let mv = getSide side $ netMessagesPushDeferred nm + s <- takeTMVar mv + putTMVar mv $ S.insert m s + return True + discard = return True +queueNetPushMessage _ = return False -waitNetPushMessage :: Assistant (NetMessage) -waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) +waitNetPushMessage :: PushSide -> Assistant (NetMessage) +waitNetPushMessage side = (atomically . readTChan) + <<~ (getSide side . netMessagesPush . netMessager) {- Remotes using the XMPP transport have urls like xmpp::user@host -} isXMPPRemote :: Remote -> Bool diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 12ec9d550..8df9ff04e 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -99,7 +99,7 @@ xmppClient urlrenderer d = do handle _ (GotNetMessage m@(Pushing _ pushstage)) | isPushInitiation pushstage = inAssistant $ unlessM (queueNetPushMessage m) $ - void $ forkIO <~> handlePushMessage m + void $ forkIO <~> handlePushInitiation m | otherwise = void $ inAssistant $ queueNetPushMessage m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 091d12815..c036d624a 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -14,7 +14,7 @@ import Data.Text (Text) import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) -import Data.Set as S +import qualified Data.Set as S {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -47,32 +47,55 @@ data PushStage | ReceivePackDone ExitCode deriving (Show, Eq, Ord) -data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID - deriving (Eq) - +{- Things that initiate either side of a push, but do not actually send data. -} isPushInitiation :: PushStage -> Bool isPushInitiation CanPush = True isPushInitiation PushRequest = True isPushInitiation StartingPush = True isPushInitiation _ = False +data PushSide = SendPack | ReceivePack + deriving (Eq, Ord) + +pushDestinationSide :: PushStage -> PushSide +pushDestinationSide CanPush = ReceivePack +pushDestinationSide PushRequest = SendPack +pushDestinationSide StartingPush = ReceivePack +pushDestinationSide (ReceivePackOutput _) = SendPack +pushDestinationSide (SendPackOutput _) = ReceivePack +pushDestinationSide (ReceivePackDone _) = SendPack + +type SideMap a = PushSide -> a + +mkSideMap :: STM a -> IO (SideMap a) +mkSideMap gen = do + (sp, rp) <- atomically $ (,) <$> gen <*> gen + return $ lookupside sp rp + where + lookupside sp _ SendPack = sp + lookupside _ rp ReceivePack = rp + +getSide :: PushSide -> SideMap a -> a +getSide side m = m side + data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan (NetMessage) - -- only one push can be running at a time, and this tracks it - , netMessagerPushRunning :: TMVar (PushRunning) - -- incoming messages relating to the currently running push - , netMessagesPush :: TChan (NetMessage) - -- incoming push messages that have been deferred to be processed later - , netMessagesDeferredPush :: TMVar (S.Set NetMessage) -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () + -- only one side of a push can be running at a time + , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID)) + -- incoming messages related to a running push + , netMessagesPush :: SideMap (TChan NetMessage) + -- incoming push messages, deferred to be processed later + , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage)) } newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan - <*> atomically (newTMVar NoPushRunning) - <*> atomically newTChan - <*> atomically (newTMVar S.empty) <*> newEmptySV + <*> mkSideMap (newTMVar Nothing) + <*> mkSideMap newTChan + <*> mkSideMap (newTMVar S.empty) + where diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index f03b32439..2d72df531 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -74,7 +74,7 @@ makeXMPPGitRemote buddyname jid u = do - We listen at the other end of the pipe and relay to and from XMPP. -} xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool -xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do +xmppPush cid gitpush = runPush SendPack cid handleDeferred $ do sendNetMessage $ Pushing cid StartingPush (Fd inf, writepush) <- liftIO createPipe @@ -119,7 +119,7 @@ xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do then liftIO $ killThread =<< myThreadId else sendNetMessage $ Pushing cid $ SendPackOutput b fromxmpp outh controlh = forever $ do - m <- runTimeout xmppTimeout <~> waitNetPushMessage + m <- runTimeout xmppTimeout <~> waitNetPushMessage SendPack case m of (Right (Pushing _ (ReceivePackOutput b))) -> liftIO $ writeChunk outh b @@ -195,7 +195,7 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} xmppReceivePack :: ClientID -> Assistant Bool -xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do +xmppReceivePack cid = runPush ReceivePack cid handleDeferred $ do repodir <- liftAnnex $ fromRepo repoPath let p = (proc "git" ["receive-pack", repodir]) { std_in = CreatePipe @@ -220,7 +220,7 @@ xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do sendNetMessage $ Pushing cid $ ReceivePackOutput b relaytoxmpp outh relayfromxmpp inh = forever $ do - m <- runTimeout xmppTimeout <~> waitNetPushMessage + m <- runTimeout xmppTimeout <~> waitNetPushMessage ReceivePack case m of (Right (Pushing _ (SendPackOutput b))) -> liftIO $ writeChunk inh b @@ -246,12 +246,12 @@ xmppRemotes cid = case baseJID <$> parseJID cid of whenXMPPRemote :: ClientID -> Assistant () -> Assistant () whenXMPPRemote cid = unlessM (null <$> xmppRemotes cid) -handlePushMessage :: NetMessage -> Assistant () -handlePushMessage (Pushing cid CanPush) = +handlePushInitiation :: NetMessage -> Assistant () +handlePushInitiation (Pushing cid CanPush) = whenXMPPRemote cid $ sendNetMessage $ Pushing cid PushRequest -handlePushMessage (Pushing cid PushRequest) = +handlePushInitiation (Pushing cid PushRequest) = go =<< liftAnnex (inRepo Git.Branch.current) where go Nothing = noop @@ -265,13 +265,13 @@ handlePushMessage (Pushing cid PushRequest) = debug ["pushing to", show rs] forM_ rs $ \r -> xmppPush cid $ pushFallback u branch r -handlePushMessage (Pushing cid StartingPush) = +handlePushInitiation (Pushing cid StartingPush) = whenXMPPRemote cid $ void $ xmppReceivePack cid -handlePushMessage _ = noop +handlePushInitiation _ = noop handleDeferred :: NetMessage -> Assistant () -handleDeferred = handlePushMessage +handleDeferred = handlePushInitiation writeChunk :: Handle -> B.ByteString -> IO () writeChunk h b = do -- cgit v1.2.3