summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs3
-rw-r--r--Assistant/NetMessager.hs105
-rw-r--r--Assistant/Threads/XMPPClient.hs7
-rw-r--r--Assistant/Threads/XMPPPusher.hs81
-rw-r--r--Assistant/Types/NetMessager.hs23
-rw-r--r--Assistant/XMPP/Git.hs39
-rw-r--r--Utility/TList.hs15
7 files changed, 166 insertions, 107 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 36fef049d..7e9f82449 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -37,6 +37,7 @@ import Assistant.Threads.PairListener
#endif
#ifdef WITH_XMPP
import Assistant.Threads.XMPPClient
+import Assistant.Threads.XMPPPusher
#endif
#else
#warning Building without the webapp. You probably need to install Yesod..
@@ -111,6 +112,8 @@ startDaemon assistant foreground listenhost startbrowser = do
#endif
#ifdef WITH_XMPP
, assist $ xmppClientThread urlrenderer
+ , assist $ xmppSendPackThread urlrenderer
+ , assist $ xmppReceivePackThread urlrenderer
#endif
#endif
, assist $ pushThread
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index fe96df56f..2e786717d 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -14,7 +14,6 @@ import Assistant.Types.NetMessager
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
-import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D
@@ -69,73 +68,39 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
-{- Runs an action that runs either the send or receive side of a push.
- - Only one such action per side can run at a time. Other pushes, for
- - the same, or other clients, need to wait their turn.
- -
- - Only one push is allowed to wait per client.
- - There is no point in building up more.
- -
- - While the push is running, netMessagesPush will get messages put into it
- - relating to this push, while any messages relating to other pushes
- - on the same side go to netMessagesDeferred. Once the push finishes,
- - those deferred messages will be fed to handledeferred for processing.
- -}
-runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool
-runPush side clientid a = do
- debugmsg "preparing to run"
- nm <- getAssistant netMessager
- let setup = do
- (canrun, release) <- atomically $ checkcanrun nm
- if canrun
- then atomically $ waittorun nm release
- else return (False, noop)
- let cleanup (_, release) = atomically release
- go <- asIO1 $ \(run, _) ->
- if run
- then do
- debugmsg "started running"
- r <- a
- debugmsg "finished running"
- {- Empty the inbox, because stuff may have
- - been left in it if the push failed. -}
- emptyInbox clientid side
- return r
- else do
- debugmsg "skipping running"
- return False
- r <- liftIO $ E.bracket setup cleanup go
- return r
+{- Queues a push initiation message in the queue for the appropriate
+ - side of the push but only if there is not already an initiation message
+ - from the same client in the queue. -}
+queuePushInitiation :: NetMessage -> Assistant ()
+queuePushInitiation msg@(Pushing clientid stage) = do
+ tv <- getPushInitiationQueue side
+ liftIO $ atomically $ do
+ r <- tryTakeTMVar tv
+ case r of
+ Nothing -> putTMVar tv [msg]
+ Just l -> do
+ let !l' = msg : filter differentclient l
+ putTMVar tv l'
where
- debugmsg s = netMessagerDebug clientid [s, show side]
- -- check that this is one of the two threads allowed
- -- to run at the same time, pushing to the same client
- -- on the same side
- checkcanrun nm = do
- let v = getSide side $ netMessagerPushThreadCount nm
- m <- readTVar v
- case M.lookup clientid m of
- Just count
- | count > 2 -> return (False, noop)
- _ -> do
- writeTVar v $
- M.insertWith' (+) clientid 1 m
- let release = modifyTVar' v $
- M.insertWith' (-) clientid 1
- return (True, release)
- -- block until this is the only thread performing
- -- a push on this side, to any client
- waittorun nm release = do
- let v = getSide side $ netMessagerPushRunning nm
- ifM (isNothing <$> tryReadTMVar v)
- ( do
- putTMVar v clientid
- let release' = do
- void $ takeTMVar v
- release
- return (True, release')
- , retry
- )
+ side = pushDestinationSide stage
+ differentclient (Pushing cid _) = cid /= clientid
+ differentclient _ = True
+queuePushInitiation _ = noop
+
+{- Waits for a push inititation message to be received, and runs
+ - function to select a message from the queue. -}
+waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage
+waitPushInitiation side selector = do
+ tv <- getPushInitiationQueue side
+ liftIO $ atomically $ do
+ q <- takeTMVar tv
+ if null q
+ then retry
+ else do
+ let (msg, !q') = selector q
+ unless (null q') $
+ putTMVar tv q'
+ return msg
{- Stores messages for a push into the appropriate inbox.
-
@@ -198,7 +163,11 @@ emptyInbox clientid side = do
getInboxes :: PushSide -> Assistant Inboxes
getInboxes side =
- getSide side . netMessagesInboxes <$> getAssistant netMessager
+ getSide side . netMessagerInboxes <$> getAssistant netMessager
+
+getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage])
+getPushInitiationQueue side =
+ getSide side . netMessagerPushInitiations <$> getAssistant netMessager
netMessagerDebug :: ClientID -> [String] -> Assistant ()
netMessagerDebug clientid l = debug $
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 929b4c807..b90a8ca10 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -20,7 +20,6 @@ import qualified Remote
import Utility.ThreadScheduler
import Assistant.WebApp (UrlRenderer)
import Assistant.WebApp.Types hiding (liftAssistant)
-import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
import Assistant.Alert
import Assistant.Pairing
import Assistant.XMPP.Git
@@ -108,10 +107,8 @@ xmppClient urlrenderer d creds =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushNotice pushstage = inAssistant $ handlePushNotice m
- | isPushInitiation pushstage = inAssistant $ do
- let checker = checkCloudRepos urlrenderer
- void $ forkIO <~> handlePushInitiation checker m
- | otherwise = void $ inAssistant $ storeInbox m
+ | isPushInitiation pushstage = inAssistant $ queuePushInitiation m
+ | otherwise = inAssistant $ storeInbox m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs
new file mode 100644
index 000000000..30c91c7f0
--- /dev/null
+++ b/Assistant/Threads/XMPPPusher.hs
@@ -0,0 +1,81 @@
+{- git-annex XMPP pusher threads
+ -
+ - This is a pair of threads. One handles git send-pack,
+ - and the other git receive-pack. Each thread can be running at most
+ - one such operation at a time.
+ -
+ - Why not use a single thread? Consider two clients A and B.
+ - If both decide to run a receive-pack at the same time to the other,
+ - they would deadlock with only one thread. For larger numbers of
+ - clients, the two threads are also sufficient.
+ -
+ - Copyright 2013 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.XMPPPusher where
+
+import Assistant.Common
+import Assistant.NetMessager
+import Assistant.Types.NetMessager
+import Assistant.WebApp (UrlRenderer)
+import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
+import Assistant.XMPP.Git
+
+import Control.Exception as E
+
+xmppSendPackThread :: UrlRenderer -> NamedThread
+xmppSendPackThread = pusherThread "XMPPSendPack" SendPack
+
+xmppReceivePackThread :: UrlRenderer -> NamedThread
+xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack
+
+pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread
+pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing
+ where
+ go lastpushedto = do
+ msg <- waitPushInitiation side $ selectNextPush lastpushedto
+ debug ["started running push", logNetMessage msg]
+
+ runpush <- asIO $ runPush checker msg
+ r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID)))
+ let successful = case r of
+ Right (Just _) -> True
+ _ -> False
+
+ {- Empty the inbox, because stuff may have
+ - been left in it if the push failed. -}
+ let justpushedto = getclient msg
+ maybe noop (`emptyInbox` side) justpushedto
+
+ debug ["finished running push", logNetMessage msg, show successful]
+ go $ if successful then justpushedto else lastpushedto
+
+ checker = checkCloudRepos urlrenderer
+
+ getclient (Pushing cid _) = Just cid
+ getclient _ = Nothing
+
+{- Select the next push to run from the queue.
+ - The queue cannot be empty!
+ -
+ - We prefer to select the most recently added push, because its requestor
+ - is more likely to still be connected.
+ -
+ - When passed the ID of a client we just pushed to, we prefer to not
+ - immediately push again to that same client. This avoids one client
+ - drowing out others. So pushes from the client we just pushed to are
+ - relocated to the beginning of the list, to be processed later.
+ -}
+selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage])
+selectNextPush _ (m:[]) = (m, []) -- common case
+selectNextPush _ [] = error "selectNextPush: empty list"
+selectNextPush lastpushedto l = go [] l
+ where
+ go (r:ejected) [] = (r, ejected)
+ go rejected (m:ms) = case m of
+ (Pushing clientid _)
+ | Just clientid /= lastpushedto -> (m, rejected ++ ms)
+ _ -> go (m:rejected) ms
+ go [] [] = undefined
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index 525ff29f2..4b4e614a2 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -11,15 +11,15 @@ import Common.Annex
import Assistant.Pairing
import Git.Types
+import qualified Data.Text as T
+import qualified Data.Set as S
+import qualified Data.Map as M
+import qualified Data.DList as D
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8
import Data.Text (Text)
-import qualified Data.Text as T
-import qualified Data.Set as S
-import qualified Data.Map as M
-import qualified Data.DList as D
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -130,15 +130,11 @@ data NetMessager = NetMessager
, sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
- -- only one side of a push can be running at a time
- -- the TMVars are empty when nothing is running
- , netMessagerPushRunning :: SideMap (TMVar ClientID)
- -- number of threads trying to push to the same client
- -- at the same time (either running, or waiting to run)
- , netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int))
- -- incoming messages containing data for a push,
- -- on a per-client and per-side basis
- , netMessagesInboxes :: SideMap Inboxes
+ -- queue of incoming messages that request the initiation of pushes
+ , netMessagerPushInitiations :: SideMap (TMVar [NetMessage])
+ -- incoming messages containing data for a running
+ -- (or not yet started) push
+ , netMessagerInboxes :: SideMap Inboxes
}
newNetMessager :: IO NetMessager
@@ -149,4 +145,3 @@ newNetMessager = NetMessager
<*> newEmptySV
<*> mkSideMap newEmptyTMVar
<*> mkSideMap (newTVar M.empty)
- <*> mkSideMap (newTVar M.empty)
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index 1e8ccca62..01585a711 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -101,7 +101,7 @@ makeXMPPGitRemote buddyname jid u = do
- We listen at the other end of the pipe and relay to and from XMPP.
-}
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
-xmppPush cid gitpush = runPush SendPack cid $ do
+xmppPush cid gitpush = do
u <- liftAnnex getUUID
sendNetMessage $ Pushing cid (StartingPush u)
@@ -239,7 +239,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool
-xmppReceivePack cid = runPush ReceivePack cid $ do
+xmppReceivePack cid = do
repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe
@@ -288,11 +288,12 @@ xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
matching loc r = repoIsUrl r && repoLocation r == loc
knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um
-handlePushInitiation :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
-handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
+{- Returns the ClientID that it pushed to. -}
+runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID)
+runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
go =<< liftAnnex (inRepo Git.Branch.current)
where
- go Nothing = noop
+ go Nothing = return Nothing
go (Just branch) = do
rs <- xmppRemotes cid theiruuid
liftAnnex $ Annex.Branch.commit "update"
@@ -301,17 +302,24 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
<*> getUUID
liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
- forM_ rs $ \r -> do
- void $ alertWhile (syncAlert [r]) $
- xmppPush cid (taggedPush u selfjid branch r)
- checkcloudrepos r
-handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
+ if null rs
+ then return Nothing
+ else do
+ forM_ rs $ \r -> do
+ void $ alertWhile (syncAlert [r]) $
+ xmppPush cid (taggedPush u selfjid branch r)
+ checkcloudrepos r
+ return $ Just cid
+runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
rs <- xmppRemotes cid theiruuid
- unless (null rs) $ do
- void $ alertWhile (syncAlert rs) $
- xmppReceivePack cid
- mapM_ checkcloudrepos rs
-handlePushInitiation _ _ = noop
+ if null rs
+ then return Nothing
+ else do
+ void $ alertWhile (syncAlert rs) $
+ xmppReceivePack cid
+ mapM_ checkcloudrepos rs
+ return $ Just cid
+runPush _ _ = return Nothing
{- Check if any of the shas that can be pushed are ones we do not
- have.
@@ -370,4 +378,3 @@ extractSequence :: NetMessage -> Maybe Int
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
extractSequence _ = Nothing
-
diff --git a/Utility/TList.hs b/Utility/TList.hs
index 716f72017..e4bb95498 100644
--- a/Utility/TList.hs
+++ b/Utility/TList.hs
@@ -23,7 +23,17 @@ newTList = newEmptyTMVar
{- Gets the contents of the TList. Blocks when empty.
- TList is left empty. -}
getTList :: TList a -> STM [a]
-getTList tlist = D.toList <$> takeTMVar tlist
+getTList tlist = D.toList <$> getTDList tlist
+
+getTDList :: TList a -> STM (D.DList a)
+getTDList = takeTMVar
+
+{- Replaces the contents of the TList. -}
+setTList :: TList a -> [a] -> STM ()
+setTList tlist = setTDList tlist . D.fromList
+
+setTDList :: TList a -> D.DList a -> STM ()
+setTDList tlist = modifyTList tlist . const
{- Takes anything currently in the TList, without blocking.
- TList is left empty. -}
@@ -54,6 +64,3 @@ snocTList tlist v = modifyTList tlist $ \dl -> D.snoc dl v
appendTList :: TList a -> [a] -> STM ()
appendTList tlist l = modifyTList tlist $ \dl -> D.append dl (D.fromList l)
-
-setTList :: TList a -> [a] -> STM ()
-setTList tlist l = modifyTList tlist $ const $ D.fromList l