aboutsummaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/NetMessager.hs67
-rw-r--r--Assistant/Threads/XMPPClient.hs13
-rw-r--r--Assistant/Types/NetMessager.hs29
-rw-r--r--Assistant/XMPP/Git.hs84
4 files changed, 157 insertions, 36 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index 8895cb7db..8fac55c8a 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -10,8 +10,11 @@ module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
+import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
+import Control.Exception as E
+import qualified Data.Set as S
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -26,3 +29,67 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
+
+getPushRunning :: Assistant PushRunning
+getPushRunning =
+ (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
+
+{- Runs an action that runs either the send or receive end of a push.
+ -
+ - While the push is running, netMessagesPush will get messages put into it
+ - relating to this push, while any messages relating to other pushes
+ - go to netMessagesDeferred. Once the push finishes, those deferred
+ - messages will be fed to handledeferred for processing.
+ -}
+runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
+runPush v handledeferred a = do
+ nm <- getAssistant netMessager
+ let pr = netMessagerPushRunning nm
+ let setup = void $ atomically $ swapTMVar pr v
+ let cleanup = atomically $ do
+ void $ swapTMVar pr NoPushRunning
+ emptytchan (netMessagesPush nm)
+ r <- E.bracket_ setup cleanup <~> a
+ (void . forkIO) <~> processdeferred nm
+ return r
+ where
+ emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
+ processdeferred nm = do
+ s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
+ mapM_ rundeferred (S.toList s)
+ rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
+ <~> handledeferred m
+
+{- While a push is running, matching push messages are put into
+ - netMessagesPush, while others go to netMessagesDeferredPush. To avoid
+ - bloating memory, only PushRequest and StartingPush messages are
+ - deferred.
+ -
+ - When no push is running, returns False.
+ -}
+queueNetPushMessage :: NetMessage -> Assistant Bool
+queueNetPushMessage m = do
+ nm <- getAssistant netMessager
+ liftIO $ atomically $ do
+ running <- readTMVar (netMessagerPushRunning nm)
+ case running of
+ NoPushRunning -> return False
+ SendPushRunning cid -> go nm cid
+ ReceivePushRunning cid -> go nm cid
+ where
+ go nm cid
+ | getClientID m == Just cid = do
+ writeTChan (netMessagesPush nm) m
+ return True
+ | otherwise = do
+ case m of
+ PushRequest _ -> defer nm
+ StartingPush _ -> defer nm
+ _ -> noop
+ return True
+ defer nm = do
+ s <- takeTMVar (netMessagesDeferredPush nm)
+ putTMVar (netMessagesDeferredPush nm) $ S.insert m s
+
+waitNetPushMessage :: Assistant (NetMessage)
+waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 1117c3c14..efdecb587 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -93,15 +93,14 @@ xmppClient urlrenderer d = do
handle _ (PresenceMessage p) = void $ inAssistant $
updateBuddyList (updateBuddies p) <<~ buddyList
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
- handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $
- pull us
+ handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
- handle selfjid (GotNetMessage (PushRequest c)) = error "TODO"
- handle selfjid (GotNetMessage (StartingPush c)) = error "TODO"
- handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO"
- handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO"
- handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO"
+ handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $
+ unlessM (queueNetPushMessage m) $ void $ handlePush m
+ handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $
+ unlessM (queueNetPushMessage m) $ void $ handlePush m
+ handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index 77f2759b3..6974cf57d 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -14,6 +14,7 @@ import Data.Text (Text)
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
+import Data.Set as S
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -34,17 +35,41 @@ data NetMessage
| SendPackOutput ClientID ByteString
-- sent when git receive-pack exits, with its exit code
| ReceivePackDone ClientID ExitCode
- deriving (Show)
+ deriving (Show, Eq, Ord)
-{- Something used to identify a specific client to send the message to. -}
+{- Something used to identify the client, or clients to send the message to. -}
type ClientID = Text
+getClientID :: NetMessage -> Maybe ClientID
+getClientID (NotifyPush _) = Nothing
+getClientID QueryPresence = Nothing
+getClientID (PairingNotification _ cid _) = Just cid
+getClientID (PushRequest cid) = Just cid
+getClientID (StartingPush cid) = Just cid
+getClientID (ReceivePackOutput cid _) = Just cid
+getClientID (SendPackOutput cid _) = Just cid
+getClientID (ReceivePackDone cid _) = Just cid
+
data NetMessager = NetMessager
+ -- outgoing messages
{ netMessages :: TChan (NetMessage)
+ -- only one push can be running at a time, and this tracks it
+ , netMessagerPushRunning :: TMVar (PushRunning)
+ -- incoming messages relating to the currently running push
+ , netMessagesPush :: TChan (NetMessage)
+ -- incoming push messages that have been deferred to be processed later
+ , netMessagesDeferredPush :: TMVar (S.Set NetMessage)
+ -- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
}
+data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
+ deriving (Eq)
+
newNetMessager :: IO NetMessager
newNetMessager = NetMessager
<$> atomically newTChan
+ <*> atomically (newTMVar NoPushRunning)
+ <*> atomically newTChan
+ <*> atomically (newTMVar S.empty)
<*> newEmptySV
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index 7c4509c51..344f94327 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -20,6 +20,8 @@ import Annex.UUID
import Config
import Git
import Git.Command
+import qualified Git.Branch
+import qualified Annex.Branch
import Locations.UserConfig
import qualified Types.Remote as Remote
@@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err)
import Control.Concurrent
import qualified Data.ByteString as B
-configKey :: Remote -> ConfigKey
-configKey r = remoteConfig (Remote.repo r) "xmppaddress"
+configKey :: UnqualifiedConfigKey
+configKey = "xmppaddress"
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
@@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do
liftAnnex $ do
let r = Remote.repo remote
storeUUID (remoteConfig r "uuid") u
- setConfig (configKey remote) xmppaddress
+ setConfig (remoteConfig r configKey) xmppaddress
syncNewRemote remote
return True
where
xmppaddress = T.unpack $ formatJID $ baseJID jid
-{- Pushes the named refs to the remote, over XMPP.
+{- Pushes the named refs to the remote, over XMPP, communicating with a
+ - specific client that either requested the push, or responded to our
+ - StartingPush message.
-
- Strategy: Set GIT_SSH to run git-annex. By setting the remote url
- to "xmppgit:dummy", "git-annex xmppgit" will be run locally by
@@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
-xmppPush :: Remote -> [Ref] -> Assistant Bool
-xmppPush remote refs = error "TODO"
-
-xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool
-xmppPush' cid remote refs = do
+xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool
+xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do
+ sendNetMessage $ StartingPush cid
program <- liftIO readProgramFile
(Fd inf, writepush) <- liftIO createPipe
@@ -107,30 +109,26 @@ xmppPush' cid remote refs = do
liftIO $ hSetBuffering outh NoBuffering
t1 <- forkIO <~> toxmpp inh
- t2 <- forkIO <~> fromxmpp outh
- t3 <- forkIO <~> controlxmpp controlh
+ t2 <- forkIO <~> fromxmpp outh controlh
ok <- liftIO $ boolSystemEnv "git"
(mainparams ++ gitCommandLine params g)
(Just $ env ++ myenv)
- liftIO $ mapM_ killThread [t1, t2, t3]
+ liftIO $ mapM_ killThread [t1, t2]
return ok
where
toxmpp inh = forever $ do
b <- liftIO $ B.hGetSome inh 1024
- when (B.null b) $
- liftIO $ killThread =<< myThreadId
- sendNetMessage $ SendPackOutput cid b
- error "TODO"
- fromxmpp outh = forever $ do
- -- TODO get b from xmpp
- let b = undefined
- liftIO $ B.hPut outh b
- controlxmpp controlh = do
- -- TODO wait for control message from xmpp
- let exitcode = undefined :: Int
- liftIO $ hPutStrLn controlh (show exitcode)
-
+ if B.null b
+ then liftIO $ killThread =<< myThreadId
+ else sendNetMessage $ SendPackOutput cid b
+ fromxmpp outh controlh = forever $ do
+ m <- waitNetPushMessage
+ case m of
+ (ReceivePackOutput _ b) -> liftIO $ B.hPut outh b
+ (ReceivePackDone _ exitcode) -> do
+ liftIO $ hPutStrLn controlh (show exitcode)
+ _ -> noop
relayIn :: String
relayIn = "GIT_ANNEX_XMPPGIT_IN"
@@ -176,7 +174,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool
-xmppReceivePack cid = do
+xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
feeder <- asIO1 toxmpp
reader <- asIO1 fromxmpp
sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid
@@ -202,4 +200,36 @@ xmppReceivePack cid = do
else do
sendNetMessage $ ReceivePackOutput cid b
toxmpp outh
- fromxmpp _inh = error "TODO feed xmpp to inh"
+ fromxmpp inh = forever $ do
+ m <- waitNetPushMessage
+ case m of
+ (SendPackOutput _ b) -> liftIO $ B.hPut inh b
+ _ -> noop
+
+xmppRemotes :: ClientID -> Assistant [Remote]
+xmppRemotes cid = case baseJID <$> parseJID cid of
+ Nothing -> return []
+ Just jid -> do
+ rs <- syncRemotes <$> getDaemonStatus
+ let want = T.unpack $ formatJID jid
+ liftAnnex $ filterM (matching want) rs
+ where
+ matching want r = do
+ v <- getRemoteConfig (Remote.repo r) configKey ""
+ return $ v == want
+
+handleDeferred :: NetMessage -> Assistant ()
+handleDeferred = void . handlePush
+
+handlePush :: NetMessage -> Assistant Bool
+handlePush (PushRequest cid) = do
+ rs <- xmppRemotes cid
+ current <- liftAnnex $ inRepo Git.Branch.current
+ let refs = catMaybes [current, Just Annex.Branch.fullname]
+ any id <$> (forM rs $ \r -> xmppPush cid r refs)
+handlePush (StartingPush cid) = do
+ rs <- xmppRemotes cid
+ if null rs
+ then return False
+ else xmppReceivePack cid
+handlePush _ = return False