summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-05-21 11:06:49 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-05-21 11:08:08 -0400
commit901f2c9e218cdba36e2488c413f9e620337f3283 (patch)
tree494c5049e25c9440157a6f59441ec908c49fbad9 /Assistant
parent18bf809758a1d42a19de9d056ef35cb9c7221dac (diff)
per-client inboxes for push messages
This will avoid losing any messages received from 1 client when a push involving another client is running. Additionally, the handling of push initiation is improved, it's no longer allowed to run multiples of the same type of push to the same client. Still stalls sometimes :(
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/NetMessager.hs134
-rw-r--r--Assistant/Threads/XMPPClient.hs9
-rw-r--r--Assistant/Types/NetMessager.hs18
-rw-r--r--Assistant/XMPP/Git.hs27
4 files changed, 110 insertions, 78 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index fd320b00b..7ee4d464b 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -1,21 +1,23 @@
{- git-annex assistant out of band network messager interface
-
- - Copyright 2012 Joey Hess <joey@kitenet.net>
+ - Copyright 2012-2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
-import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
+ -
+ - If this is called when a push of the same side is running, it will
+ - block until that push completes, and then run.
-}
-runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush side clientid handledeferred a = do
+runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
+runPush side clientid a = do
nm <- getAssistant netMessager
- let runningv = getSide side $ netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar runningv $ Just clientid
- let cleanup = atomically $ do
- void $ swapTMVar runningv Nothing
- emptytchan (getSide side $ netMessagesPush nm)
+ let v = getSide side $ netMessagerPushRunning nm
+ debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
+ let setup = do
+ debugmsg "preparing to run"
+ atomically $ ifM (isNothing <$> tryReadTMVar v)
+ ( putTMVar v clientid
+ , retry
+ )
+ debugmsg "started running"
+ let cleanup = do
+ debugmsg "finished running"
+ atomically $ takeTMVar v
r <- E.bracket_ setup cleanup <~> a
- (void . forkIO) <~> processdeferred nm
+ {- Empty the inbox, because stuff may have been left in it
+ - if the push failed. -}
+ emptyInbox clientid side
return r
- where
- emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
- processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
- mapM_ rundeferred (S.toList s)
- rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
- <~> handledeferred m
-
-{- While a push is running, matching push messages are put into
- - netMessagesPush, while others that involve the same side go to
- - netMessagesPushDeferred.
+
+{- Stores messages for a push into the appropriate inbox.
-
- - When no push is running involving the same side, returns False.
+ - To avoid overflow, only 1000 messages max are stored in any
+ - inbox, which should be far more than necessary.
-
- - To avoid bloating memory, only messages that initiate pushes are
- - deferred.
+ - TODO: If we have more than 100 inboxes for different clients,
+ - discard old ones that are not currently being used by any push.
-}
-queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m@(Pushing clientid stage) = do
- nm <- getAssistant netMessager
- liftIO $ atomically $ do
- v <- readTMVar (getSide side $ netMessagerPushRunning nm)
- case v of
- Nothing -> return False
- (Just runningclientid)
- | isPushInitiation stage -> defer nm
- | runningclientid == clientid -> queue nm
- | otherwise -> discard
+storeInbox :: NetMessage -> Assistant ()
+storeInbox msg@(Pushing clientid stage) = do
+ inboxes <- getInboxes side
+ stored <- liftIO $ atomically $ do
+ m <- readTVar inboxes
+ let update = \v -> do
+ writeTVar inboxes $
+ M.insertWith' const clientid v m
+ return True
+ case M.lookup clientid m of
+ Nothing -> update (1, tostore)
+ Just (sz, l)
+ | sz > 1000 -> return False
+ | otherwise ->
+ let !sz' = sz + 1
+ !l' = D.append l tostore
+ in update (sz', l')
+ if stored
+ then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
+ else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
where
side = pushDestinationSide stage
- queue nm = do
- writeTChan (getSide side $ netMessagesPush nm) m
- return True
- defer nm = do
- let mv = getSide side $ netMessagesPushDeferred nm
- s <- takeTMVar mv
- putTMVar mv $ S.insert m s
- return True
- discard = return True
-queueNetPushMessage _ = return False
-
-waitNetPushMessage :: PushSide -> Assistant (NetMessage)
-waitNetPushMessage side = (atomically . readTChan)
- <<~ (getSide side . netMessagesPush . netMessager)
+ tostore = D.singleton msg
+storeInbox _ = noop
+
+{- Gets the new message for a push from its inbox.
+ - Blocks until a message has been received. -}
+waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
+waitInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $ do
+ m <- readTVar inboxes
+ case M.lookup clientid m of
+ Nothing -> retry
+ Just (sz, dl)
+ | sz < 1 -> retry
+ | otherwise -> do
+ let msg = D.head dl
+ let dl' = D.tail dl
+ let !sz' = sz - 1
+ writeTVar inboxes $
+ M.insertWith' const clientid (sz', dl') m
+ return msg
+
+emptyInbox :: ClientID -> PushSide -> Assistant ()
+emptyInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $
+ modifyTVar' inboxes $
+ M.delete clientid
+
+getInboxes :: PushSide -> Assistant Inboxes
+getInboxes side =
+ getSide side . netMessagesInboxes <$> getAssistant netMessager
+netMessagerDebug :: ClientID -> [String] -> Assistant ()
+netMessagerDebug clientid l = debug $
+ "NetMessager" : l ++ [show $ logClientID clientid]
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index dd1b2ac1f..929b4c807 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -108,11 +108,10 @@ xmppClient urlrenderer d creds =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushNotice pushstage = inAssistant $ handlePushNotice m
- | isPushInitiation pushstage = inAssistant $
- unlessM (queueNetPushMessage m) $ do
- let checker = checkCloudRepos urlrenderer
- void $ forkIO <~> handlePushInitiation checker m
- | otherwise = void $ inAssistant $ queueNetPushMessage m
+ | isPushInitiation pushstage = inAssistant $ do
+ let checker = checkCloudRepos urlrenderer
+ void $ forkIO <~> handlePushInitiation checker m
+ | otherwise = void $ inAssistant $ storeInbox m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index bc0bf3c22..2c9de253f 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -18,6 +18,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -117,6 +118,8 @@ mkSideMap gen = do
getSide :: PushSide -> SideMap a -> a
getSide side m = m side
+type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))
+
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan NetMessage
@@ -127,11 +130,11 @@ data NetMessager = NetMessager
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time
- , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
- -- incoming messages related to a running push
- , netMessagesPush :: SideMap (TChan NetMessage)
- -- incoming push messages, deferred to be processed later
- , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
+ -- the TMVars are empty when nothing is running
+ , netMessagerPushRunning :: SideMap (TMVar ClientID)
+ -- incoming messages containing data for a push,
+ -- on a per-client and per-side basis
+ , netMessagesInboxes :: SideMap Inboxes
}
newNetMessager :: IO NetMessager
@@ -140,6 +143,5 @@ newNetMessager = NetMessager
<*> atomically (newTMVar M.empty)
<*> atomically (newTMVar M.empty)
<*> newEmptySV
- <*> mkSideMap (newTMVar Nothing)
- <*> mkSideMap newTChan
- <*> mkSideMap (newTMVar S.empty)
+ <*> mkSideMap newEmptyTMVar
+ <*> mkSideMap (newTVar M.empty)
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index 7970f0506..98c70cf41 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -99,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
-xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
+xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
+xmppPush cid gitpush = runPush SendPack cid $ do
u <- liftAnnex getUUID
sendNetMessage $ Pushing cid (StartingPush u)
@@ -149,7 +149,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
SendPackOutput seqnum' b
toxmpp seqnum' inh
- fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
+ fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
where
handle (Just (Pushing _ (ReceivePackOutput _ b))) =
liftIO $ writeChunk outh b
@@ -236,8 +236,8 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
-xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
+xmppReceivePack :: ClientID -> Assistant Bool
+xmppReceivePack cid = runPush ReceivePack cid $ do
repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe
@@ -262,7 +262,7 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh
- relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
+ relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
where
handle (Just (Pushing _ (SendPackOutput _ b))) =
liftIO $ writeChunk inh b
@@ -301,15 +301,13 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
forM_ rs $ \r -> do
void $ alertWhile (syncAlert [r]) $
- xmppPush cid
- (taggedPush u selfjid branch r)
- (handleDeferred checkcloudrepos)
+ xmppPush cid (taggedPush u selfjid branch r)
checkcloudrepos r
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
rs <- xmppRemotes cid theiruuid
unless (null rs) $ do
void $ alertWhile (syncAlert rs) $
- xmppReceivePack cid (handleDeferred checkcloudrepos)
+ xmppReceivePack cid
mapM_ checkcloudrepos rs
handlePushInitiation _ _ = noop
@@ -320,9 +318,6 @@ handlePushNotice (Pushing cid (CanPush theiruuid)) =
sendNetMessage $ Pushing cid (PushRequest u)
handlePushNotice _ = noop
-handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
-handleDeferred checkcloudrepos m = handlePushInitiation checkcloudrepos m
-
writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do
B.hPut h b
@@ -335,11 +330,11 @@ writeChunk h b = do
- Does not currently reorder messages, but does ensure that any
- duplicate messages, or messages not in the sequence, are discarded.
-}
-withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
-withPushMessagesInSequence side a = loop 0
+withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
+withPushMessagesInSequence cid side a = loop 0
where
loop seqnum = do
- m <- timeout xmppTimeout <~> waitNetPushMessage side
+ m <- timeout xmppTimeout <~> waitInbox cid side
let go s = a m >> loop s
case extractSequence =<< m of
Just seqnum'