diff options
author | Joey Hess <joey@kitenet.net> | 2012-06-22 15:46:21 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-06-22 15:49:48 -0400 |
commit | e9630e90decac4fe0c999af88131bd4b7c9d979f (patch) | |
tree | d0d7d897ab63fe8d7d76b47771e9c7c34f08618f | |
parent | 28e28bc0436cb0a33e570b1a1f678e80a770a21a (diff) |
the syncer now pushes out changes to remotes, in parallel
Note that, since this always pushes branch synced/master to the remote, it
assumes that master has already gotten all the commits that are on the
remote merged in. Otherwise, git's refusal of non-fast-forward pushes may
prevent the push. That's probably ok, because the next stage is to
automatically detect incoming pushes and merge them.
-rw-r--r-- | Assistant/Syncer.hs | 64 | ||||
-rw-r--r-- | Command/Sync.hs | 43 | ||||
-rw-r--r-- | Utility/Parallel.hs | 20 | ||||
-rw-r--r-- | Utility/ThreadScheduler.hs | 6 |
4 files changed, 105 insertions, 28 deletions
diff --git a/Assistant/Syncer.hs b/Assistant/Syncer.hs index 059859c07..c579c1c28 100644 --- a/Assistant/Syncer.hs +++ b/Assistant/Syncer.hs @@ -5,25 +5,63 @@ module Assistant.Syncer where +import Common.Annex import Assistant.Commits import Assistant.ThreadedMonad import qualified Command.Sync import Utility.ThreadScheduler +import Utility.Parallel + +import Data.Time.Clock + +data FailedSync = FailedSync + { failedRemote :: Remote + , failedTimeStamp :: UTCTime + } {- This thread syncs git commits out to remotes. -} syncThread :: ThreadState -> CommitChan -> IO () -syncThread st commitchan = runEvery (Seconds 2) $ do - -- We already waited two seconds as a simple rate limiter. - -- Next, wait until at least one commit has been made - commits <- getCommits commitchan - -- Now see if now's a good time to sync. - if shouldSync commits - then syncToRemotes - else refillCommits commitchan commits +syncThread st commitchan = do + remotes <- runThreadState st $ Command.Sync.syncRemotes [] + runEveryWith (Seconds 2) [] $ \failedsyncs -> do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + commits <- getCommits commitchan + -- Now see if now's a good time to sync. + time <- getCurrentTime + if shouldSync time commits failedsyncs + then syncToRemotes time st remotes + else do + refillCommits commitchan commits + return failedsyncs -{- Decide if now is a good time to sync commits to remotes. -} -shouldSync :: [Commit] -> Bool -shouldSync commits = not (null commits) +{- Decide if now is a good time to sync to remotes. + - + - Current strategy: Immediately sync all commits. The commit machinery + - already determines batches of changes, so we can't easily determine + - batches better. + - + - TODO: FailedSyncs are only retried the next time there's a commit. + - Should retry them periodically, or when a remote that was not available + - becomes available. 
+ -} +shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool +shouldSync _now commits _failedremotes + | not (null commits) = True + | otherwise = False -syncToRemotes :: IO () -syncToRemotes = return () -- TOOD +{- Updates the local sync branch, then pushes it to all remotes, in + - parallel. + - + - Avoids running possibly long-duration commands in the Annex monad, so + - as not to block other threads. -} +syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync] +syncToRemotes now st remotes = do + (g, branch) <- runThreadState st $ + (,) <$> fromRepo id <*> Command.Sync.currentBranch + Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + map (`FailedSync` now) <$> inParallel (go g branch) remotes + where + go g branch remote = + ifM (Command.Sync.pushBranch remote branch g) + ( exitSuccess, exitFailure) diff --git a/Command/Sync.hs b/Command/Sync.hs index 5fb49d30c..110cf2a6c 100644 --- a/Command/Sync.hs +++ b/Command/Sync.hs @@ -31,7 +31,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote)) -- syncing involves several operations, any of which can independently fail seek :: CommandSeek seek rs = do - !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + branch <- currentBranch remotes <- syncRemotes rs return $ concat [ [ commit ] @@ -41,6 +41,11 @@ seek rs = do , [ pushLocal branch ] , [ pushRemote remote branch | remote <- remotes ] ] + +currentBranch :: Annex Git.Ref +currentBranch = do + !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + return branch where nobranch = error "no branch is checked out" @@ -90,7 +95,7 @@ mergeLocal branch = go =<< needmerge syncbranch = syncBranch branch needmerge = do unlessM (inRepo $ Git.Ref.exists syncbranch) $ - updateBranch syncbranch + inRepo $ updateBranch syncbranch inRepo $ Git.Branch.changed branch syncbranch go False = stop go True = do @@ -99,17 +104,17 @@ mergeLocal branch = go =<< needmerge pushLocal :: Git.Ref -> CommandStart pushLocal branch = do - 
updateBranch $ syncBranch branch + inRepo $ updateBranch $ syncBranch branch stop -updateBranch :: Git.Ref -> Annex () -updateBranch syncbranch = +updateBranch :: Git.Ref -> Git.Repo -> IO () +updateBranch syncbranch g = unlessM go $ error $ "failed to update " ++ show syncbranch where - go = inRepo $ Git.Command.runBool "branch" + go = Git.Command.runBool "branch" [ Param "-f" , Param $ show $ Git.Ref.base syncbranch - ] + ] g pullRemote :: Remote -> Git.Ref -> CommandStart pullRemote remote branch = do @@ -135,19 +140,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge) pushRemote :: Remote -> Git.Ref -> CommandStart pushRemote remote branch = go =<< needpush where - needpush = anyM (newer remote) [syncbranch, Annex.Branch.name] + needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name] go False = stop go True = do showStart "push" (Remote.name remote) next $ next $ do showOutput - inRepo $ Git.Command.runBool "push" - [ Param (Remote.name remote) - , Param (show Annex.Branch.name) - , Param refspec - ] - refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch) - syncbranch = syncBranch branch + inRepo $ pushBranch remote branch + +pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool +pushBranch remote branch g = + Git.Command.runBool "push" + [ Param (Remote.name remote) + , Param (show Annex.Branch.name) + , Param refspec + ] g + where + refspec = concat + [ show $ Git.Ref.base branch + , ":" + , show $ Git.Ref.base $ syncBranch branch + ] mergeAnnex :: CommandStart mergeAnnex = do diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs new file mode 100644 index 000000000..a512a6d30 --- /dev/null +++ b/Utility/Parallel.hs @@ -0,0 +1,20 @@ +{- parallel processes + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Utility.Parallel where + +import Common + +{- Runs an action in parallel with a set of values. 
+ - Returns values that caused the action to fail. -} +inParallel :: (v -> IO ()) -> [v] -> IO [v] +inParallel a v = do + pids <- mapM (forkProcess . a) v + statuses <- mapM (getProcessStatus True False) pids + return $ map fst $ filter failed $ zip v statuses + where + failed (_, status) = status /= Just (Exited ExitSuccess) diff --git a/Utility/ThreadScheduler.hs b/Utility/ThreadScheduler.hs index 6557398fd..07a740160 100644 --- a/Utility/ThreadScheduler.hs +++ b/Utility/ThreadScheduler.hs @@ -24,6 +24,12 @@ runEvery n a = forever $ do threadDelaySeconds n a +runEveryWith :: Seconds -> a -> (a -> IO a) -> IO () +runEveryWith n val a = do + threadDelaySeconds n + val' <- a val + runEveryWith n val' a + threadDelaySeconds :: Seconds -> IO () threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond) where |