aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Joey Hess <joey@kitenet.net> 2012-06-22 15:46:21 -0400
committer Joey Hess <joey@kitenet.net> 2012-06-22 15:49:48 -0400
commit e9630e90decac4fe0c999af88131bd4b7c9d979f (patch)
tree d0d7d897ab63fe8d7d76b47771e9c7c34f08618f
parent 28e28bc0436cb0a33e570b1a1f678e80a770a21a (diff)
the syncer now pushes out changes to remotes, in parallel
Note that, since this always pushes branch synced/master to the remote, it assumes that all the commits on the remote have already been merged into master. Otherwise, the remote may refuse the push because it is not a fast-forward. That's probably ok, because the next stage is to automatically detect incoming pushes and merge them.
-rw-r--r-- Assistant/Syncer.hs 64
-rw-r--r-- Command/Sync.hs 43
-rw-r--r-- Utility/Parallel.hs 20
-rw-r--r-- Utility/ThreadScheduler.hs 6
4 files changed, 105 insertions, 28 deletions
diff --git a/Assistant/Syncer.hs b/Assistant/Syncer.hs
index 059859c07..c579c1c28 100644
--- a/Assistant/Syncer.hs
+++ b/Assistant/Syncer.hs
@@ -5,25 +5,63 @@
module Assistant.Syncer where
+import Common.Annex
import Assistant.Commits
import Assistant.ThreadedMonad
import qualified Command.Sync
import Utility.ThreadScheduler
+import Utility.Parallel
+
+import Data.Time.Clock
+
+data FailedSync = FailedSync
+ { failedRemote :: Remote
+ , failedTimeStamp :: UTCTime
+ }
{- This thread syncs git commits out to remotes. -}
syncThread :: ThreadState -> CommitChan -> IO ()
-syncThread st commitchan = runEvery (Seconds 2) $ do
- -- We already waited two seconds as a simple rate limiter.
- -- Next, wait until at least one commit has been made
- commits <- getCommits commitchan
- -- Now see if now's a good time to sync.
- if shouldSync commits
- then syncToRemotes
- else refillCommits commitchan commits
+syncThread st commitchan = do
+ remotes <- runThreadState st $ Command.Sync.syncRemotes []
+ runEveryWith (Seconds 2) [] $ \failedsyncs -> do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to sync.
+ time <- getCurrentTime
+ if shouldSync time commits failedsyncs
+ then syncToRemotes time st remotes
+ else do
+ refillCommits commitchan commits
+ return failedsyncs
-{- Decide if now is a good time to sync commits to remotes. -}
-shouldSync :: [Commit] -> Bool
-shouldSync commits = not (null commits)
+{- Decide if now is a good time to sync to remotes.
+ -
+ - Current strategy: Immediately sync all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -
+ - TODO: FailedSyncs are only retried the next time there's a commit.
+ - Should retry them periodically, or when a remote that was not available
+ - becomes available.
+ -}
+shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool
+shouldSync _now commits _failedremotes
+ | not (null commits) = True
+ | otherwise = False
-syncToRemotes :: IO ()
-syncToRemotes = return () -- TOOD
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync]
+syncToRemotes now st remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> Command.Sync.currentBranch
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ map (`FailedSync` now) <$> inParallel (go g branch) remotes
+ where
+ go g branch remote =
+ ifM (Command.Sync.pushBranch remote branch g)
+ ( exitSuccess, exitFailure)
diff --git a/Command/Sync.hs b/Command/Sync.hs
index 5fb49d30c..110cf2a6c 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -31,7 +31,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
- !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@@ -41,6 +41,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
+
+currentBranch :: Annex Git.Ref
+currentBranch = do
+ !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ return branch
where
nobranch = error "no branch is checked out"
@@ -90,7 +95,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
- updateBranch syncbranch
+ inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@@ -99,17 +104,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
- updateBranch $ syncBranch branch
+ inRepo $ updateBranch $ syncBranch branch
stop
-updateBranch :: Git.Ref -> Annex ()
-updateBranch syncbranch =
+updateBranch :: Git.Ref -> Git.Repo -> IO ()
+updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
- go = inRepo $ Git.Command.runBool "branch"
+ go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
- ]
+ ] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@@ -135,19 +140,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
- needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
+ needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
- inRepo $ Git.Command.runBool "push"
- [ Param (Remote.name remote)
- , Param (show Annex.Branch.name)
- , Param refspec
- ]
- refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
- syncbranch = syncBranch branch
+ inRepo $ pushBranch remote branch
+
+pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
+pushBranch remote branch g =
+ Git.Command.runBool "push"
+ [ Param (Remote.name remote)
+ , Param (show Annex.Branch.name)
+ , Param refspec
+ ] g
+ where
+ refspec = concat
+ [ show $ Git.Ref.base branch
+ , ":"
+ , show $ Git.Ref.base $ syncBranch branch
+ ]
mergeAnnex :: CommandStart
mergeAnnex = do
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
new file mode 100644
index 000000000..a512a6d30
--- /dev/null
+++ b/Utility/Parallel.hs
@@ -0,0 +1,20 @@
+{- parallel processes
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Utility.Parallel where
+
+import Common
+
+{- Runs an action in parallel with a set of values.
+ - Returns values that caused the action to fail. -}
+inParallel :: (v -> IO ()) -> [v] -> IO [v]
+inParallel a v = do
+ pids <- mapM (forkProcess . a) v
+ statuses <- mapM (getProcessStatus True False) pids
+ return $ map fst $ filter failed $ zip v statuses
+ where
+ failed (_, status) = status /= Just (Exited ExitSuccess)
diff --git a/Utility/ThreadScheduler.hs b/Utility/ThreadScheduler.hs
index 6557398fd..07a740160 100644
--- a/Utility/ThreadScheduler.hs
+++ b/Utility/ThreadScheduler.hs
@@ -24,6 +24,12 @@ runEvery n a = forever $ do
threadDelaySeconds n
a
+runEveryWith :: Seconds -> a -> (a -> IO a) -> IO ()
+runEveryWith n val a = do
+ threadDelaySeconds n
+ val' <- a val
+ runEveryWith n val' a
+
threadDelaySeconds :: Seconds -> IO ()
threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond)
where