From 6b91074b4dda6dff353770e054ae550c7d1c3b4c Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 29 Oct 2012 17:52:43 -0400 Subject: split and lift Assistant.Pushes --- Assistant/Monad.hs | 2 +- Assistant/Pushes.hs | 63 ++++++++++++++------------------------- Assistant/Sync.hs | 12 ++++---- Assistant/Threads/NetWatcher.hs | 2 +- Assistant/Threads/PushNotifier.hs | 20 ++++++------- Assistant/Threads/Pusher.hs | 3 +- Assistant/Types/Pushes.hs | 42 ++++++++++++++++++++++++++ 7 files changed, 81 insertions(+), 63 deletions(-) create mode 100644 Assistant/Types/Pushes.hs diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index a22b10446..1f8ccacbe 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -31,7 +31,7 @@ import Assistant.DaemonStatus import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.TransferSlots -import Assistant.Pushes +import Assistant.Types.Pushes import Assistant.Commits import Assistant.Changes import Assistant.BranchChange diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index 49772d56a..122d46d21 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -7,7 +7,8 @@ module Assistant.Pushes where -import Common.Annex +import Assistant.Common +import Assistant.Types.Pushes import Utility.TSet import Control.Concurrent.STM @@ -15,59 +16,39 @@ import Control.Concurrent.MSampleVar import Data.Time.Clock import qualified Data.Map as M -{- Track the most recent push failure for each remote. -} -type PushMap = M.Map Remote UTCTime -type FailedPushMap = TMVar PushMap - -{- The TSet is recent, successful pushes that other remotes should be - - notified about. - - - - The MSampleVar is written to when the PushNotifier thread should be - - restarted for some reason. - -} -data PushNotifier = PushNotifier (TSet UUID) (MSampleVar ()) - -{- The TMVar starts empty, and is left empty when there are no - - failed pushes. This way we can block until there are some failed pushes. - -} -newFailedPushMap :: IO FailedPushMap -newFailedPushMap = atomically newEmptyTMVar - {- Blocks until there are failed pushes. - Returns Remotes whose pushes failed a given time duration or more ago. - (This may be an empty list.) -} -getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote] -getFailedPushesBefore v duration = do - m <- atomically $ readTMVar v - now <- getCurrentTime - return $ M.keys $ M.filter (not . toorecent now) m +getFailedPushesBefore :: NominalDiffTime -> Assistant [Remote] +getFailedPushesBefore duration = do + v <- getAssistant failedPushMap + liftIO $ do + m <- atomically $ readTMVar v + now <- getCurrentTime + return $ M.keys $ M.filter (not . toorecent now) m where toorecent now time = now `diffUTCTime` time < duration {- Modifies the map. -} -changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO () -changeFailedPushMap v a = atomically $ - store . a . fromMaybe M.empty =<< tryTakeTMVar v +changeFailedPushMap :: (PushMap -> PushMap) -> Assistant () +changeFailedPushMap a = do + v <- getAssistant failedPushMap + liftIO $ atomically $ store v . a . fromMaybe M.empty =<< tryTakeTMVar v where {- tryTakeTMVar empties the TMVar; refill it only if - the modified map is not itself empty -} - store m + store v m | m == M.empty = noop | otherwise = putTMVar v $! m -newPushNotifier :: IO PushNotifier -newPushNotifier = PushNotifier - <$> newTSet - <*> newEmptySV - -notifyPush :: [UUID] -> PushNotifier -> IO () -notifyPush us (PushNotifier s _) = putTSet s us +notifyPush :: [UUID] -> Assistant () +notifyPush us = flip putTSet us <<~ (pushNotifierSuccesses . pushNotifier) -waitPush :: PushNotifier -> IO [UUID] -waitPush (PushNotifier s _) = getTSet s +waitPush :: Assistant [UUID] +waitPush = getTSet <<~ (pushNotifierSuccesses . pushNotifier) -notifyRestart :: PushNotifier -> IO () -notifyRestart (PushNotifier _ sv) = writeSV sv () +notifyRestart :: Assistant () +notifyRestart = flip writeSV () <<~ (pushNotifierWaiter . pushNotifier) -waitRestart :: PushNotifier -> IO () -waitRestart (PushNotifier _ sv) = readSV sv +waitRestart :: Assistant () +waitRestart = readSV <<~ (pushNotifierWaiter . pushNotifier) diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 6a2f5266e..b16382d82 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -100,17 +100,15 @@ pushToRemotes now notifypushes remotes = do if null failed then do when notifypushes $ - notifyPush (map Remote.uuid succeeded) <<~ pushNotifier + notifyPush (map Remote.uuid succeeded) return True else if shouldretry then retry branch g u failed else fallback branch g u failed - updatemap succeeded failed = do - pushmap <- getAssistant failedPushMap - liftIO $ changeFailedPushMap pushmap $ \m -> - M.union (makemap failed) $ - M.difference m (makemap succeeded) + updatemap succeeded failed = changeFailedPushMap $ \m -> + M.union (makemap failed) $ + M.difference m (makemap succeeded) makemap l = M.fromList $ zip l (repeat now) retry branch g u rs = do @@ -124,7 +122,7 @@ pushToRemotes now notifypushes remotes = do inParallel (pushfallback g u branch) rs updatemap succeeded failed when (notifypushes && (not $ null succeeded)) $ - notifyPush (map Remote.uuid succeeded) <<~ pushNotifier + notifyPush (map Remote.uuid succeeded) return $ null failed push g branch remote = Command.Sync.pushBranch remote branch g diff --git a/Assistant/Threads/NetWatcher.hs b/Assistant/Threads/NetWatcher.hs index 12e764016..9df4f3a4d 100644 --- a/Assistant/Threads/NetWatcher.hs +++ b/Assistant/Threads/NetWatcher.hs @@ -62,7 +62,7 @@ dbusThread = do ) handleconn = do debug ["detected network connection"] - notifyRestart <<~ pushNotifier + notifyRestart handleConnection onerr e _ = do liftAnnex $ diff --git a/Assistant/Threads/PushNotifier.hs b/Assistant/Threads/PushNotifier.hs index 17f966881..b36eb6359 100644 --- a/Assistant/Threads/PushNotifier.hs +++ b/Assistant/Threads/PushNotifier.hs @@ -28,17 +28,15 @@ pushNotifierThread :: NamedThread pushNotifierThread = NamedThread "PushNotifier" $ do iodebug <- asIO debug iopull <- asIO pull - pn <- getAssistant pushNotifier - controllerThread pn <~> xmppClient pn iodebug iopull + iowaitpush <- asIO $ const waitPush + ioclient <- asIO2 $ xmppClient $ iowaitpush () + forever $ do + tid <- liftIO $ forkIO $ ioclient iodebug iopull + waitRestart + liftIO $ killThread tid -controllerThread :: PushNotifier -> IO () -> IO () -controllerThread pushnotifier xmppclient = forever $ do - tid <- forkIO xmppclient - waitRestart pushnotifier - killThread tid - -xmppClient :: PushNotifier -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant () -xmppClient pushnotifier iodebug iopull = do +xmppClient :: (IO [UUID]) -> ([String] -> IO ()) -> ([UUID] -> IO ()) -> Assistant () +xmppClient iowaitpush iodebug iopull = do v <- liftAnnex getXMPPCreds case v of Nothing -> noop @@ -63,7 +61,7 @@ xmppClient pushnotifier iodebug iopull = do threadDelaySeconds (Seconds 300) loop c =<< getCurrentTime sendnotifications = forever $ do - us <- liftIO $ waitPush pushnotifier + us <- liftIO iowaitpush putStanza $ gitAnnexPresence $ encodePushNotification us receivenotifications = forever $ do s <- getStanza diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 95e4e1276..c87df1610 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -27,8 +27,7 @@ pushRetryThread :: NamedThread pushRetryThread = NamedThread "PushRetrier" $ runEvery (Seconds halfhour) <~> do -- We already waited half an hour, now wait until there are failed -- pushes to retry. - pushmap <- getAssistant failedPushMap - topush <- liftIO $ getFailedPushesBefore pushmap (fromIntegral halfhour) + topush <- getFailedPushesBefore (fromIntegral halfhour) unless (null topush) $ do debug ["retrying", show (length topush), "failed pushes"] void $ alertWhile (pushRetryAlert topush) $ do diff --git a/Assistant/Types/Pushes.hs b/Assistant/Types/Pushes.hs new file mode 100644 index 000000000..85362860a --- /dev/null +++ b/Assistant/Types/Pushes.hs @@ -0,0 +1,42 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.Pushes where + +import Common.Annex +import Utility.TSet + +import Control.Concurrent.STM +import Control.Concurrent.MSampleVar +import Data.Time.Clock +import qualified Data.Map as M + +{- Track the most recent push failure for each remote. -} +type PushMap = M.Map Remote UTCTime +type FailedPushMap = TMVar PushMap + +{- The TSet is recent, successful pushes that other remotes should be + - notified about. + - + - The MSampleVar is written to when the PushNotifier thread should be + - restarted for some reason. + -} +data PushNotifier = PushNotifier + { pushNotifierSuccesses :: TSet UUID + , pushNotifierWaiter :: MSampleVar () + } + +{- The TMVar starts empty, and is left empty when there are no + - failed pushes. This way we can block until there are some failed pushes. + -} +newFailedPushMap :: IO FailedPushMap +newFailedPushMap = atomically newEmptyTMVar + +newPushNotifier :: IO PushNotifier +newPushNotifier = PushNotifier + <$> newTSet + <*> newEmptySV -- cgit v1.2.3