summaryrefslogtreecommitdiff
path: root/Assistant/NetMessager.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r--Assistant/NetMessager.hs134
1 files changed, 85 insertions, 49 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs
index fd320b00b..7ee4d464b 100644
--- a/Assistant/NetMessager.hs
+++ b/Assistant/NetMessager.hs
@@ -1,21 +1,23 @@
{- git-annex assistant out of band network messager interface
-
- - Copyright 2012 Joey Hess <joey@kitenet.net>
+ - Copyright 2012-2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
-import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
+import qualified Data.DList as D
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
+ -
+ - If this is called when a push of the same side is running, it will
+ - block until that push completes, and then run.
-}
-runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
-runPush side clientid handledeferred a = do
+runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
+runPush side clientid a = do
nm <- getAssistant netMessager
- let runningv = getSide side $ netMessagerPushRunning nm
- let setup = void $ atomically $ swapTMVar runningv $ Just clientid
- let cleanup = atomically $ do
- void $ swapTMVar runningv Nothing
- emptytchan (getSide side $ netMessagesPush nm)
+ let v = getSide side $ netMessagerPushRunning nm
+ debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
+ let setup = do
+ debugmsg "preparing to run"
+ atomically $ ifM (isNothing <$> tryReadTMVar v)
+ ( putTMVar v clientid
+ , retry
+ )
+ debugmsg "started running"
+ let cleanup = do
+ debugmsg "finished running"
+ atomically $ takeTMVar v
r <- E.bracket_ setup cleanup <~> a
- (void . forkIO) <~> processdeferred nm
+ {- Empty the inbox, because stuff may have been left in it
+ - if the push failed. -}
+ emptyInbox clientid side
return r
- where
- emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
- processdeferred nm = do
- s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
- mapM_ rundeferred (S.toList s)
- rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
- <~> handledeferred m
-
-{- While a push is running, matching push messages are put into
- - netMessagesPush, while others that involve the same side go to
- - netMessagesPushDeferred.
+
+{- Stores messages for a push into the appropriate inbox.
-
- - When no push is running involving the same side, returns False.
+ - To avoid overflow, only 1000 messages max are stored in any
+ - inbox, which should be far more than necessary.
-
- - To avoid bloating memory, only messages that initiate pushes are
- - deferred.
+ - TODO: If we have more than 100 inboxes for different clients,
+ - discard old ones that are not currently being used by any push.
-}
-queueNetPushMessage :: NetMessage -> Assistant Bool
-queueNetPushMessage m@(Pushing clientid stage) = do
- nm <- getAssistant netMessager
- liftIO $ atomically $ do
- v <- readTMVar (getSide side $ netMessagerPushRunning nm)
- case v of
- Nothing -> return False
- (Just runningclientid)
- | isPushInitiation stage -> defer nm
- | runningclientid == clientid -> queue nm
- | otherwise -> discard
+storeInbox :: NetMessage -> Assistant ()
+storeInbox msg@(Pushing clientid stage) = do
+ inboxes <- getInboxes side
+ stored <- liftIO $ atomically $ do
+ m <- readTVar inboxes
+ let update = \v -> do
+ writeTVar inboxes $
+ M.insertWith' const clientid v m
+ return True
+ case M.lookup clientid m of
+ Nothing -> update (1, tostore)
+ Just (sz, l)
+ | sz > 1000 -> return False
+ | otherwise ->
+ let !sz' = sz + 1
+ !l' = D.append l tostore
+ in update (sz', l')
+ if stored
+ then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
+ else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
where
side = pushDestinationSide stage
- queue nm = do
- writeTChan (getSide side $ netMessagesPush nm) m
- return True
- defer nm = do
- let mv = getSide side $ netMessagesPushDeferred nm
- s <- takeTMVar mv
- putTMVar mv $ S.insert m s
- return True
- discard = return True
-queueNetPushMessage _ = return False
-
-waitNetPushMessage :: PushSide -> Assistant (NetMessage)
-waitNetPushMessage side = (atomically . readTChan)
- <<~ (getSide side . netMessagesPush . netMessager)
+ tostore = D.singleton msg
+storeInbox _ = noop
+
+{- Gets the new message for a push from its inbox.
+ - Blocks until a message has been received. -}
+waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
+waitInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $ do
+ m <- readTVar inboxes
+ case M.lookup clientid m of
+ Nothing -> retry
+ Just (sz, dl)
+ | sz < 1 -> retry
+ | otherwise -> do
+ let msg = D.head dl
+ let dl' = D.tail dl
+ let !sz' = sz - 1
+ writeTVar inboxes $
+ M.insertWith' const clientid (sz', dl') m
+ return msg
+
+emptyInbox :: ClientID -> PushSide -> Assistant ()
+emptyInbox clientid side = do
+ inboxes <- getInboxes side
+ liftIO $ atomically $
+ modifyTVar' inboxes $
+ M.delete clientid
+
+getInboxes :: PushSide -> Assistant Inboxes
+getInboxes side =
+ getSide side . netMessagesInboxes <$> getAssistant netMessager
+netMessagerDebug :: ClientID -> [String] -> Assistant ()
+netMessagerDebug clientid l = debug $
+ "NetMessager" : l ++ [show $ logClientID clientid]