diff options
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/NetMessager.hs | 67 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 13 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 29 | ||||
-rw-r--r-- | Assistant/XMPP/Git.hs | 84 |
4 files changed, 157 insertions, 36 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 8895cb7db..8fac55c8a 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -10,8 +10,11 @@ module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar +import Control.Exception as E +import qualified Data.Set as S sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -26,3 +29,67 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +getPushRunning :: Assistant PushRunning +getPushRunning = + (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) + +{- Runs an action that runs either the send or receive end of a push. + - + - While the push is running, netMessagesPush will get messages put into it + - relating to this push, while any messages relating to other pushes + - go to netMessagesDeferred. Once the push finishes, those deferred + - messages will be fed to handledeferred for processing. + -} +runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush v handledeferred a = do + nm <- getAssistant netMessager + let pr = netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar pr v + let cleanup = atomically $ do + void $ swapTMVar pr NoPushRunning + emptytchan (netMessagesPush nm) + r <- E.bracket_ setup cleanup <~> a + (void . forkIO) <~> processdeferred nm + return r + where + emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c + processdeferred nm = do + s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + mapM_ rundeferred (S.toList s) + rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) + <~> handledeferred m + +{- While a push is running, matching push messages are put into + - netMessagesPush, while others go to netMessagesDeferredPush. To avoid + - bloating memory, only PushRequest and StartingPush messages are + - deferred. + - + - When no push is running, returns False. + -} +queueNetPushMessage :: NetMessage -> Assistant Bool +queueNetPushMessage m = do + nm <- getAssistant netMessager + liftIO $ atomically $ do + running <- readTMVar (netMessagerPushRunning nm) + case running of + NoPushRunning -> return False + SendPushRunning cid -> go nm cid + ReceivePushRunning cid -> go nm cid + where + go nm cid + | getClientID m == Just cid = do + writeTChan (netMessagesPush nm) m + return True + | otherwise = do + case m of + PushRequest _ -> defer nm + StartingPush _ -> defer nm + _ -> noop + return True + defer nm = do + s <- takeTMVar (netMessagesDeferredPush nm) + putTMVar (netMessagesDeferredPush nm) $ S.insert m s + +waitNetPushMessage :: Assistant (NetMessage) +waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 1117c3c14..efdecb587 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -93,15 +93,14 @@ xmppClient urlrenderer d = do handle _ (PresenceMessage p) = void $ inAssistant $ updateBuddyList (updateBuddies p) <<~ buddyList handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature - handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ - pull us + handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us handle selfjid (GotNetMessage (PairingNotification stage c u)) = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) - handle selfjid (GotNetMessage (PushRequest c)) = error "TODO" - handle selfjid (GotNetMessage (StartingPush c)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO" + handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 77f2759b3..6974cf57d 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -14,6 +14,7 @@ import Data.Text (Text) import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) +import Data.Set as S {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -34,17 +35,41 @@ data NetMessage | SendPackOutput ClientID ByteString -- sent when git receive-pack exits, with its exit code | ReceivePackDone ClientID ExitCode - deriving (Show) + deriving (Show, Eq, Ord) -{- Something used to identify a specific client to send the message to. -} +{- Something used to identify the client, or clients to send the message to. -} type ClientID = Text +getClientID :: NetMessage -> Maybe ClientID +getClientID (NotifyPush _) = Nothing +getClientID QueryPresence = Nothing +getClientID (PairingNotification _ cid _) = Just cid +getClientID (PushRequest cid) = Just cid +getClientID (StartingPush cid) = Just cid +getClientID (ReceivePackOutput cid _) = Just cid +getClientID (SendPackOutput cid _) = Just cid +getClientID (ReceivePackDone cid _) = Just cid + data NetMessager = NetMessager + -- outgoing messages { netMessages :: TChan (NetMessage) + -- only one push can be running at a time, and this tracks it + , netMessagerPushRunning :: TMVar (PushRunning) + -- incoming messages relating to the currently running push + , netMessagesPush :: TChan (NetMessage) + -- incoming push messages that have been deferred to be processed later + , netMessagesDeferredPush :: TMVar (S.Set NetMessage) + -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () } +data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID + deriving (Eq) + newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan + <*> atomically (newTMVar NoPushRunning) + <*> atomically newTChan + <*> atomically (newTMVar S.empty) <*> newEmptySV diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index 7c4509c51..344f94327 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -20,6 +20,8 @@ import Annex.UUID import Config import Git import Git.Command +import qualified Git.Branch +import qualified Annex.Branch import Locations.UserConfig import qualified Types.Remote as Remote @@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err) import Control.Concurrent import qualified Data.ByteString as B -configKey :: Remote -> ConfigKey -configKey r = remoteConfig (Remote.repo r) "xmppaddress" +configKey :: UnqualifiedConfigKey +configKey = "xmppaddress" finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing jid u = void $ alertWhile alert $ @@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do liftAnnex $ do let r = Remote.repo remote storeUUID (remoteConfig r "uuid") u - setConfig (configKey remote) xmppaddress + setConfig (remoteConfig r configKey) xmppaddress syncNewRemote remote return True where xmppaddress = T.unpack $ formatJID $ baseJID jid -{- Pushes the named refs to the remote, over XMPP. +{- Pushes the named refs to the remote, over XMPP, communicating with a + - specific client that either requested the push, or responded to our + - StartingPush message. - - Strategy: Set GIT_SSH to run git-annex. By setting the remote url - to "xmppgit:dummy", "git-annex xmppgit" will be run locally by @@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do - - We listen at the other end of the pipe and relay to and from XMPP. -} -xmppPush :: Remote -> [Ref] -> Assistant Bool -xmppPush remote refs = error "TODO" - -xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool -xmppPush' cid remote refs = do +xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool +xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do + sendNetMessage $ StartingPush cid program <- liftIO readProgramFile (Fd inf, writepush) <- liftIO createPipe @@ -107,30 +109,26 @@ xmppPush' cid remote refs = do liftIO $ hSetBuffering outh NoBuffering t1 <- forkIO <~> toxmpp inh - t2 <- forkIO <~> fromxmpp outh - t3 <- forkIO <~> controlxmpp controlh + t2 <- forkIO <~> fromxmpp outh controlh ok <- liftIO $ boolSystemEnv "git" (mainparams ++ gitCommandLine params g) (Just $ env ++ myenv) - liftIO $ mapM_ killThread [t1, t2, t3] + liftIO $ mapM_ killThread [t1, t2] return ok where toxmpp inh = forever $ do b <- liftIO $ B.hGetSome inh 1024 - when (B.null b) $ - liftIO $ killThread =<< myThreadId - sendNetMessage $ SendPackOutput cid b - error "TODO" - fromxmpp outh = forever $ do - -- TODO get b from xmpp - let b = undefined - liftIO $ B.hPut outh b - controlxmpp controlh = do - -- TODO wait for control message from xmpp - let exitcode = undefined :: Int - liftIO $ hPutStrLn controlh (show exitcode) - + if B.null b + then liftIO $ killThread =<< myThreadId + else sendNetMessage $ SendPackOutput cid b + fromxmpp outh controlh = forever $ do + m <- waitNetPushMessage + case m of + (ReceivePackOutput _ b) -> liftIO $ B.hPut outh b + (ReceivePackDone _ exitcode) -> do + liftIO $ hPutStrLn controlh (show exitcode) + _ -> noop relayIn :: String relayIn = "GIT_ANNEX_XMPPGIT_IN" @@ -176,7 +174,7 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} xmppReceivePack :: ClientID -> Assistant Bool -xmppReceivePack cid = do +xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do feeder <- asIO1 toxmpp reader <- asIO1 fromxmpp sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid @@ -202,4 +200,36 @@ xmppReceivePack cid = do else do sendNetMessage $ ReceivePackOutput cid b toxmpp outh - fromxmpp _inh = error "TODO feed xmpp to inh" + fromxmpp inh = forever $ do + m <- waitNetPushMessage + case m of + (SendPackOutput _ b) -> liftIO $ B.hPut inh b + _ -> noop + +xmppRemotes :: ClientID -> Assistant [Remote] +xmppRemotes cid = case baseJID <$> parseJID cid of + Nothing -> return [] + Just jid -> do + rs <- syncRemotes <$> getDaemonStatus + let want = T.unpack $ formatJID jid + liftAnnex $ filterM (matching want) rs + where + matching want r = do + v <- getRemoteConfig (Remote.repo r) configKey "" + return $ v == want + +handleDeferred :: NetMessage -> Assistant () +handleDeferred = void . handlePush + +handlePush :: NetMessage -> Assistant Bool +handlePush (PushRequest cid) = do + rs <- xmppRemotes cid + current <- liftAnnex $ inRepo Git.Branch.current + let refs = catMaybes [current, Just Annex.Branch.fullname] + any id <$> (forM rs $ \r -> xmppPush cid r refs) +handlePush (StartingPush cid) = do + rs <- xmppRemotes cid + if null rs + then return False + else xmppReceivePack cid +handlePush _ = return False |