diff options
-rw-r--r-- | Assistant/NetMessager.hs | 134 | ||||
-rw-r--r-- | Assistant/Threads/XMPPClient.hs | 9 | ||||
-rw-r--r-- | Assistant/Types/NetMessager.hs | 18 | ||||
-rw-r--r-- | Assistant/XMPP/Git.hs | 27 |
4 files changed, 110 insertions, 78 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index fd320b00b..7ee4d464b 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -1,21 +1,23 @@ {- git-annex assistant out of band network messager interface - - - Copyright 2012 Joey Hess <joey@kitenet.net> + - Copyright 2012-2013 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. -} +{-# LANGUAGE BangPatterns #-} + module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager -import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Control.Exception as E import qualified Data.Set as S import qualified Data.Map as M +import qualified Data.DList as D sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager - relating to this push, while any messages relating to other pushes - on the same side go to netMessagesDeferred. Once the push finishes, - those deferred messages will be fed to handledeferred for processing. + - + - If this is called when a push of the same side is running, it will + - block until that push completes, and then run. -} -runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a -runPush side clientid handledeferred a = do +runPush :: PushSide -> ClientID -> Assistant a -> Assistant a +runPush side clientid a = do nm <- getAssistant netMessager - let runningv = getSide side $ netMessagerPushRunning nm - let setup = void $ atomically $ swapTMVar runningv $ Just clientid - let cleanup = atomically $ do - void $ swapTMVar runningv Nothing - emptytchan (getSide side $ netMessagesPush nm) + let v = getSide side $ netMessagerPushRunning nm + debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side] + let setup = do + debugmsg "preparing to run" + atomically $ ifM (isNothing <$> tryReadTMVar v) + ( putTMVar v clientid + , retry + ) + debugmsg "started running" + let cleanup = do + debugmsg "finished running" + atomically $ takeTMVar v r <- E.bracket_ setup cleanup <~> a - (void . forkIO) <~> processdeferred nm + {- Empty the inbox, because stuff may have been left in it + - if the push failed. -} + emptyInbox clientid side return r - where - emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c - processdeferred nm = do - s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty - mapM_ rundeferred (S.toList s) - rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) - <~> handledeferred m - -{- While a push is running, matching push messages are put into - - netMessagesPush, while others that involve the same side go to - - netMessagesPushDeferred. + +{- Stores messages for a push into the appropriate inbox. - - - When no push is running involving the same side, returns False. + - To avoid overflow, only 1000 messages max are stored in any + - inbox, which should be far more than necessary. - - - To avoid bloating memory, only messages that initiate pushes are - - deferred. + - TODO: If we have more than 100 inboxes for different clients, + - discard old ones that are not currently being used by any push. -} -queueNetPushMessage :: NetMessage -> Assistant Bool -queueNetPushMessage m@(Pushing clientid stage) = do - nm <- getAssistant netMessager - liftIO $ atomically $ do - v <- readTMVar (getSide side $ netMessagerPushRunning nm) - case v of - Nothing -> return False - (Just runningclientid) - | isPushInitiation stage -> defer nm - | runningclientid == clientid -> queue nm - | otherwise -> discard +storeInbox :: NetMessage -> Assistant () +storeInbox msg@(Pushing clientid stage) = do + inboxes <- getInboxes side + stored <- liftIO $ atomically $ do + m <- readTVar inboxes + let update = \v -> do + writeTVar inboxes $ + M.insertWith' const clientid v m + return True + case M.lookup clientid m of + Nothing -> update (1, tostore) + Just (sz, l) + | sz > 1000 -> return False + | otherwise -> + let !sz' = sz + 1 + !l' = D.append l tostore + in update (sz', l') + if stored + then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"] + else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"] where side = pushDestinationSide stage - queue nm = do - writeTChan (getSide side $ netMessagesPush nm) m - return True - defer nm = do - let mv = getSide side $ netMessagesPushDeferred nm - s <- takeTMVar mv - putTMVar mv $ S.insert m s - return True - discard = return True -queueNetPushMessage _ = return False - -waitNetPushMessage :: PushSide -> Assistant (NetMessage) -waitNetPushMessage side = (atomically . readTChan) - <<~ (getSide side . netMessagesPush . netMessager) + tostore = D.singleton msg +storeInbox _ = noop + +{- Gets the new message for a push from its inbox. + - Blocks until a message has been received. -} +waitInbox :: ClientID -> PushSide -> Assistant (NetMessage) +waitInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ do + m <- readTVar inboxes + case M.lookup clientid m of + Nothing -> retry + Just (sz, dl) + | sz < 1 -> retry + | otherwise -> do + let msg = D.head dl + let dl' = D.tail dl + let !sz' = sz - 1 + writeTVar inboxes $ + M.insertWith' const clientid (sz', dl') m + return msg + +emptyInbox :: ClientID -> PushSide -> Assistant () +emptyInbox clientid side = do + inboxes <- getInboxes side + liftIO $ atomically $ + modifyTVar' inboxes $ + M.delete clientid + +getInboxes :: PushSide -> Assistant Inboxes +getInboxes side = + getSide side . netMessagesInboxes <$> getAssistant netMessager +netMessagerDebug :: ClientID -> [String] -> Assistant () +netMessagerDebug clientid l = debug $ + "NetMessager" : l ++ [show $ logClientID clientid] diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index dd1b2ac1f..929b4c807 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -108,11 +108,10 @@ xmppClient urlrenderer d creds = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) handle _ (GotNetMessage m@(Pushing _ pushstage)) | isPushNotice pushstage = inAssistant $ handlePushNotice m - | isPushInitiation pushstage = inAssistant $ - unlessM (queueNetPushMessage m) $ do - let checker = checkCloudRepos urlrenderer - void $ forkIO <~> handlePushInitiation checker m - | otherwise = void $ inAssistant $ queueNetPushMessage m + | isPushInitiation pushstage = inAssistant $ do + let checker = checkCloudRepos urlrenderer + void $ forkIO <~> handlePushInitiation checker m + | otherwise = void $ inAssistant $ storeInbox m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index bc0bf3c22..2c9de253f 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -18,6 +18,7 @@ import Data.Text (Text) import qualified Data.Text as T import qualified Data.Set as S import qualified Data.Map as M +import qualified Data.DList as D {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -117,6 +118,8 @@ mkSideMap gen = do getSide :: PushSide -> SideMap a -> a getSide side m = m side +type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage)) + data NetMessager = NetMessager -- outgoing messages { netMessages :: TChan NetMessage @@ -127,11 +130,11 @@ data NetMessager = NetMessager -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () -- only one side of a push can be running at a time - , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID)) - -- incoming messages related to a running push - , netMessagesPush :: SideMap (TChan NetMessage) - -- incoming push messages, deferred to be processed later - , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage)) + -- the TMVars are empty when nothing is running + , netMessagerPushRunning :: SideMap (TMVar ClientID) + -- incoming messages containing data for a push, + -- on a per-client and per-side basis + , netMessagesInboxes :: SideMap Inboxes } newNetMessager :: IO NetMessager @@ -140,6 +143,5 @@ newNetMessager = NetMessager <*> atomically (newTMVar M.empty) <*> atomically (newTMVar M.empty) <*> newEmptySV - <*> mkSideMap (newTMVar Nothing) - <*> mkSideMap newTChan - <*> mkSideMap (newTMVar S.empty) + <*> mkSideMap newEmptyTMVar + <*> mkSideMap (newTVar M.empty) diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index 7970f0506..98c70cf41 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -99,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do - - We listen at the other end of the pipe and relay to and from XMPP. -} -xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool -xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do +xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool +xmppPush cid gitpush = runPush SendPack cid $ do u <- liftAnnex getUUID sendNetMessage $ Pushing cid (StartingPush u) @@ -149,7 +149,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do SendPackOutput seqnum' b toxmpp seqnum' inh - fromxmpp outh controlh = withPushMessagesInSequence SendPack handle + fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle where handle (Just (Pushing _ (ReceivePackOutput _ b))) = liftIO $ writeChunk outh b @@ -236,8 +236,8 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} -xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool -xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do +xmppReceivePack :: ClientID -> Assistant Bool +xmppReceivePack cid = runPush ReceivePack cid $ do repodir <- liftAnnex $ fromRepo repoPath let p = (proc "git" ["receive-pack", repodir]) { std_in = CreatePipe @@ -262,7 +262,7 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do let seqnum' = succ seqnum sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b relaytoxmpp seqnum' outh - relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle + relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle where handle (Just (Pushing _ (SendPackOutput _ b))) = liftIO $ writeChunk inh b @@ -301,15 +301,13 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus forM_ rs $ \r -> do void $ alertWhile (syncAlert [r]) $ - xmppPush cid - (taggedPush u selfjid branch r) - (handleDeferred checkcloudrepos) + xmppPush cid (taggedPush u selfjid branch r) checkcloudrepos r handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do rs <- xmppRemotes cid theiruuid unless (null rs) $ do void $ alertWhile (syncAlert rs) $ - xmppReceivePack cid (handleDeferred checkcloudrepos) + xmppReceivePack cid mapM_ checkcloudrepos rs handlePushInitiation _ _ = noop @@ -320,9 +318,6 @@ handlePushNotice (Pushing cid (CanPush theiruuid)) = sendNetMessage $ Pushing cid (PushRequest u) handlePushNotice _ = noop -handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant () -handleDeferred checkcloudrepos m = handlePushInitiation checkcloudrepos m - writeChunk :: Handle -> B.ByteString -> IO () writeChunk h b = do B.hPut h b @@ -335,11 +330,11 @@ writeChunk h b = do - Does not currently reorder messages, but does ensure that any - duplicate messages, or messages not in the sequence, are discarded. -} -withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant () -withPushMessagesInSequence side a = loop 0 +withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant () +withPushMessagesInSequence cid side a = loop 0 where loop seqnum = do - m <- timeout xmppTimeout <~> waitNetPushMessage side + m <- timeout xmppTimeout <~> waitInbox cid side let go s = a m >> loop s case extractSequence =<< m of Just seqnum' |