summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-11-11 15:42:03 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-11-11 15:42:03 -0400
commit0d21e323e0d095232e347859adaaf2cc2cd71592 (patch)
tree6edb70e3e315926f1a226d30e6e12755c94d3d84 /Assistant
parent2aa6505378c3789da0cf78c784467c67fd9d593c (diff)
allow both one push and one receive-pack to run at the same time
Noticed that when pairing, sometimes both sides start to push, and the other side sends a PushRequest, and the two deadlock, neither doing anything. (Timeout eventually breaks this.) So, let both run at the same time.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/NetMessager.hs73
-rw-r--r--Assistant/Threads/XMPPClient.hs2
-rw-r--r--Assistant/Types/NetMessager.hs49
-rw-r--r--Assistant/XMPP/Git.hs20
4 files changed, 83 insertions, 61 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index c3bd73c57..e3ef89b04 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -33,69 +33,68 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
-getPushRunning :: Assistant PushRunning
-getPushRunning =
- (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
-
-{- Runs an action that runs either the send or receive end of a push.
+{- Runs an action that runs either the send or receive side of a push.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- - go to netMessagesDeferred. Once the push finishes, those deferred
- - messages will be fed to handledeferred for processing.
+ - on the same side go to netMessagesDeferred. Once the push finishes,
+ - those deferred messages will be fed to handledeferred for processing.
-}
-runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush v handledeferred a = do
+runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
+runPush side clientid handledeferred a = do
nm <- getAssistant netMessager
- let pr = netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar pr v
+ let runningv = getSide side $ netMessagerPushRunning nm
+ let setup = void $ atomically $ swapTMVar runningv $ Just clientid
let cleanup = atomically $ do
- void $ swapTMVar pr NoPushRunning
- emptytchan (netMessagesPush nm)
+ void $ swapTMVar runningv Nothing
+ emptytchan (getSide side $ netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm
return r
where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
+ s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m
{- While a push is running, matching push messages are put into
- - netMessagesPush, while others go to netMessagesDeferredPush.
+ - netMessagesPush, while others that involve the same side go to
+ - netMessagesDeferredPush.
+ -
+ - When no push is running involving the same side, returns False.
+ -
- To avoid bloating memory, only messages that initiate pushes are
- deferred.
- -
- - When no push is running, returns False.
-}
queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m = do
+queueNetPushMessage m@(Pushing clientid stage) = do
nm <- getAssistant netMessager
liftIO $ atomically $ do
- running <- readTMVar (netMessagerPushRunning nm)
- case running of
- NoPushRunning -> return False
- SendPushRunning runningcid -> do
- go nm m runningcid
- return True
- ReceivePushRunning runningcid -> do
- go nm m runningcid
- return True
+ v <- readTMVar (getSide side $ netMessagerPushRunning nm)
+ case v of
+ Nothing -> return False
+ (Just runningclientid)
+ | runningclientid == clientid -> queue nm
+ | isPushInitiation stage -> defer nm
+ | otherwise -> discard
where
- go nm (Pushing cid stage) runningcid
- | cid == runningcid = writeTChan (netMessagesPush nm) m
- | isPushInitiation stage = defer nm
- | otherwise = noop
- go _ _ _ = noop
-
+ side = pushDestinationSide stage
+ queue nm = do
+ writeTChan (getSide side $ netMessagesPush nm) m
+ return True
defer nm = do
- s <- takeTMVar (netMessagesDeferredPush nm)
- putTMVar (netMessagesDeferredPush nm) $ S.insert m s
+ let mv = getSide side $ netMessagesPushDeferred nm
+ s <- takeTMVar mv
+ putTMVar mv $ S.insert m s
+ return True
+ discard = return True
+queueNetPushMessage _ = return False
-waitNetPushMessage :: Assistant (NetMessage)
-waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)
+waitNetPushMessage :: PushSide -> Assistant (NetMessage)
+waitNetPushMessage side = (atomically . readTChan)
+ <<~ (getSide side . netMessagesPush . netMessager)
{- Remotes using the XMPP transport have urls like xmpp::user@host -}
isXMPPRemote :: Remote -> Bool
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 12ec9d550..8df9ff04e 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -99,7 +99,7 @@ xmppClient urlrenderer d = do
handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushInitiation pushstage = inAssistant $
unlessM (queueNetPushMessage m) $
- void $ forkIO <~> handlePushMessage m
+ void $ forkIO <~> handlePushInitiation m
| otherwise = void $ inAssistant $ queueNetPushMessage m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index 091d12815..c036d624a 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -14,7 +14,7 @@ import Data.Text (Text)
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
-import Data.Set as S
+import qualified Data.Set as S
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -47,32 +47,55 @@ data PushStage
| ReceivePackDone ExitCode
deriving (Show, Eq, Ord)
-data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
- deriving (Eq)
-
+{- Things that initiate either side of a push, but do not actually send data. -}
isPushInitiation :: PushStage -> Bool
isPushInitiation CanPush = True
isPushInitiation PushRequest = True
isPushInitiation StartingPush = True
isPushInitiation _ = False
+data PushSide = SendPack | ReceivePack
+ deriving (Eq, Ord)
+
+pushDestinationSide :: PushStage -> PushSide
+pushDestinationSide CanPush = ReceivePack
+pushDestinationSide PushRequest = SendPack
+pushDestinationSide StartingPush = ReceivePack
+pushDestinationSide (ReceivePackOutput _) = SendPack
+pushDestinationSide (SendPackOutput _) = ReceivePack
+pushDestinationSide (ReceivePackDone _) = SendPack
+
+type SideMap a = PushSide -> a
+
+mkSideMap :: STM a -> IO (SideMap a)
+mkSideMap gen = do
+ (sp, rp) <- atomically $ (,) <$> gen <*> gen
+ return $ lookupside sp rp
+ where
+ lookupside sp _ SendPack = sp
+ lookupside _ rp ReceivePack = rp
+
+getSide :: PushSide -> SideMap a -> a
+getSide side m = m side
+
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan (NetMessage)
- -- only one push can be running at a time, and this tracks it
- , netMessagerPushRunning :: TMVar (PushRunning)
- -- incoming messages relating to the currently running push
- , netMessagesPush :: TChan (NetMessage)
- -- incoming push messages that have been deferred to be processed later
- , netMessagesDeferredPush :: TMVar (S.Set NetMessage)
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
+ -- only one side of a push can be running at a time
+ , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
+ -- incoming messages related to a running push
+ , netMessagesPush :: SideMap (TChan NetMessage)
+ -- incoming push messages, deferred to be processed later
+ , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
}
newNetMessager :: IO NetMessager
newNetMessager = NetMessager
<$> atomically newTChan
- <*> atomically (newTMVar NoPushRunning)
- <*> atomically newTChan
- <*> atomically (newTMVar S.empty)
<*> newEmptySV
+ <*> mkSideMap (newTMVar Nothing)
+ <*> mkSideMap newTChan
+ <*> mkSideMap (newTMVar S.empty)
+ where
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index f03b32439..2d72df531 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -74,7 +74,7 @@ makeXMPPGitRemote buddyname jid u = do
- We listen at the other end of the pipe and relay to and from XMPP.
-}
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
-xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do
+xmppPush cid gitpush = runPush SendPack cid handleDeferred $ do
sendNetMessage $ Pushing cid StartingPush
(Fd inf, writepush) <- liftIO createPipe
@@ -119,7 +119,7 @@ xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do
then liftIO $ killThread =<< myThreadId
else sendNetMessage $ Pushing cid $ SendPackOutput b
fromxmpp outh controlh = forever $ do
- m <- runTimeout xmppTimeout <~> waitNetPushMessage
+ m <- runTimeout xmppTimeout <~> waitNetPushMessage SendPack
case m of
(Right (Pushing _ (ReceivePackOutput b))) ->
liftIO $ writeChunk outh b
@@ -195,7 +195,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool
-xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
+xmppReceivePack cid = runPush ReceivePack cid handleDeferred $ do
repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe
@@ -220,7 +220,7 @@ xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
sendNetMessage $ Pushing cid $ ReceivePackOutput b
relaytoxmpp outh
relayfromxmpp inh = forever $ do
- m <- runTimeout xmppTimeout <~> waitNetPushMessage
+ m <- runTimeout xmppTimeout <~> waitNetPushMessage ReceivePack
case m of
(Right (Pushing _ (SendPackOutput b))) ->
liftIO $ writeChunk inh b
@@ -246,12 +246,12 @@ xmppRemotes cid = case baseJID <$> parseJID cid of
whenXMPPRemote :: ClientID -> Assistant () -> Assistant ()
whenXMPPRemote cid = unlessM (null <$> xmppRemotes cid)
-handlePushMessage :: NetMessage -> Assistant ()
-handlePushMessage (Pushing cid CanPush) =
+handlePushInitiation :: NetMessage -> Assistant ()
+handlePushInitiation (Pushing cid CanPush) =
whenXMPPRemote cid $
sendNetMessage $ Pushing cid PushRequest
-handlePushMessage (Pushing cid PushRequest) =
+handlePushInitiation (Pushing cid PushRequest) =
go =<< liftAnnex (inRepo Git.Branch.current)
where
go Nothing = noop
@@ -265,13 +265,13 @@ handlePushMessage (Pushing cid PushRequest) =
debug ["pushing to", show rs]
forM_ rs $ \r -> xmppPush cid $ pushFallback u branch r
-handlePushMessage (Pushing cid StartingPush) =
+handlePushInitiation (Pushing cid StartingPush) =
whenXMPPRemote cid $
void $ xmppReceivePack cid
-handlePushMessage _ = noop
+handlePushInitiation _ = noop
handleDeferred :: NetMessage -> Assistant ()
-handleDeferred = handlePushMessage
+handleDeferred = handlePushInitiation
writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do