diff options
-rw-r--r-- | Assistant.hs | 14 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 36 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 57 | ||||
-rw-r--r-- | Utility/ThreadScheduler.hs | 6 | ||||
-rw-r--r-- | doc/design/assistant/syncing.mdwn | 5 |
5 files changed, 82 insertions, 36 deletions
diff --git a/Assistant.hs b/Assistant.hs index ec46894a5..c054dafd3 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -25,14 +25,17 @@ - Thread 6: pusher - Waits for commits to be made, and pushes updated branches to remotes, - in parallel. (Forks a process for each git push.) - - Thread 7: merger + - Thread 7: push retryer + - Runs every 30 minutes when there are failed pushes, and retries + - them. + - Thread 8: merger - Waits for pushes to be received from remotes, and merges the - updated branches into the current branch. This uses inotify - on .git/refs/heads, so there are additional inotify threads - associated with it, too. - - Thread 8: status logger + - Thread 9: status logger - Wakes up periodically and records the daemon's status to disk. - - Thread 9: sanity checker + - Thread 10: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) @@ -59,6 +62,7 @@ import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes import Assistant.Commits +import Assistant.Pushes import Assistant.Threads.Watcher import Assistant.Threads.Committer import Assistant.Threads.Pusher @@ -85,8 +89,10 @@ startDaemon assistant foreground liftIO $ a $ do changechan <- newChangeChan commitchan <- newCommitChan + pushchan <- newFailedPushChan _ <- forkIO $ commitThread st changechan commitchan - _ <- forkIO $ pushThread st commitchan + _ <- forkIO $ pushThread st commitchan pushchan + _ <- forkIO $ pushRetryThread st pushchan _ <- forkIO $ mergeThread st _ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ sanityCheckerThread st dstatus changechan diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs new file mode 100644 index 000000000..f3bffbf79 --- /dev/null +++ b/Assistant/Pushes.hs @@ -0,0 +1,36 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pushes where + +import Common.Annex +import Utility.TSet + +import Data.Time.Clock + +type FailedPushChan = TSet FailedPush + +data FailedPush = FailedPush + { failedRemote :: Remote + , failedTimeStamp :: UTCTime + } + +newFailedPushChan :: IO FailedPushChan +newFailedPushChan = newTSet + +{- Gets all failed pushes. Blocks until there is at least one failed push. -} +getFailedPushes :: FailedPushChan -> IO [FailedPush] +getFailedPushes = getTSet + +{- Puts failed pushes back into the channel. + - Note: Original order is not preserved. -} +refillFailedPushes :: FailedPushChan -> [FailedPush] -> IO () +refillFailedPushes = putTSet + +{- Records a failed push in the channel. -} +recordFailedPush :: FailedPushChan -> FailedPush -> IO () +recordFailedPush = putTSet1 diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index de90d4e64..6a4ae7838 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -1,4 +1,4 @@ -{- git-annex assistant git pushing thread +{- git-annex assistant git pushing threads - - Copyright 2012 Joey Hess <joey@kitenet.net> - @@ -9,6 +9,7 @@ module Assistant.Threads.Pusher where import Common.Annex import Assistant.Commits +import Assistant.Pushes import Assistant.ThreadedMonad import qualified Command.Sync import Utility.ThreadScheduler @@ -16,39 +17,45 @@ import Utility.Parallel import Data.Time.Clock -data FailedPush = FailedPush - { failedRemote :: Remote - , failedTimeStamp :: UTCTime - } +{- This thread retries pushes that failed before. -} +pushRetryThread :: ThreadState -> FailedPushChan -> IO () +pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do + -- We already waited half an hour, now wait until there are failed + -- pushes to retry. + pushes <- getFailedPushes pushchan + -- Check times, to avoid repushing a push that's too new. + now <- getCurrentTime + let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes + unless (null newpushes) $ + refillFailedPushes pushchan newpushes + unless (null oldpushes) $ + pushToRemotes now st pushchan $ map failedRemote oldpushes + where + halfhour = 1800 + toorecent now time = now `diffUTCTime` time < fromIntegral halfhour -{- This thread pushes git commits out to remotes. -} -pushThread :: ThreadState -> CommitChan -> IO () -pushThread st commitchan = do +{- This thread pushes git commits out to remotes soon after they are made. -} +pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO () +pushThread st commitchan pushchan = do remotes <- runThreadState st $ Command.Sync.syncRemotes [] - runEveryWith (Seconds 2) [] $ \failedpushes -> do + runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan -- Now see if now's a good time to push. - time <- getCurrentTime - if shouldPush time commits failedpushes - then pushToRemotes time st remotes - else do - refillCommits commitchan commits - return failedpushes + now <- getCurrentTime + if shouldPush now commits + then pushToRemotes now st pushchan remotes + else refillCommits commitchan commits {- Decide if now is a good time to push to remotes. - - Current strategy: Immediately push all commits. The commit machinery - already determines batches of changes, so we can't easily determine - batches better. - - - - TODO: FailedPushs are only retried the next time there's a commit. - - Should retry them periodically, or when a remote that was not available - - becomes available. -} -shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool -shouldPush _now commits _failedremotes +shouldPush :: UTCTime -> [Commit] -> Bool +shouldPush _now commits | not (null commits) = True | otherwise = False @@ -57,12 +64,14 @@ shouldPush _now commits _failedremotes - - Avoids running possibly long-duration commands in the Annex monad, so - as not to block other threads. -} -pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush] -pushToRemotes now st remotes = do +pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO () +pushToRemotes now st pushchan remotes = do (g, branch) <- runThreadState st $ (,) <$> fromRepo id <*> Command.Sync.currentBranch Command.Sync.updateBranch (Command.Sync.syncBranch branch) g - map (`FailedPush` now) <$> inParallel (push g branch) remotes + failed <- map (`FailedPush` now) <$> inParallel (push g branch) remotes + unless (null failed) $ + refillFailedPushes pushchan failed where push g branch remote = ifM (Command.Sync.pushBranch remote branch g) diff --git a/Utility/ThreadScheduler.hs b/Utility/ThreadScheduler.hs index 07a740160..6557398fd 100644 --- a/Utility/ThreadScheduler.hs +++ b/Utility/ThreadScheduler.hs @@ -24,12 +24,6 @@ runEvery n a = forever $ do threadDelaySeconds n a -runEveryWith :: Seconds -> a -> (a -> IO a) -> IO () -runEveryWith n val a = do - threadDelaySeconds n - val' <- a val - runEveryWith n val' a - threadDelaySeconds :: Seconds -> IO () threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond) where diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn index 8173457c5..a2b80eebc 100644 --- a/doc/design/assistant/syncing.mdwn +++ b/doc/design/assistant/syncing.mdwn @@ -13,8 +13,9 @@ all the other git clones, at both the git level and the key/value level. [The watching can be done with the existing inotify code! This avoids needing any special mechanism to notify a remote that it's been synced to.] **done** -1. Periodically retry pushes that failed. Also, detect if a push failed - due to not being up-to-date, pull, and repush. +1. Periodically retry pushes that failed. **done** (every half an hour) +1. Also, detect if a push failed due to not being up-to-date, pull, + and repush. 2. Use a git merge driver that adds both conflicting files, so conflicts never break a sync. 3. Investigate the XMPP approach like dvcs-autosync does, or other ways of |