diff options
Diffstat (limited to 'Assistant/Threads')
-rw-r--r-- | Assistant/Threads/Merger.hs | 26 | ||||
-rw-r--r-- | Assistant/Threads/MountWatcher.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/NetWatcher.hs | 9 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 4 | ||||
-rw-r--r-- | Assistant/Threads/TransferScanner.hs | 2 | ||||
-rw-r--r-- | Assistant/Threads/WebApp.hs | 3 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 375 | ||||
-rw-r--r-- | Assistant/Threads/XMPPPusher.hs | 82 |
8 files changed, 10 insertions, 493 deletions
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs index ce0dfbcb4..0bb37e664 100644 --- a/Assistant/Threads/Merger.hs +++ b/Assistant/Threads/Merger.hs @@ -10,19 +10,12 @@ module Assistant.Threads.Merger where import Assistant.Common import Assistant.TransferQueue import Assistant.BranchChange -import Assistant.DaemonStatus -import Assistant.ScanRemotes import Utility.DirWatcher import Utility.DirWatcher.Types import qualified Annex.Branch import qualified Git import qualified Git.Branch import qualified Command.Sync -import Annex.TaggedPush -import Remote (remoteFromUUID) - -import qualified Data.Set as S -import qualified Data.Text as T {- This thread watches for changes to .git/refs/, and handles incoming - pushes. -} @@ -69,8 +62,7 @@ onChange file branchChanged diverged <- liftAnnex Annex.Branch.forceUpdate when diverged $ - unlessM handleDesynced $ - queueDeferredDownloads "retrying deferred download" Later + queueDeferredDownloads "retrying deferred download" Later | "/synced/" `isInfixOf` file = mergecurrent =<< liftAnnex (join Command.Sync.getCurrBranch) | otherwise = noop @@ -90,22 +82,6 @@ onChange file changedbranch mergecurrent _ = noop - handleDesynced = case fromTaggedBranch changedbranch of - Nothing -> return False - Just (u, info) -> do - mr <- liftAnnex $ remoteFromUUID u - case mr of - Nothing -> return False - Just r -> do - s <- desynced <$> getDaemonStatus - if S.member u s || Just (T.unpack $ getXMPPClientID r) == info - then do - modifyDaemonStatus_ $ \st -> st - { desynced = S.delete u s } - addScanRemotes True [r] - return True - else return False - equivBranches :: Git.Ref -> Git.Ref -> Bool equivBranches x y = base x == base y where diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index a741b731d..bd8d0e614 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -146,7 +146,7 @@ handleMount urlrenderer dir = do debug ["detected mount of", dir] rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir mapM_ (fsckNudge urlrenderer . Just) rs - reconnectRemotes True rs + reconnectRemotes rs {- Finds remotes located underneath the mount point. - diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 52f8db474..4dc8721b1 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -22,7 +22,6 @@ import Assistant.RemoteControl import Utility.DBus import DBus.Client import DBus -import Assistant.NetMessager #else #ifdef linux_HOST_OS #warning Building without dbus support; will poll for network connection changes @@ -44,9 +43,8 @@ netWatcherThread = thread noop - while (despite the local network staying up), are synced with - periodically. - - - Note that it does not call notifyNetMessagerRestart, or - - signal the RemoteControl, because it doesn't know that the - - network has changed. + - Note that it does not signal the RemoteControl, because it doesn't + - know that the network has changed. -} netWatcherFallbackThread :: NamedThread netWatcherFallbackThread = namedThread "NetWatcherFallback" $ @@ -76,7 +74,6 @@ dbusThread = do sendRemoteControl LOSTNET connchange True = do debug ["detected network connection"] - notifyNetMessagerRestart handleConnection sendRemoteControl RESUME onerr e _ = do @@ -197,7 +194,7 @@ listenWicdConnections client setconnected = do handleConnection :: Assistant () handleConnection = do liftIO . sendNotification . networkConnectedNotifier =<< getDaemonStatus - reconnectRemotes True =<< networkRemotes + reconnectRemotes =<< networkRemotes {- Network remotes to sync with. -} networkRemotes :: Assistant [Remote] diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 35989ed48..5b4055885 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -24,7 +24,7 @@ pushRetryThread = namedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do topush <- getFailedPushesBefore (fromIntegral halfhour) unless (null topush) $ do debug ["retrying", show (length topush), "failed pushes"] - void $ pushToRemotes True topush + void $ pushToRemotes topush where halfhour = 1800 @@ -35,7 +35,7 @@ pushThread = namedThread "Pusher" $ runEvery (Seconds 2) <~> do -- Next, wait until at least one commit has been made void getCommits -- Now see if now's a good time to push. - void $ pushToRemotes True =<< pushTargets + void $ pushToRemotes =<< pushTargets {- We want to avoid pushing to remotes that are marked readonly. - diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 10aed20b0..a55a3496e 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -76,7 +76,7 @@ transferScannerThread urlrenderer = namedThread "TransferScanner" $ do - to determine if the remote has been emptied. -} startupScan = do - reconnectRemotes True =<< syncGitRemotes <$> getDaemonStatus + reconnectRemotes =<< syncGitRemotes <$> getDaemonStatus addScanRemotes True =<< syncDataRemotes <$> getDaemonStatus {- This is a cheap scan for failed transfers involving a remote. -} diff --git a/Assistant/Threads/WebApp.hs b/Assistant/Threads/WebApp.hs index a5cd38504..dfb631bc6 100644 --- a/Assistant/Threads/WebApp.hs +++ b/Assistant/Threads/WebApp.hs @@ -26,7 +26,6 @@ import Assistant.WebApp.Configurators.Pairing import Assistant.WebApp.Configurators.AWS import Assistant.WebApp.Configurators.IA import Assistant.WebApp.Configurators.WebDAV -import Assistant.WebApp.Configurators.XMPP import Assistant.WebApp.Configurators.Preferences import Assistant.WebApp.Configurators.Unused import Assistant.WebApp.Configurators.Edit @@ -37,6 +36,7 @@ import Assistant.WebApp.Documentation import Assistant.WebApp.Control import Assistant.WebApp.OtherRepos import Assistant.WebApp.Repair +import Assistant.WebApp.Pairing import Assistant.Types.ThreadedMonad import Utility.WebApp import Utility.AuthToken @@ -83,6 +83,7 @@ webAppThread assistantdata urlrenderer noannex cannotrun postfirstrun listenhost <*> pure cannotrun <*> pure noannex <*> pure listenhost' + <*> newWormholePairingState setUrlRenderer urlrenderer $ yesodRender webapp (pack "") app <- toWaiAppPlain webapp app' <- ifM debugEnabled diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs deleted file mode 100644 index 2b68ecbe1..000000000 --- a/Assistant/Threads/XMPPClient.hs +++ /dev/null @@ -1,375 +0,0 @@ -{- git-annex XMPP client - - - - Copyright 2012, 2013 Joey Hess <id@joeyh.name> - - - - Licensed under the GNU GPL version 3 or higher. - -} - -module Assistant.Threads.XMPPClient where - -import Assistant.Common hiding (ProtocolError) -import Assistant.XMPP -import Assistant.XMPP.Client -import Assistant.NetMessager -import Assistant.Types.NetMessager -import Assistant.Types.Buddies -import Assistant.XMPP.Buddies -import Assistant.Sync -import Assistant.DaemonStatus -import qualified Remote -import Utility.ThreadScheduler -import Assistant.WebApp (UrlRenderer) -import Assistant.WebApp.Types hiding (liftAssistant) -import Assistant.Alert -import Assistant.Pairing -import Assistant.XMPP.Git -import Annex.UUID -import Logs.UUID -import qualified Command.Sync - -import Network.Protocol.XMPP -import Control.Concurrent -import Control.Concurrent.STM.TMVar -import Control.Concurrent.STM (atomically) -import qualified Data.Text as T -import qualified Data.Set as S -import qualified Data.Map as M -import Data.Time.Clock -import Control.Concurrent.Async - -xmppClientThread :: UrlRenderer -> NamedThread -xmppClientThread urlrenderer = namedThread "XMPPClient" $ - restartableClient . xmppClient urlrenderer =<< getAssistant id - -{- Runs the client, handing restart events. -} -restartableClient :: (XMPPCreds -> UUID -> IO ()) -> Assistant () -restartableClient a = forever $ go =<< liftAnnex getXMPPCreds - where - go Nothing = waitNetMessagerRestart - go (Just creds) = do - xmppuuid <- maybe NoUUID Remote.uuid . headMaybe - . filter Remote.isXMPPRemote . syncRemotes - <$> getDaemonStatus - tid <- liftIO $ forkIO $ a creds xmppuuid - waitNetMessagerRestart - liftIO $ killThread tid - -xmppClient :: UrlRenderer -> AssistantData -> XMPPCreds -> UUID -> IO () -xmppClient urlrenderer d creds xmppuuid = - retry (runclient creds) =<< getCurrentTime - where - liftAssistant = runAssistant d - inAssistant = liftIO . liftAssistant - - {- When the client exits, it's restarted; - - if it keeps failing, back off to wait 5 minutes before - - trying it again. -} - retry client starttime = do - {- The buddy list starts empty each time - - the client connects, so that stale info - - is not retained. -} - liftAssistant $ - updateBuddyList (const noBuddies) <<~ buddyList - void client - liftAssistant $ do - modifyDaemonStatus_ $ \s -> s - { xmppClientID = Nothing } - changeCurrentlyConnected $ S.delete xmppuuid - - now <- getCurrentTime - if diffUTCTime now starttime > 300 - then do - liftAssistant $ debug ["connection lost; reconnecting"] - retry client now - else do - liftAssistant $ debug ["connection failed; will retry"] - threadDelaySeconds (Seconds 300) - retry client =<< getCurrentTime - - runclient c = liftIO $ connectXMPP c $ \jid -> do - selfjid <- bindJID jid - putStanza gitAnnexSignature - - inAssistant $ do - modifyDaemonStatus_ $ \s -> s - { xmppClientID = Just $ xmppJID creds } - changeCurrentlyConnected $ S.insert xmppuuid - debug ["connected", logJid selfjid] - - lasttraffic <- liftIO $ atomically . newTMVar =<< getCurrentTime - - sender <- xmppSession $ sendnotifications selfjid - receiver <- xmppSession $ receivenotifications selfjid lasttraffic - pinger <- xmppSession $ sendpings selfjid lasttraffic - {- Run all 3 threads concurrently, until - - any of them throw an exception. - - Then kill all 3 threads, and rethrow the - - exception. - - - - If this thread gets an exception, the 3 threads - - will also be killed. -} - liftIO $ pinger `concurrently` sender `concurrently` receiver - - sendnotifications selfjid = forever $ - join $ inAssistant $ relayNetMessage selfjid - receivenotifications selfjid lasttraffic = forever $ do - l <- decodeStanza selfjid <$> getStanza - void $ liftIO $ atomically . swapTMVar lasttraffic =<< getCurrentTime - inAssistant $ debug - ["received:", show $ map logXMPPEvent l] - mapM_ (handlemsg selfjid) l - sendpings selfjid lasttraffic = forever $ do - putStanza pingstanza - - startping <- liftIO getCurrentTime - liftIO $ threadDelaySeconds (Seconds 120) - t <- liftIO $ atomically $ readTMVar lasttraffic - when (t < startping) $ do - inAssistant $ debug ["ping timeout"] - error "ping timeout" - where - {- XEP-0199 says that the server will respond with either - - a ping response or an error message. Either will - - cause traffic, so good enough. -} - pingstanza = xmppPing selfjid - - handlemsg selfjid (PresenceMessage p) = do - void $ inAssistant $ - updateBuddyList (updateBuddies p) <<~ buddyList - resendImportantMessages selfjid p - handlemsg _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature - handlemsg _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us - handlemsg selfjid (GotNetMessage (PairingNotification stage c u)) = - maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) - handlemsg _ (GotNetMessage m@(Pushing _ pushstage)) - | isPushNotice pushstage = inAssistant $ handlePushNotice m - | isPushInitiation pushstage = inAssistant $ queuePushInitiation m - | otherwise = inAssistant $ storeInbox m - handlemsg _ (Ignorable _) = noop - handlemsg _ (Unknown _) = noop - handlemsg _ (ProtocolError _) = noop - - resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do - let c = formatJID jid - (stored, sent) <- inAssistant $ - checkImportantNetMessages (formatJID (baseJID jid), c) - forM_ (S.toList $ S.difference stored sent) $ \msg -> do - let msg' = readdressNetMessage msg c - inAssistant $ debug - [ "sending to new client:" - , logJid jid - , show $ logNetMessage msg' - ] - join $ inAssistant $ convertNetMsg msg' selfjid - inAssistant $ sentImportantNetMessage msg c - resendImportantMessages _ _ = noop - -data XMPPEvent - = GotNetMessage NetMessage - | PresenceMessage Presence - | Ignorable ReceivedStanza - | Unknown ReceivedStanza - | ProtocolError ReceivedStanza - deriving Show - -logXMPPEvent :: XMPPEvent -> String -logXMPPEvent (GotNetMessage m) = logNetMessage m -logXMPPEvent (PresenceMessage p) = logPresence p -logXMPPEvent (Ignorable (ReceivedPresence p)) = "Ignorable " ++ logPresence p -logXMPPEvent (Ignorable _) = "Ignorable message" -logXMPPEvent (Unknown _) = "Unknown message" -logXMPPEvent (ProtocolError _) = "Protocol error message" - -logPresence :: Presence -> String -logPresence (p@Presence { presenceFrom = Just jid }) = unwords - [ "Presence from" - , logJid jid - , show $ extractGitAnnexTag p - ] -logPresence _ = "Presence from unknown" - -logJid :: JID -> String -logJid jid = - let name = T.unpack (buddyName jid) - resource = maybe "" (T.unpack . strResource) (jidResource jid) - in take 1 name ++ show (length name) ++ "/" ++ resource - -logClient :: Client -> String -logClient (Client jid) = logJid jid - -{- Decodes an XMPP stanza into one or more events. -} -decodeStanza :: JID -> ReceivedStanza -> [XMPPEvent] -decodeStanza selfjid s@(ReceivedPresence p) - | presenceType p == PresenceError = [ProtocolError s] - | isNothing (presenceFrom p) = [Ignorable s] - | presenceFrom p == Just selfjid = [Ignorable s] - | otherwise = maybe [PresenceMessage p] decode (gitAnnexTagInfo p) - where - decode i - | tagAttr i == pushAttr = impliedp $ GotNetMessage $ NotifyPush $ - decodePushNotification (tagValue i) - | tagAttr i == queryAttr = impliedp $ GotNetMessage QueryPresence - | otherwise = [Unknown s] - {- Things sent via presence imply a presence message, - - along with their real meaning. -} - impliedp v = [PresenceMessage p, v] -decodeStanza selfjid s@(ReceivedMessage m) - | isNothing (messageFrom m) = [Ignorable s] - | messageFrom m == Just selfjid = [Ignorable s] - | messageType m == MessageError = [ProtocolError s] - | otherwise = [fromMaybe (Unknown s) (GotNetMessage <$> decodeMessage m)] -decodeStanza _ s = [Unknown s] - -{- Waits for a NetMessager message to be sent, and relays it to XMPP. - - - - Chat messages must be directed to specific clients, not a base - - account JID, due to git-annex clients using a negative presence priority. - - PairingNotification messages are always directed at specific - - clients, but Pushing messages are sometimes not, and need to be exploded - - out to specific clients. - - - - Important messages, not directed at any specific client, - - are cached to be sent later when additional clients connect. - -} -relayNetMessage :: JID -> Assistant (XMPP ()) -relayNetMessage selfjid = do - msg <- waitNetMessage - debug ["sending:", logNetMessage msg] - a1 <- handleImportant msg - a2 <- convert msg - return (a1 >> a2) - where - handleImportant msg = case parseJID =<< isImportantNetMessage msg of - Just tojid - | tojid == baseJID tojid -> do - storeImportantNetMessage msg (formatJID tojid) $ - \c -> (baseJID <$> parseJID c) == Just tojid - return $ putStanza presenceQuery - _ -> return noop - convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> - if tojid == baseJID tojid - then do - clients <- maybe [] (S.toList . buddyAssistants) - <$> getBuddy (genBuddyKey tojid) <<~ buddyList - debug ["exploded undirected message to clients", unwords $ map logClient clients] - return $ forM_ clients $ \(Client jid) -> - putStanza $ pushMessage pushstage jid selfjid - else do - debug ["to client:", logJid tojid] - return $ putStanza $ pushMessage pushstage tojid selfjid - convert msg = convertNetMsg msg selfjid - -{- Converts a NetMessage to an XMPP action. -} -convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ()) -convertNetMsg msg selfjid = convert msg - where - convert (NotifyPush us) = return $ putStanza $ pushNotification us - convert QueryPresence = return $ putStanza presenceQuery - convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do - changeBuddyPairing tojid True - return $ putStanza $ pairingNotification stage u tojid selfjid - convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> - return $ putStanza $ pushMessage pushstage tojid selfjid - -withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> Assistant (XMPP ()) -withOtherClient selfjid c a = case parseJID c of - Nothing -> return noop - Just tojid - | tojid == selfjid -> return noop - | otherwise -> a tojid - -withClient :: ClientID -> (JID -> XMPP ()) -> XMPP () -withClient c a = maybe noop a $ parseJID c - -{- Returns an IO action that runs a XMPP action in a separate thread, - - using a session to allow it to access the same XMPP client. -} -xmppSession :: XMPP () -> XMPP (IO ()) -xmppSession a = do - s <- getSession - return $ void $ runXMPP s a - -{- We only pull from one remote out of the set listed in the push - - notification, as an optimisation. - - - - Note that it might be possible (though very unlikely) for the push - - notification to take a while to be sent, and multiple pushes happen - - before it is sent, so it includes multiple remotes that were pushed - - to at different times. - - - - It could then be the case that the remote we choose had the earlier - - push sent to it, but then failed to get the later push, and so is not - - fully up-to-date. If that happens, the pushRetryThread will come along - - and retry the push, and we'll get another notification once it succeeds, - - and pull again. -} -pull :: [UUID] -> Assistant () -pull [] = noop -pull us = do - rs <- filter matching . syncGitRemotes <$> getDaemonStatus - debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs - pullone rs =<< liftAnnex (join Command.Sync.getCurrBranch) - where - matching r = Remote.uuid r `S.member` s - s = S.fromList us - - pullone [] _ = noop - pullone (r:rs) branch = - unlessM (null . fst <$> manualPull branch [r]) $ - pullone rs branch - -{- PairReq from another client using our JID is automatically - - accepted. This is so pairing devices all using the same XMPP - - account works without confirmations. - - - - Also, autoaccept PairReq from the same JID of any repo we've - - already paired with, as long as the UUID in the PairReq is - - one we know about. --} -pairMsgReceived :: UrlRenderer -> PairStage -> UUID -> JID -> JID -> Assistant () -pairMsgReceived urlrenderer PairReq theiruuid selfjid theirjid - | baseJID selfjid == baseJID theirjid = autoaccept - | otherwise = do - knownjids <- mapMaybe (parseJID . getXMPPClientID) - . filter Remote.isXMPPRemote . syncRemotes <$> getDaemonStatus - um <- liftAnnex uuidMap - if elem (baseJID theirjid) knownjids && M.member theiruuid um - then autoaccept - else showalert - - where - autoaccept = do - selfuuid <- liftAnnex getUUID - sendNetMessage $ - PairingNotification PairAck (formatJID theirjid) selfuuid - finishXMPPPairing theirjid theiruuid - -- Show an alert to let the user decide if they want to pair. - showalert = do - button <- mkAlertButton True (T.pack "Respond") urlrenderer $ - ConfirmXMPPPairFriendR $ - PairKey theiruuid $ formatJID theirjid - void $ addAlert $ pairRequestReceivedAlert - (T.unpack $ buddyName theirjid) - button - -{- PairAck must come from one of the buddies we are pairing with; - - don't pair with just anyone. -} -pairMsgReceived _ PairAck theiruuid _selfjid theirjid = - whenM (isBuddyPairing theirjid) $ do - changeBuddyPairing theirjid False - selfuuid <- liftAnnex getUUID - sendNetMessage $ - PairingNotification PairDone (formatJID theirjid) selfuuid - finishXMPPPairing theirjid theiruuid - -pairMsgReceived _ PairDone _theiruuid _selfjid theirjid = - changeBuddyPairing theirjid False - -isBuddyPairing :: JID -> Assistant Bool -isBuddyPairing jid = maybe False buddyPairing <$> - getBuddy (genBuddyKey jid) <<~ buddyList - -changeBuddyPairing :: JID -> Bool -> Assistant () -changeBuddyPairing jid ispairing = - updateBuddyList (M.adjust set key) <<~ buddyList - where - key = genBuddyKey jid - set b = b { buddyPairing = ispairing } diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs deleted file mode 100644 index bff17356d..000000000 --- a/Assistant/Threads/XMPPPusher.hs +++ /dev/null @@ -1,82 +0,0 @@ -{- git-annex XMPP pusher threads - - - - This is a pair of threads. One handles git send-pack, - - and the other git receive-pack. Each thread can be running at most - - one such operation at a time. - - - - Why not use a single thread? Consider two clients A and B. - - If both decide to run a receive-pack at the same time to the other, - - they would deadlock with only one thread. For larger numbers of - - clients, the two threads are also sufficient. - - - - Copyright 2013 Joey Hess <id@joeyh.name> - - - - Licensed under the GNU GPL version 3 or higher. - -} - -module Assistant.Threads.XMPPPusher where - -import Assistant.Common -import Assistant.NetMessager -import Assistant.Types.NetMessager -import Assistant.WebApp (UrlRenderer) -import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) -import Assistant.XMPP.Git - -import Control.Exception as E - -xmppSendPackThread :: UrlRenderer -> NamedThread -xmppSendPackThread = pusherThread "XMPPSendPack" SendPack - -xmppReceivePackThread :: UrlRenderer -> NamedThread -xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack - -pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread -pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing - where - go lastpushedto = do - msg <- waitPushInitiation side $ selectNextPush lastpushedto - debug ["started running push", logNetMessage msg] - - runpush <- asIO $ runPush checker msg - r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID))) - let successful = case r of - Right (Just _) -> True - _ -> False - - {- Empty the inbox, because stuff may have - - been left in it if the push failed. -} - let justpushedto = getclient msg - maybe noop (`emptyInbox` side) justpushedto - - debug ["finished running push", logNetMessage msg, show successful] - go $ if successful then justpushedto else lastpushedto - - checker = checkCloudRepos urlrenderer - - getclient (Pushing cid _) = Just cid - getclient _ = Nothing - -{- Select the next push to run from the queue. - - The queue cannot be empty! - - - - We prefer to select the most recently added push, because its requestor - - is more likely to still be connected. - - - - When passed the ID of a client we just pushed to, we prefer to not - - immediately push again to that same client. This avoids one client - - drowing out others. So pushes from the client we just pushed to are - - relocated to the beginning of the list, to be processed later. - -} -selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage]) -selectNextPush _ (m:[]) = (m, []) -- common case -selectNextPush _ [] = error "selectNextPush: empty list" -selectNextPush lastpushedto l = go [] l - where - go (r:ejected) [] = (r, ejected) - go rejected (m:ms) = case m of - (Pushing clientid _) - | Just clientid /= lastpushedto -> (m, rejected ++ ms) - _ -> go (m:rejected) ms - go [] [] = error "empty push queue" - |