diff options
author | Joey Hess <joey@kitenet.net> | 2013-05-22 15:13:31 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2013-05-22 15:13:31 -0400 |
commit | 186434797dc41c815a07825072a63c9de1b47a25 (patch) | |
tree | 94ada4e3a7333a5e87557d5bc6ed51bb977f8e74 /Assistant/Threads | |
parent | dcbb9c33d5e82beb32a1068924f467d968ce9611 (diff) |
add two long-running XMPP push threads, no more inversion of control
I hope this will be easier to reason about, and less buggy. It was
certianly easier to write!
An immediate benefit is that with a traversable queue of push requests to
select from, the threads can be a lot fairer about choosing which client to
service next.
Diffstat (limited to 'Assistant/Threads')
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 7 | ||||
-rw-r--r-- | Assistant/Threads/XMPPPusher.hs | 81 |
2 files changed, 83 insertions, 5 deletions
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 929b4c807..b90a8ca10 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -20,7 +20,6 @@ import qualified Remote import Utility.ThreadScheduler import Assistant.WebApp (UrlRenderer) import Assistant.WebApp.Types hiding (liftAssistant) -import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) import Assistant.Alert import Assistant.Pairing import Assistant.XMPP.Git @@ -108,10 +107,8 @@ xmppClient urlrenderer d creds = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) handle _ (GotNetMessage m@(Pushing _ pushstage)) | isPushNotice pushstage = inAssistant $ handlePushNotice m - | isPushInitiation pushstage = inAssistant $ do - let checker = checkCloudRepos urlrenderer - void $ forkIO <~> handlePushInitiation checker m - | otherwise = void $ inAssistant $ storeInbox m + | isPushInitiation pushstage = inAssistant $ queuePushInitiation m + | otherwise = inAssistant $ storeInbox m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs new file mode 100644 index 000000000..30c91c7f0 --- /dev/null +++ b/Assistant/Threads/XMPPPusher.hs @@ -0,0 +1,81 @@ +{- git-annex XMPP pusher threads + - + - This is a pair of threads. One handles git send-pack, + - and the other git receive-pack. Each thread can be running at most + - one such operation at a time. + - + - Why not use a single thread? Consider two clients A and B. + - If both decide to run a receive-pack at the same time to the other, + - they would deadlock with only one thread. For larger numbers of + - clients, the two threads are also sufficient. + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.XMPPPusher where + +import Assistant.Common +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.WebApp (UrlRenderer) +import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) +import Assistant.XMPP.Git + +import Control.Exception as E + +xmppSendPackThread :: UrlRenderer -> NamedThread +xmppSendPackThread = pusherThread "XMPPSendPack" SendPack + +xmppReceivePackThread :: UrlRenderer -> NamedThread +xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack + +pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread +pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing + where + go lastpushedto = do + msg <- waitPushInitiation side $ selectNextPush lastpushedto + debug ["started running push", logNetMessage msg] + + runpush <- asIO $ runPush checker msg + r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID))) + let successful = case r of + Right (Just _) -> True + _ -> False + + {- Empty the inbox, because stuff may have + - been left in it if the push failed. -} + let justpushedto = getclient msg + maybe noop (`emptyInbox` side) justpushedto + + debug ["finished running push", logNetMessage msg, show successful] + go $ if successful then justpushedto else lastpushedto + + checker = checkCloudRepos urlrenderer + + getclient (Pushing cid _) = Just cid + getclient _ = Nothing + +{- Select the next push to run from the queue. + - The queue cannot be empty! + - + - We prefer to select the most recently added push, because its requestor + - is more likely to still be connected. + - + - When passed the ID of a client we just pushed to, we prefer to not + - immediately push again to that same client. This avoids one client + - drowing out others. So pushes from the client we just pushed to are + - relocated to the beginning of the list, to be processed later. + -} +selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage]) +selectNextPush _ (m:[]) = (m, []) -- common case +selectNextPush _ [] = error "selectNextPush: empty list" +selectNextPush lastpushedto l = go [] l + where + go (r:ejected) [] = (r, ejected) + go rejected (m:ms) = case m of + (Pushing clientid _) + | Just clientid /= lastpushedto -> (m, rejected ++ ms) + _ -> go (m:rejected) ms + go [] [] = undefined |