diff options
Diffstat (limited to 'Assistant/NetMessager.hs')
-rw-r--r-- | Assistant/NetMessager.hs | 180 |
1 files changed, 0 insertions, 180 deletions
diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs deleted file mode 100644 index dd1811141..000000000 --- a/Assistant/NetMessager.hs +++ /dev/null @@ -1,180 +0,0 @@ -{- git-annex assistant out of band network messager interface - - - - Copyright 2012-2013 Joey Hess <id@joeyh.name> - - - - Licensed under the GNU GPL version 3 or higher. - -} - -{-# LANGUAGE BangPatterns #-} - -module Assistant.NetMessager where - -import Assistant.Common -import Assistant.Types.NetMessager - -import Control.Concurrent.STM -import Control.Concurrent.MSampleVar -import qualified Data.Set as S -import qualified Data.Map as M -import qualified Data.DList as D - -sendNetMessage :: NetMessage -> Assistant () -sendNetMessage m = - (atomically . flip writeTChan m) <<~ (netMessages . netMessager) - -waitNetMessage :: Assistant (NetMessage) -waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager) - -notifyNetMessagerRestart :: Assistant () -notifyNetMessagerRestart = - flip writeSV () <<~ (netMessagerRestart . netMessager) - -{- This can be used to get an early indication if the network has - - changed, to immediately restart a connection. However, that is not - - available on all systems, so clients also need to deal with - - restarting dropped connections in the usual way. -} -waitNetMessagerRestart :: Assistant () -waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) - -{- Store a new important NetMessage for a client, and if an equivilant - - older message is already stored, remove it from both importantNetMessages - - and sentImportantNetMessages. -} -storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant () -storeImportantNetMessage m client matchingclient = go <<~ netMessager - where - go nm = atomically $ do - q <- takeTMVar $ importantNetMessages nm - sent <- takeTMVar $ sentImportantNetMessages nm - putTMVar (importantNetMessages nm) $ - M.alter (Just . maybe (S.singleton m) (S.insert m)) client $ - M.mapWithKey removematching q - putTMVar (sentImportantNetMessages nm) $ - M.mapWithKey removematching sent - removematching someclient s - | matchingclient someclient = S.filter (not . equivilantImportantNetMessages m) s - | otherwise = s - -{- Indicates that an important NetMessage has been sent to a client. -} -sentImportantNetMessage :: NetMessage -> ClientID -> Assistant () -sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager) - where - go v = atomically $ do - sent <- takeTMVar v - putTMVar v $ - M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent - -{- Checks for important NetMessages that have been stored for a client, and - - sent to a client. Typically the same client for both, although - - a modified or more specific client may need to be used. -} -checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage) -checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager - where - go nm = atomically $ do - stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm) - sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) - return (fromMaybe S.empty stored, fromMaybe S.empty sent) - -{- Queues a push initiation message in the queue for the appropriate - - side of the push but only if there is not already an initiation message - - from the same client in the queue. -} -queuePushInitiation :: NetMessage -> Assistant () -queuePushInitiation msg@(Pushing clientid stage) = do - tv <- getPushInitiationQueue side - liftIO $ atomically $ do - r <- tryTakeTMVar tv - case r of - Nothing -> putTMVar tv [msg] - Just l -> do - let !l' = msg : filter differentclient l - putTMVar tv l' - where - side = pushDestinationSide stage - differentclient (Pushing cid _) = cid /= clientid - differentclient _ = True -queuePushInitiation _ = noop - -{- Waits for a push inititation message to be received, and runs - - function to select a message from the queue. -} -waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage -waitPushInitiation side selector = do - tv <- getPushInitiationQueue side - liftIO $ atomically $ do - q <- takeTMVar tv - if null q - then retry - else do - let (msg, !q') = selector q - unless (null q') $ - putTMVar tv q' - return msg - -{- Stores messages for a push into the appropriate inbox. - - - - To avoid overflow, only 1000 messages max are stored in any - - inbox, which should be far more than necessary. - - - - TODO: If we have more than 100 inboxes for different clients, - - discard old ones that are not currently being used by any push. - -} -storeInbox :: NetMessage -> Assistant () -storeInbox msg@(Pushing clientid stage) = do - inboxes <- getInboxes side - stored <- liftIO $ atomically $ do - m <- readTVar inboxes - let update = \v -> do - writeTVar inboxes $ - M.insertWith' const clientid v m - return True - case M.lookup clientid m of - Nothing -> update (1, tostore) - Just (sz, l) - | sz > 1000 -> return False - | otherwise -> - let !sz' = sz + 1 - !l' = D.append l tostore - in update (sz', l') - if stored - then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"] - else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"] - where - side = pushDestinationSide stage - tostore = D.singleton msg -storeInbox _ = noop - -{- Gets the new message for a push from its inbox. - - Blocks until a message has been received. -} -waitInbox :: ClientID -> PushSide -> Assistant (NetMessage) -waitInbox clientid side = do - inboxes <- getInboxes side - liftIO $ atomically $ do - m <- readTVar inboxes - case M.lookup clientid m of - Nothing -> retry - Just (sz, dl) - | sz < 1 -> retry - | otherwise -> do - let msg = D.head dl - let dl' = D.tail dl - let !sz' = sz - 1 - writeTVar inboxes $ - M.insertWith' const clientid (sz', dl') m - return msg - -emptyInbox :: ClientID -> PushSide -> Assistant () -emptyInbox clientid side = do - inboxes <- getInboxes side - liftIO $ atomically $ - modifyTVar' inboxes $ - M.delete clientid - -getInboxes :: PushSide -> Assistant Inboxes -getInboxes side = - getSide side . netMessagerInboxes <$> getAssistant netMessager - -getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage]) -getPushInitiationQueue side = - getSide side . netMessagerPushInitiations <$> getAssistant netMessager - -netMessagerDebug :: ClientID -> [String] -> Assistant () -netMessagerDebug clientid l = debug $ - "NetMessager" : l ++ [show $ logClientID clientid] |