summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant/NetMessager.hs134
-rw-r--r--Assistant/Threads/XMPPClient.hs9
-rw-r--r--Assistant/Types/NetMessager.hs18
-rw-r--r--Assistant/XMPP/Git.hs27
4 files changed, 110 insertions, 78 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index fd320b00b..7ee4d464b 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -1,21 +1,23 @@
{- git-annex assistant out of band network messager interface
-
- - Copyright 2012 Joey Hess <joey@kitenet.net>
+ - Copyright 2012-2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
-import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
+ -
+ - If this is called when a push of the same side is running, it will
+ - block until that push completes, and then run.
-}
-runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush side clientid handledeferred a = do
+runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
+runPush side clientid a = do
nm <- getAssistant netMessager
- let runningv = getSide side $ netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar runningv $ Just clientid
- let cleanup = atomically $ do
- void $ swapTMVar runningv Nothing
- emptytchan (getSide side $ netMessagesPush nm)
+ let v = getSide side $ netMessagerPushRunning nm
+ debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
+ let setup = do
+ debugmsg "preparing to run"
+ atomically $ ifM (isNothing <$> tryReadTMVar v)
+ ( putTMVar v clientid
+ , retry
+ )
+ debugmsg "started running"
+ let cleanup = do
+ debugmsg "finished running"
+ atomically $ takeTMVar v
r <- E.bracket_ setup cleanup <~> a
- (void . forkIO) <~> processdeferred nm
+ {- Empty the inbox, because stuff may have been left in it
+ - if the push failed. -}
+ emptyInbox clientid side
return r
- where
- emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
- processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
- mapM_ rundeferred (S.toList s)
- rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
- <~> handledeferred m
-
-{- While a push is running, matching push messages are put into
- - netMessagesPush, while others that involve the same side go to
- - netMessagesPushDeferred.
+
+{- Stores messages for a push into the appropriate inbox.
-
- - When no push is running involving the same side, returns False.
+ - To avoid overflow, only 1000 messages max are stored in any
+ - inbox, which should be far more than necessary.
-
- - To avoid bloating memory, only messages that initiate pushes are
- - deferred.
+ - TODO: If we have more than 100 inboxes for different clients,
+ - discard old ones that are not currently being used by any push.
-}
-queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m@(Pushing clientid stage) = do
- nm <- getAssistant netMessager
- liftIO $ atomically $ do
- v <- readTMVar (getSide side $ netMessagerPushRunning nm)
- case v of
- Nothing -> return False
- (Just runningclientid)
- | isPushInitiation stage -> defer nm
- | runningclientid == clientid -> queue nm
- | otherwise -> discard
+storeInbox :: NetMessage -> Assistant ()
+storeInbox msg@(Pushing clientid stage) = do
+ inboxes <- getInboxes side
+ stored <- liftIO $ atomically $ do
+ m <- readTVar inboxes
+ let update = \v -> do
+ writeTVar inboxes $
+ M.insertWith' const clientid v m
+ return True
+ case M.lookup clientid m of
+ Nothing -> update (1, tostore)
+ Just (sz, l)
+ | sz > 1000 -> return False
+ | otherwise ->
+ let !sz' = sz + 1
+ !l' = D.append l tostore
+ in update (sz', l')
+ if stored
+ then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
+ else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
where
side = pushDestinationSide stage
- queue nm = do
- writeTChan (getSide side $ netMessagesPush nm) m
- return True
- defer nm = do
- let mv = getSide side $ netMessagesPushDeferred nm
- s <- takeTMVar mv
- putTMVar mv $ S.insert m s
- return True
- discard = return True
-queueNetPushMessage _ = return False
-
-waitNetPushMessage :: PushSide -> Assistant (NetMessage)
-waitNetPushMessage side = (atomically . readTChan)
- <<~ (getSide side . netMessagesPush . netMessager)
+ tostore = D.singleton msg
+storeInbox _ = noop
+
+{- Gets the new message for a push from its inbox.
+ - Blocks until a message has been received. -}
+waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
+waitInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $ do
+ m <- readTVar inboxes
+ case M.lookup clientid m of
+ Nothing -> retry
+ Just (sz, dl)
+ | sz < 1 -> retry
+ | otherwise -> do
+ let msg = D.head dl
+ let dl' = D.tail dl
+ let !sz' = sz - 1
+ writeTVar inboxes $
+ M.insertWith' const clientid (sz', dl') m
+ return msg
+
+emptyInbox :: ClientID -> PushSide -> Assistant ()
+emptyInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $
+ modifyTVar' inboxes $
+ M.delete clientid
+
+getInboxes :: PushSide -> Assistant Inboxes
+getInboxes side =
+ getSide side . netMessagesInboxes <$> getAssistant netMessager
+netMessagerDebug :: ClientID -> [String] -> Assistant ()
+netMessagerDebug clientid l = debug $
+ "NetMessager" : l ++ [show $ logClientID clientid]
diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs
index dd1b2ac1f..929b4c807 100644
--- a/Assistant/Threads/XMPPClient.hs
+++ b/Assistant/Threads/XMPPClient.hs
@@ -108,11 +108,10 @@ xmppClient urlrenderer d creds =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushNotice pushstage = inAssistant $ handlePushNotice m
- | isPushInitiation pushstage = inAssistant $
- unlessM (queueNetPushMessage m) $ do
- let checker = checkCloudRepos urlrenderer
- void $ forkIO <~> handlePushInitiation checker m
- | otherwise = void $ inAssistant $ queueNetPushMessage m
+ | isPushInitiation pushstage = inAssistant $ do
+ let checker = checkCloudRepos urlrenderer
+ void $ forkIO <~> handlePushInitiation checker m
+ | otherwise = void $ inAssistant $ storeInbox m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop
diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs
index bc0bf3c22..2c9de253f 100644
--- a/Assistant/Types/NetMessager.hs
+++ b/Assistant/Types/NetMessager.hs
@@ -18,6 +18,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@@ -117,6 +118,8 @@ mkSideMap gen = do
getSide :: PushSide -> SideMap a -> a
getSide side m = m side
+type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))
+
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan NetMessage
@@ -127,11 +130,11 @@ data NetMessager = NetMessager
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time
- , netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
- -- incoming messages related to a running push
- , netMessagesPush :: SideMap (TChan NetMessage)
- -- incoming push messages, deferred to be processed later
- , netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
+ -- the TMVars are empty when nothing is running
+ , netMessagerPushRunning :: SideMap (TMVar ClientID)
+ -- incoming messages containing data for a push,
+ -- on a per-client and per-side basis
+ , netMessagesInboxes :: SideMap Inboxes
}
newNetMessager :: IO NetMessager
@@ -140,6 +143,5 @@ newNetMessager = NetMessager
<*> atomically (newTMVar M.empty)
<*> atomically (newTMVar M.empty)
<*> newEmptySV
- <*> mkSideMap (newTMVar Nothing)
- <*> mkSideMap newTChan
- <*> mkSideMap (newTMVar S.empty)
+ <*> mkSideMap newEmptyTMVar
+ <*> mkSideMap (newTVar M.empty)
diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs
index 7970f0506..98c70cf41 100644
--- a/Assistant/XMPP/Git.hs
+++ b/Assistant/XMPP/Git.hs
@@ -99,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
-xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
+xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
+xmppPush cid gitpush = runPush SendPack cid $ do
u <- liftAnnex getUUID
sendNetMessage $ Pushing cid (StartingPush u)
@@ -149,7 +149,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
SendPackOutput seqnum' b
toxmpp seqnum' inh
- fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
+ fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
where
handle (Just (Pushing _ (ReceivePackOutput _ b))) =
liftIO $ writeChunk outh b
@@ -236,8 +236,8 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
-xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool
-xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
+xmppReceivePack :: ClientID -> Assistant Bool
+xmppReceivePack cid = runPush ReceivePack cid $ do
repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe
@@ -262,7 +262,7 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh
- relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
+ relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
where
handle (Just (Pushing _ (SendPackOutput _ b))) =
liftIO $ writeChunk inh b
@@ -301,15 +301,13 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
forM_ rs $ \r -> do
void $ alertWhile (syncAlert [r]) $
- xmppPush cid
- (taggedPush u selfjid branch r)
- (handleDeferred checkcloudrepos)
+ xmppPush cid (taggedPush u selfjid branch r)
checkcloudrepos r
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
rs <- xmppRemotes cid theiruuid
unless (null rs) $ do
void $ alertWhile (syncAlert rs) $
- xmppReceivePack cid (handleDeferred checkcloudrepos)
+ xmppReceivePack cid
mapM_ checkcloudrepos rs
handlePushInitiation _ _ = noop
@@ -320,9 +318,6 @@ handlePushNotice (Pushing cid (CanPush theiruuid)) =
sendNetMessage $ Pushing cid (PushRequest u)
handlePushNotice _ = noop
-handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
-handleDeferred checkcloudrepos m = handlePushInitiation checkcloudrepos m
-
writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do
B.hPut h b
@@ -335,11 +330,11 @@ writeChunk h b = do
- Does not currently reorder messages, but does ensure that any
- duplicate messages, or messages not in the sequence, are discarded.
-}
-withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
-withPushMessagesInSequence side a = loop 0
+withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
+withPushMessagesInSequence cid side a = loop 0
where
loop seqnum = do
- m <- timeout xmppTimeout <~> waitNetPushMessage side
+ m <- timeout xmppTimeout <~> waitInbox cid side
let go s = a m >> loop s
case extractSequence =<< m of
Just seqnum'