aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-06-26 17:33:34 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-06-26 19:21:44 -0400
commit67c8ef7de25ad6f433db2fa5d5fc764dd515a5b2 (patch)
tree3666619b0752df6c336fdbac270e09dbd125bcec
parente0a65247aebb8a821f4f0b717d39a4a35136a2e6 (diff)
use a TMVar
SampleMVar won't work; between getting the current value and changing it, another thread could made a change, which would get lost. TMVar works well; this update situation is handled by atomic transactions.
-rw-r--r--Assistant.hs9
-rw-r--r--Assistant/Pushes.hs49
-rw-r--r--Assistant/Threads/Pusher.hs44
-rw-r--r--Utility/Parallel.hs10
4 files changed, 63 insertions, 49 deletions
diff --git a/Assistant.hs b/Assistant.hs
index c054dafd3..4f8a868f4 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -53,6 +53,9 @@
- CommitChan: (STM TChan)
- Commits are indicated by writing to this channel. The pusher reads
- from it.
+ - FailedPushMap (STM TMVar)
+ - Failed pushes are indicated by writing to this TMVar. The push
+ - retrier blocks until they're available.
-}
module Assistant where
@@ -89,10 +92,10 @@ startDaemon assistant foreground
liftIO $ a $ do
changechan <- newChangeChan
commitchan <- newCommitChan
- pushchan <- newFailedPushChan
+ pushmap <- newFailedPushMap
_ <- forkIO $ commitThread st changechan commitchan
- _ <- forkIO $ pushThread st commitchan pushchan
- _ <- forkIO $ pushRetryThread st pushchan
+ _ <- forkIO $ pushThread st commitchan pushmap
+ _ <- forkIO $ pushRetryThread st pushmap
_ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
index 61d2b798b..f411dda07 100644
--- a/Assistant/Pushes.hs
+++ b/Assistant/Pushes.hs
@@ -8,30 +8,39 @@
module Assistant.Pushes where
import Common.Annex
-import Control.Concurrent.SampleVar
+import Control.Concurrent.STM
import Data.Time.Clock
import qualified Data.Map as M
{- Track the most recent push failure for each remote. -}
type PushMap = M.Map Remote UTCTime
-type FailedPushes = SampleVar PushMap
+type FailedPushMap = TMVar PushMap
-newFailedPushChan :: IO FailedPushChan
-newFailedPushChan = newEmptySampleVar
-
-{- Gets all failed pushes. Blocks until set. -}
-getFailedPushes :: FailedPushChan -> IO PushMap
-getFailedPushes = readSampleVar
-
-{- Sets all failed pushes to passed PushMap -}
-setFailedPushes :: FailedPushChan -> PushMap -> IO ()
-setFailedPushes = writeSampleVar
-
-{- Indicates a failure to push to a single remote. -}
-failedPush :: FailedPushChan -> Remote -> IO ()
-failedPush c r =
-
-{- Indicates that a remote was pushed to successfully. -}
-successfulPush :: FailedPushChan -> Remote -> IO ()
-successfulPush c r =
+{- The TMVar starts empty, and is left empty when there are no
+ - failed pushes. This way we can block until there are some failed pushes.
+ -}
+newFailedPushMap :: IO FailedPushMap
+newFailedPushMap = atomically newEmptyTMVar
+
+{- Blocks until there are failed pushes.
+ - Returns Remotes whose pushes failed a given time duration or more ago.
+ - (This may be an empty list.) -}
+getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
+getFailedPushesBefore v duration = do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
+ where
+ toorecent now time = now `diffUTCTime` time < duration
+
+{- Modifies the map. -}
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
+changeFailedPushMap v a = atomically $
+ store . a . fromMaybe M.empty =<< tryTakeTMVar v
+ where
+ {- tryTakeTMVar empties the TMVar; refill it only if
+ - the modified map is not itself empty -}
+ store m
+ | m == M.empty = noop
+ | otherwise = putTMVar v $! m
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index 82c37de5f..04d343528 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -17,27 +17,23 @@ import Utility.ThreadScheduler
import Utility.Parallel
import Data.Time.Clock
+import qualified Data.Map as M
{- This thread retries pushes that failed before. -}
-pushRetryThread :: ThreadState -> FailedPushChan -> IO ()
-pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do
+pushRetryThread :: ThreadState -> FailedPushMap -> IO ()
+pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
- pushes <- getFailedPushes pushchan
- -- Check times, to avoid repushing a push that's too new.
- now <- getCurrentTime
- let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes
- unless (null newpushes) $
- refillFailedPushes pushchan newpushes
- unless (null oldpushes) $
- pushToRemotes now st pushchan $ map failedRemote oldpushes
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ now <- getCurrentTime
+ pushToRemotes now st pushmap topush
where
halfhour = 1800
- toorecent now time = now `diffUTCTime` time < fromIntegral halfhour
{- This thread pushes git commits out to remotes soon after they are made. -}
-pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO ()
-pushThread st commitchan pushchan = do
+pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
+pushThread st commitchan pushmap = do
remotes <- runThreadState st $ Command.Sync.syncRemotes []
runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
@@ -46,7 +42,7 @@ pushThread st commitchan pushchan = do
-- Now see if now's a good time to push.
now <- getCurrentTime
if shouldPush now commits
- then pushToRemotes now st pushchan remotes
+ then pushToRemotes now st pushmap remotes
else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes.
@@ -65,23 +61,27 @@ shouldPush _now commits
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
-pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO ()
-pushToRemotes now st pushchan remotes = do
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
+pushToRemotes now st pushmap remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
go True branch g remotes
where
go shouldretry branch g rs = do
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
- failed <- inParallel (push g branch) rs
- unless (null failed) $
- if shouldretry
- then retry branch g rs
- else refillFailedPushes pushchan $
- map (`FailedPush` now) failed
+ (succeeded, failed) <- inParallel (push g branch) rs
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
+ unless (null failed || not shouldretry) $
+ retry branch g failed
+
+ makemap l = M.fromList $ zip l (repeat now)
+
push g branch remote =
ifM (Command.Sync.pushBranch remote branch g)
( exitSuccess, exitFailure)
+
retry branch g rs = do
runThreadState st $ manualPull branch rs
go False branch g rs
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
index 6e4671c05..9df95ab2b 100644
--- a/Utility/Parallel.hs
+++ b/Utility/Parallel.hs
@@ -10,11 +10,13 @@ module Utility.Parallel where
import Common
{- Runs an action in parallel with a set of values.
- - Returns values that caused the action to fail. -}
-inParallel :: (v -> IO ()) -> [v] -> IO [v]
+ - Returns the values partitioned into ones with which the action succeeded,
+ - and ones with which it failed. -}
+inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v])
inParallel a l = do
pids <- mapM (forkProcess . a) l
statuses <- mapM (getProcessStatus True False) pids
- return $ map fst $ filter (failed . snd) $ zip l statuses
+ return $ reduce $ partition (succeeded . snd) $ zip l statuses
where
- failed v = v /= Just (Exited ExitSuccess)
+ succeeded v = v == Just (Exited ExitSuccess)
+ reduce (x,y) = (map fst x, map fst y)