summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-11-08 16:44:23 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-11-08 16:46:29 -0400
commitf9bf6fbcb9ef2d4afc51b60387d58db6b5cb401a (patch)
tree68a08e40f572520c24814d7bacc4271aca32b1dd /Assistant
parente146cc372b8daa70fa093c9f27cedf7188ce72fc (diff)
xmpp push control flow
It might even work, although nothing yet triggers XMPP pushes. Also added a set of deferred push messages. Only one push can run at a time, and unrelated push messages get deferred. The set will never grow very large, because it only puts two types of messages in there, that can only vary in the client doing the push.
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/NetMessager.hs67
-rw-r--r--Assistant/Threads/XMPPClient.hs13
-rw-r--r--Assistant/Types/NetMessager.hs29
-rw-r--r--Assistant/XMPP/Git.hs84
4 files changed, 157 insertions, 36 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 8895cb7db..8fac55c8a 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -10,8 +10,11 @@ module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
+import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
+import Control.Exception as E
+import qualified Data.Set as S
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -26,3 +29,67 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
+
+getPushRunning :: Assistant PushRunning
+getPushRunning =
+ (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
+
+{- Runs an action that runs either the send or receive end of a push.
+ -
+ - While the push is running, netMessagesPush will get messages put into it
+ - relating to this push, while any messages relating to other pushes
+ - go to netMessagesDeferred. Once the push finishes, those deferred
+ - messages will be fed to handledeferred for processing.
+ -}
+runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
+runPush v handledeferred a = do
+ nm <- getAssistant netMessager
+ let pr = netMessagerPushRunning nm
+ let setup = void $ atomically $ swapTMVar pr v
+ let cleanup = atomically $ do
+ void $ swapTMVar pr NoPushRunning
+ emptytchan (netMessagesPush nm)
+ r <- E.bracket_ setup cleanup <~> a
+ (void . forkIO) <~> processdeferred nm
+ return r
+ where
+ emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
+ processdeferred nm = do
+ s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
+ mapM_ rundeferred (S.toList s)
+ rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
+ <~> handledeferred m
+
+{- While a push is running, matching push messages are put into
+ - netMessagesPush, while others go to netMessagesDeferredPush. To avoid
+ - bloating memory, only PushRequest and StartingPush messages are
+ - deferred.
+ -
+ - When no push is running, returns False.
+ -}
+queueNetPushMessage :: NetMessage -> Assistant Bool
+queueNetPushMessage m = do
+ nm <- getAssistant netMessager
+ liftIO $ atomically $ do
+ running <- readTMVar (netMessagerPushRunning nm)
+ case running of
+ NoPushRunning -> return False
+ SendPushRunning cid -> go nm cid
+ ReceivePushRunning cid -> go nm cid
+ where
+ go nm cid
+ | getClientID m == Just cid = do
+ writeTChan (netMessagesPush nm) m
+ return True
+ | otherwise = do
+ case m of
+ PushRequest _ -> defer nm
+ StartingPush _ -> defer nm
+ _ -> noop
+ return True
+ defer nm = do
+ s <- takeTMVar (netMessagesDeferredPush nm)
+ putTMVar (netMessagesDeferredPush nm) $ S.insert m s
+
+waitNetPushMessage :: Assistant (NetMessage)
+waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 1117c3c14..efdecb587 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -93,15 +93,14 @@ xmppClient urlrenderer d = do
handle _ (PresenceMessage p) = void $ inAssistant $
updateBuddyList (updateBuddies p) <<~ buddyList
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
- handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $
- pull us
+ handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
- handle selfjid (GotNetMessage (PushRequest c)) = error "TODO"
- handle selfjid (GotNetMessage (StartingPush c)) = error "TODO"
- handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO"
- handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO"
- handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO"
+ handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $
+ unlessM (queueNetPushMessage m) $ void $ handlePush m
+ handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $
+ unlessM (queueNetPushMessage m) $ void $ handlePush m
+ handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index 77f2759b3..6974cf57d 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -14,6 +14,7 @@ import Data.Text (Text)
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
+import Data.Set as S
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -34,17 +35,41 @@ data NetMessage
| SendPackOutput ClientID ByteString
-- sent when git receive-pack exits, with its exit code
| ReceivePackDone ClientID ExitCode
- deriving (Show)
+ deriving (Show, Eq, Ord)
-{- Something used to identify a specific client to send the message to. -}
+{- Something used to identify the client, or clients to send the message to. -}
type ClientID = Text
+getClientID :: NetMessage -> Maybe ClientID
+getClientID (NotifyPush _) = Nothing
+getClientID QueryPresence = Nothing
+getClientID (PairingNotification _ cid _) = Just cid
+getClientID (PushRequest cid) = Just cid
+getClientID (StartingPush cid) = Just cid
+getClientID (ReceivePackOutput cid _) = Just cid
+getClientID (SendPackOutput cid _) = Just cid
+getClientID (ReceivePackDone cid _) = Just cid
+
data NetMessager = NetMessager
+ -- outgoing messages
{ netMessages :: TChan (NetMessage)
+ -- only one push can be running at a time, and this tracks it
+ , netMessagerPushRunning :: TMVar (PushRunning)
+ -- incoming messages relating to the currently running push
+ , netMessagesPush :: TChan (NetMessage)
+ -- incoming push messages that have been deferred to be processed later
+ , netMessagesDeferredPush :: TMVar (S.Set NetMessage)
+ -- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
}
+data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
+ deriving (Eq)
+
newNetMessager :: IO NetMessager
newNetMessager = NetMessager
<$> atomically newTChan
+ <*> atomically (newTMVar NoPushRunning)
+ <*> atomically newTChan
+ <*> atomically (newTMVar S.empty)
<*> newEmptySV
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index 7c4509c51..344f94327 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -20,6 +20,8 @@ import Annex.UUID
import Config
import Git
import Git.Command
+import qualified Git.Branch
+import qualified Annex.Branch
import Locations.UserConfig
import qualified Types.Remote as Remote
@@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err)
import Control.Concurrent
import qualified Data.ByteString as B
-configKey :: Remote -> ConfigKey
-configKey r = remoteConfig (Remote.repo r) "xmppaddress"
+configKey :: UnqualifiedConfigKey
+configKey = "xmppaddress"
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
@@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do
liftAnnex $ do
let r = Remote.repo remote
storeUUID (remoteConfig r "uuid") u
- setConfig (configKey remote) xmppaddress
+ setConfig (remoteConfig r configKey) xmppaddress
syncNewRemote remote
return True
where
xmppaddress = T.unpack $ formatJID $ baseJID jid
-{- Pushes the named refs to the remote, over XMPP.
+{- Pushes the named refs to the remote, over XMPP, communicating with a
+ - specific client that either requested the push, or responded to our
+ - StartingPush message.
-
- Strategy: Set GIT_SSH to run git-annex. By setting the remote url
- to "xmppgit:dummy", "git-annex xmppgit" will be run locally by
@@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
-xmppPush :: Remote -> [Ref] -> Assistant Bool
-xmppPush remote refs = error "TODO"
-
-xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool
-xmppPush' cid remote refs = do
+xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool
+xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do
+ sendNetMessage $ StartingPush cid
program <- liftIO readProgramFile
(Fd inf, writepush) <- liftIO createPipe
@@ -107,30 +109,26 @@ xmppPush' cid remote refs = do
liftIO $ hSetBuffering outh NoBuffering
t1 <- forkIO <~> toxmpp inh
- t2 <- forkIO <~> fromxmpp outh
- t3 <- forkIO <~> controlxmpp controlh
+ t2 <- forkIO <~> fromxmpp outh controlh
ok <- liftIO $ boolSystemEnv "git"
(mainparams ++ gitCommandLine params g)
(Just $ env ++ myenv)
- liftIO $ mapM_ killThread [t1, t2, t3]
+ liftIO $ mapM_ killThread [t1, t2]
return ok
where
toxmpp inh = forever $ do
b <- liftIO $ B.hGetSome inh 1024
- when (B.null b) $
- liftIO $ killThread =<< myThreadId
- sendNetMessage $ SendPackOutput cid b
- error "TODO"
- fromxmpp outh = forever $ do
- -- TODO get b from xmpp
- let b = undefined
- liftIO $ B.hPut outh b
- controlxmpp controlh = do
- -- TODO wait for control message from xmpp
- let exitcode = undefined :: Int
- liftIO $ hPutStrLn controlh (show exitcode)
-
+ if B.null b
+ then liftIO $ killThread =<< myThreadId
+ else sendNetMessage $ SendPackOutput cid b
+ fromxmpp outh controlh = forever $ do
+ m <- waitNetPushMessage
+ case m of
+ (ReceivePackOutput _ b) -> liftIO $ B.hPut outh b
+ (ReceivePackDone _ exitcode) -> do
+ liftIO $ hPutStrLn controlh (show exitcode)
+ _ -> noop
relayIn :: String
relayIn = "GIT_ANNEX_XMPPGIT_IN"
@@ -176,7 +174,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool
-xmppReceivePack cid = do
+xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
feeder <- asIO1 toxmpp
reader <- asIO1 fromxmpp
sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid
@@ -202,4 +200,36 @@ xmppReceivePack cid = do
else do
sendNetMessage $ ReceivePackOutput cid b
toxmpp outh
- fromxmpp _inh = error "TODO feed xmpp to inh"
+ fromxmpp inh = forever $ do
+ m <- waitNetPushMessage
+ case m of
+ (SendPackOutput _ b) -> liftIO $ B.hPut inh b
+ _ -> noop
+
+xmppRemotes :: ClientID -> Assistant [Remote]
+xmppRemotes cid = case baseJID <$> parseJID cid of
+ Nothing -> return []
+ Just jid -> do
+ rs <- syncRemotes <$> getDaemonStatus
+ let want = T.unpack $ formatJID jid
+ liftAnnex $ filterM (matching want) rs
+ where
+ matching want r = do
+ v <- getRemoteConfig (Remote.repo r) configKey ""
+ return $ v == want
+
+handleDeferred :: NetMessage -> Assistant ()
+handleDeferred = void . handlePush
+
+handlePush :: NetMessage -> Assistant Bool
+handlePush (PushRequest cid) = do
+ rs <- xmppRemotes cid
+ current <- liftAnnex $ inRepo Git.Branch.current
+ let refs = catMaybes [current, Just Annex.Branch.fullname]
+ any id <$> (forM rs $ \r -> xmppPush cid r refs)
+handlePush (StartingPush cid) = do
+ rs <- xmppRemotes cid
+ if null rs
+ then return False
+ else xmppReceivePack cid
+handlePush _ = return False