summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs14
-rw-r--r--Assistant/Pushes.hs36
-rw-r--r--Assistant/Threads/Pusher.hs57
-rw-r--r--Utility/ThreadScheduler.hs6
-rw-r--r--doc/design/assistant/syncing.mdwn5
5 files changed, 82 insertions, 36 deletions
diff --git a/Assistant.hs b/Assistant.hs
index ec46894a5..c054dafd3 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -25,14 +25,17 @@
- Thread 6: pusher
- Waits for commits to be made, and pushes updated branches to remotes,
- in parallel. (Forks a process for each git push.)
- - Thread 7: merger
+ - Thread 7: push retryer
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: merger
- Waits for pushes to be received from remotes, and merges the
- updated branches into the current branch. This uses inotify
- on .git/refs/heads, so there are additional inotify threads
- associated with it, too.
- - Thread 8: status logger
+ - Thread 9: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 9: sanity checker
+ - Thread 10: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
@@ -59,6 +62,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
+import Assistant.Pushes
import Assistant.Threads.Watcher
import Assistant.Threads.Committer
import Assistant.Threads.Pusher
@@ -85,8 +89,10 @@ startDaemon assistant foreground
liftIO $ a $ do
changechan <- newChangeChan
commitchan <- newCommitChan
+ pushchan <- newFailedPushChan
_ <- forkIO $ commitThread st changechan commitchan
- _ <- forkIO $ pushThread st commitchan
+ _ <- forkIO $ pushThread st commitchan pushchan
+ _ <- forkIO $ pushRetryThread st pushchan
_ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f3bffbf79
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,36 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+import Utility.TSet
+
+import Data.Time.Clock
+
+type FailedPushChan = TSet FailedPush
+
+data FailedPush = FailedPush
+ { failedRemote :: Remote
+ , failedTimeStamp :: UTCTime
+ }
+
+newFailedPushChan :: IO FailedPushChan
+newFailedPushChan = newTSet
+
+{- Gets all failed pushes. Blocks until there is at least one failed push. -}
+getFailedPushes :: FailedPushChan -> IO [FailedPush]
+getFailedPushes = getTSet
+
+{- Puts failed pushes back into the channel.
+ - Note: Original order is not preserved. -}
+refillFailedPushes :: FailedPushChan -> [FailedPush] -> IO ()
+refillFailedPushes = putTSet
+
+{- Records a failed push in the channel. -}
+recordFailedPush :: FailedPushChan -> FailedPush -> IO ()
+recordFailedPush = putTSet1
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
index de90d4e64..6a4ae7838 100644
--- a/Assistant/Threads/Pusher.hs
+++ b/Assistant/Threads/Pusher.hs
@@ -1,4 +1,4 @@
-{- git-annex assistant git pushing thread
+{- git-annex assistant git pushing threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
@@ -9,6 +9,7 @@ module Assistant.Threads.Pusher where
import Common.Annex
import Assistant.Commits
+import Assistant.Pushes
import Assistant.ThreadedMonad
import qualified Command.Sync
import Utility.ThreadScheduler
@@ -16,39 +17,45 @@ import Utility.Parallel
import Data.Time.Clock
-data FailedPush = FailedPush
- { failedRemote :: Remote
- , failedTimeStamp :: UTCTime
- }
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> FailedPushChan -> IO ()
+pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ pushes <- getFailedPushes pushchan
+ -- Check times, to avoid repushing a push that's too new.
+ now <- getCurrentTime
+ let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes
+ unless (null newpushes) $
+ refillFailedPushes pushchan newpushes
+ unless (null oldpushes) $
+ pushToRemotes now st pushchan $ map failedRemote oldpushes
+ where
+ halfhour = 1800
+ toorecent now time = now `diffUTCTime` time < fromIntegral halfhour
-{- This thread pushes git commits out to remotes. -}
-pushThread :: ThreadState -> CommitChan -> IO ()
-pushThread st commitchan = do
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO ()
+pushThread st commitchan pushchan = do
remotes <- runThreadState st $ Command.Sync.syncRemotes []
- runEveryWith (Seconds 2) [] $ \failedpushes -> do
+ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
-- Now see if now's a good time to push.
- time <- getCurrentTime
- if shouldPush time commits failedpushes
- then pushToRemotes time st remotes
- else do
- refillCommits commitchan commits
- return failedpushes
+ now <- getCurrentTime
+ if shouldPush now commits
+ then pushToRemotes now st pushchan remotes
+ else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes.
-
- Current strategy: Immediately push all commits. The commit machinery
- already determines batches of changes, so we can't easily determine
- batches better.
- -
- - TODO: FailedPushs are only retried the next time there's a commit.
- - Should retry them periodically, or when a remote that was not available
- - becomes available.
-}
-shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool
-shouldPush _now commits _failedremotes
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
| not (null commits) = True
| otherwise = False
@@ -57,12 +64,14 @@ shouldPush _now commits _failedremotes
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
-pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush]
-pushToRemotes now st remotes = do
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO ()
+pushToRemotes now st pushchan remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
- map (`FailedPush` now) <$> inParallel (push g branch) remotes
+ failed <- map (`FailedPush` now) <$> inParallel (push g branch) remotes
+ unless (null failed) $
+ refillFailedPushes pushchan failed
where
push g branch remote =
ifM (Command.Sync.pushBranch remote branch g)
diff --git a/Utility/ThreadScheduler.hs b/Utility/ThreadScheduler.hs
index 07a740160..6557398fd 100644
--- a/Utility/ThreadScheduler.hs
+++ b/Utility/ThreadScheduler.hs
@@ -24,12 +24,6 @@ runEvery n a = forever $ do
threadDelaySeconds n
a
-runEveryWith :: Seconds -> a -> (a -> IO a) -> IO ()
-runEveryWith n val a = do
- threadDelaySeconds n
- val' <- a val
- runEveryWith n val' a
-
threadDelaySeconds :: Seconds -> IO ()
threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond)
where
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index 8173457c5..a2b80eebc 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -13,8 +13,9 @@ all the other git clones, at both the git level and the key/value level.
[The watching can be done with the existing inotify code! This avoids needing
any special mechanism to notify a remote that it's been synced to.]
**done**
-1. Periodically retry pushes that failed. Also, detect if a push failed
- due to not being up-to-date, pull, and repush.
+1. Periodically retry pushes that failed. **done** (every half an hour)
+1. Also, detect if a push failed due to not being up-to-date, pull,
+ and repush.
2. Use a git merge driver that adds both conflicting files,
so conflicts never break a sync.
3. Investigate the XMPP approach like dvcs-autosync does, or other ways of