summaryrefslogtreecommitdiff
path: root/Assistant/Threads
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-05-22 15:13:31 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-05-22 15:13:31 -0400
commit186434797dc41c815a07825072a63c9de1b47a25 (patch)
tree94ada4e3a7333a5e87557d5bc6ed51bb977f8e74 /Assistant/Threads
parentdcbb9c33d5e82beb32a1068924f467d968ce9611 (diff)
add two long-running XMPP push threads, no more inversion of control
I hope this will be easier to reason about, and less buggy. It was certianly easier to write! An immediate benefit is that with a traversable queue of push requests to select from, the threads can be a lot fairer about choosing which client to service next.
Diffstat (limited to 'Assistant/Threads')
-rw-r--r--Assistant/Threads/XMPPClient.hs7
-rw-r--r--Assistant/Threads/XMPPPusher.hs81
2 files changed, 83 insertions, 5 deletions
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index 929b4c807..b90a8ca10 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -20,7 +20,6 @@ import qualified Remote
import Utility.ThreadScheduler
import Assistant.WebApp (UrlRenderer)
import Assistant.WebApp.Types hiding (liftAssistant)
-import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
import Assistant.Alert
import Assistant.Pairing
import Assistant.XMPP.Git
@@ -108,10 +107,8 @@ xmppClient urlrenderer d creds =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushNotice pushstage = inAssistant $ handlePushNotice m
- | isPushInitiation pushstage = inAssistant $ do
- let checker = checkCloudRepos urlrenderer
- void $ forkIO <~> handlePushInitiation checker m
- | otherwise = void $ inAssistant $ storeInbox m
+ | isPushInitiation pushstage = inAssistant $ queuePushInitiation m
+ | otherwise = inAssistant $ storeInbox m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs
new file mode 100644
index 000000000..30c91c7f0
--- /dev/null
+++ b/Assistant/Threads/XMPPPusher.hs
@@ -0,0 +1,81 @@
+{- git-annex XMPP pusher threads
+ -
+ - This is a pair of threads. One handles git send-pack,
+ - and the other git receive-pack. Each thread can be running at most
+ - one such operation at a time.
+ -
+ - Why not use a single thread? Consider two clients A and B.
+ - If both decide to run a receive-pack at the same time to the other,
+ - they would deadlock with only one thread. For larger numbers of
+ - clients, the two threads are also sufficient.
+ -
+ - Copyright 2013 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.XMPPPusher where
+
+import Assistant.Common
+import Assistant.NetMessager
+import Assistant.Types.NetMessager
+import Assistant.WebApp (UrlRenderer)
+import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
+import Assistant.XMPP.Git
+
+import Control.Exception as E
+
+xmppSendPackThread :: UrlRenderer -> NamedThread
+xmppSendPackThread = pusherThread "XMPPSendPack" SendPack
+
+xmppReceivePackThread :: UrlRenderer -> NamedThread
+xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack
+
+pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread
+pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing
+ where
+ go lastpushedto = do
+ msg <- waitPushInitiation side $ selectNextPush lastpushedto
+ debug ["started running push", logNetMessage msg]
+
+ runpush <- asIO $ runPush checker msg
+ r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID)))
+ let successful = case r of
+ Right (Just _) -> True
+ _ -> False
+
+ {- Empty the inbox, because stuff may have
+ - been left in it if the push failed. -}
+ let justpushedto = getclient msg
+ maybe noop (`emptyInbox` side) justpushedto
+
+ debug ["finished running push", logNetMessage msg, show successful]
+ go $ if successful then justpushedto else lastpushedto
+
+ checker = checkCloudRepos urlrenderer
+
+ getclient (Pushing cid _) = Just cid
+ getclient _ = Nothing
+
+{- Select the next push to run from the queue.
+ - The queue cannot be empty!
+ -
+ - We prefer to select the most recently added push, because its requestor
+ - is more likely to still be connected.
+ -
+ - When passed the ID of a client we just pushed to, we prefer to not
+ - immediately push again to that same client. This avoids one client
+ - drowing out others. So pushes from the client we just pushed to are
+ - relocated to the beginning of the list, to be processed later.
+ -}
+selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage])
+selectNextPush _ (m:[]) = (m, []) -- common case
+selectNextPush _ [] = error "selectNextPush: empty list"
+selectNextPush lastpushedto l = go [] l
+ where
+ go (r:ejected) [] = (r, ejected)
+ go rejected (m:ms) = case m of
+ (Pushing clientid _)
+ | Just clientid /= lastpushedto -> (m, rejected ++ ms)
+ _ -> go (m:rejected) ms
+ go [] [] = undefined