diff options
-rw-r--r-- | Assistant.hs | 46 | ||||
-rw-r--r-- | Assistant/Changes.hs | 27 | ||||
-rw-r--r-- | Assistant/Commits.hs | 34 | ||||
-rw-r--r-- | Assistant/DaemonStatus.hs | 2 | ||||
-rw-r--r-- | Assistant/Pushes.hs | 36 | ||||
-rw-r--r-- | Assistant/ThreadedMonad.hs | 5 | ||||
-rw-r--r-- | Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs) | 12 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 72 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 78 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs (renamed from Assistant/SanityChecker.hs) | 10 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs) | 6 | ||||
-rw-r--r-- | Command/Assistant.hs | 18 | ||||
-rw-r--r-- | Command/Sync.hs | 43 | ||||
-rw-r--r-- | Command/Watch.hs | 18 | ||||
-rw-r--r-- | GitAnnex.hs | 2 | ||||
-rw-r--r-- | Utility/Parallel.hs | 20 | ||||
-rw-r--r-- | Utility/TSet.hs | 39 | ||||
-rw-r--r-- | Utility/Types/DirWatcher.hs | 3 | ||||
-rw-r--r-- | doc/design/assistant/syncing.mdwn | 5 | ||||
-rw-r--r-- | doc/git-annex.mdwn | 4 |
20 files changed, 411 insertions, 69 deletions
diff --git a/Assistant.hs b/Assistant.hs index e924d9477..c054dafd3 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -22,9 +22,20 @@ - Thread 5: committer - Waits for changes to occur, and runs the git queue to update its - index, then commits. - - Thread 6: status logger + - Thread 6: pusher + - Waits for commits to be made, and pushes updated branches to remotes, + - in parallel. (Forks a process for each git push.) + - Thread 7: push retryer + - Runs every 30 minutes when there are failed pushes, and retries + - them. + - Thread 8: merger + - Waits for pushes to be received from remotes, and merges the + - updated branches into the current branch. This uses inotify + - on .git/refs/heads, so there are additional inotify threads + - associated with it, too. + - Thread 9: status logger - Wakes up periodically and records the daemon's status to disk. - - Thread 7: sanity checker + - Thread 10: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) @@ -39,6 +50,9 @@ - ChangeChan: (STM TChan) - Changes are indicated by writing to this channel. The committer - reads from it. + - CommitChan: (STM TChan) + - Commits are indicated by writing to this channel. The pusher reads + - from it. -} module Assistant where @@ -47,18 +61,22 @@ import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes -import Assistant.Watcher -import Assistant.Committer -import Assistant.SanityChecker +import Assistant.Commits +import Assistant.Pushes +import Assistant.Threads.Watcher +import Assistant.Threads.Committer +import Assistant.Threads.Pusher +import Assistant.Threads.Merger +import Assistant.Threads.SanityChecker import qualified Utility.Daemon import Utility.LogFile import Control.Concurrent -startDaemon :: Bool -> Annex () -startDaemon foreground +startDaemon :: Bool -> Bool -> Annex () +startDaemon assistant foreground | foreground = do - showStart "watch" "." + showStart (if assistant then "assistant" else "watch") "." go id | otherwise = do logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile @@ -70,12 +88,12 @@ startDaemon foreground dstatus <- startDaemonStatus liftIO $ a $ do changechan <- newChangeChan - -- The commit thread is started early, - -- so that the user can immediately - -- begin adding files and having them - -- committed, even while the startup scan - -- is taking place. - _ <- forkIO $ commitThread st changechan + commitchan <- newCommitChan + pushchan <- newFailedPushChan + _ <- forkIO $ commitThread st changechan commitchan + _ <- forkIO $ pushThread st commitchan pushchan + _ <- forkIO $ pushRetryThread st pushchan + _ <- forkIO $ mergeThread st _ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ sanityCheckerThread st dstatus changechan -- Does not return. diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index 173ba1922..eca922109 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -1,6 +1,8 @@ {- git-annex assistant change tracking - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} module Assistant.Changes where @@ -8,14 +10,14 @@ module Assistant.Changes where import Common.Annex import qualified Annex.Queue import Types.KeySource +import Utility.TSet -import Control.Concurrent.STM import Data.Time.Clock data ChangeType = AddChange | LinkChange | RmChange | RmDirChange deriving (Show, Eq) -type ChangeChan = TChan Change +type ChangeChan = TSet Change data Change = Change @@ -29,11 +31,8 @@ data Change } deriving (Show) -runChangeChan :: STM a -> IO a -runChangeChan = atomically - newChangeChan :: IO ChangeChan -newChangeChan = atomically newTChan +newChangeChan = newTSet {- Handlers call this when they made a change that needs to get committed. -} madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) @@ -65,17 +64,13 @@ finishedChange c = c {- Gets all unhandled changes. - Blocks until at least one change is made. -} getChanges :: ChangeChan -> IO [Change] -getChanges chan = runChangeChan $ do - c <- readTChan chan - go [c] - where - go l = do - v <- tryReadTChan chan - case v of - Nothing -> return l - Just c -> go (c:l) +getChanges = getTSet {- Puts unhandled changes back into the channel. - Note: Original order is not preserved. -} refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs +refillChanges = putTSet + +{- Records a change in the channel. -} +recordChange :: ChangeChan -> Change -> IO () +recordChange = putTSet1 diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs new file mode 100644 index 000000000..86fd7599f --- /dev/null +++ b/Assistant/Commits.hs @@ -0,0 +1,34 @@ +{- git-annex assistant commit tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Commits where + +import Utility.TSet + +import Data.Time.Clock + +type CommitChan = TSet Commit + +data Commit = Commit UTCTime + deriving (Show) + +newCommitChan :: IO CommitChan +newCommitChan = newTSet + +{- Gets all unhandled commits. + - Blocks until at least one commit is made. -} +getCommits :: CommitChan -> IO [Commit] +getCommits = getTSet + +{- Puts unhandled commits back into the channel. + - Note: Original order is not preserved. -} +refillCommits :: CommitChan -> [Commit] -> IO () +refillCommits = putTSet + +{- Records a commit in the channel. -} +recordCommit :: CommitChan -> Commit -> IO () +recordCommit = putTSet1 diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index e5ba3d151..c7713e7d5 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -1,6 +1,8 @@ {- git-annex assistant daemon status - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} module Assistant.DaemonStatus where diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs new file mode 100644 index 000000000..f3bffbf79 --- /dev/null +++ b/Assistant/Pushes.hs @@ -0,0 +1,36 @@ +{- git-annex assistant push tracking + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Pushes where + +import Common.Annex +import Utility.TSet + +import Data.Time.Clock + +type FailedPushChan = TSet FailedPush + +data FailedPush = FailedPush + { failedRemote :: Remote + , failedTimeStamp :: UTCTime + } + +newFailedPushChan :: IO FailedPushChan +newFailedPushChan = newTSet + +{- Gets all failed pushes. Blocks until there is at least one failed push. -} +getFailedPushes :: FailedPushChan -> IO [FailedPush] +getFailedPushes = getTSet + +{- Puts failed pushes back into the channel. + - Note: Original order is not preserved. -} +refillFailedPushes :: FailedPushChan -> [FailedPush] -> IO () +refillFailedPushes = putTSet + +{- Records a failed push in the channel. -} +recordFailedPush :: FailedPushChan -> FailedPush -> IO () +recordFailedPush = putTSet1 diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs index 51f579d07..6d3d25778 100644 --- a/Assistant/ThreadedMonad.hs +++ b/Assistant/ThreadedMonad.hs @@ -1,6 +1,8 @@ {- making the Annex monad available across threads - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} {-# LANGUAGE BangPatterns #-} @@ -32,7 +34,8 @@ withThreadState a = do {- Runs an Annex action, using the state from the MVar. - - - This serializes calls by threads. -} + - This serializes calls by threads; only one thread can run in Annex at a + - time. -} runThreadState :: ThreadState -> Annex a -> IO a runThreadState mvar a = do startstate <- takeMVar mvar diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs index 63df8cafc..488056fa2 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Threads/Committer.hs @@ -1,14 +1,17 @@ {- git-annex assistant commit thread - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.Committer where +module Assistant.Threads.Committer where import Common.Annex import Assistant.Changes +import Assistant.Commits import Assistant.ThreadedMonad -import Assistant.Watcher +import Assistant.Threads.Watcher import qualified Annex import qualified Annex.Queue import qualified Git.Command @@ -26,8 +29,8 @@ import qualified Data.Set as S import Data.Either {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> ChangeChan -> IO () -commitThread st changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO () +commitThread st changechan commitchan = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change is available for -- processing. @@ -40,6 +43,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do if shouldCommit time readychanges then do void $ tryIO $ runThreadState st commitStaged + recordCommit commitchan (Commit time) else refillChanges changechan readychanges else refillChanges changechan changes diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs new file mode 100644 index 000000000..d2c8b9b76 --- /dev/null +++ b/Assistant/Threads/Merger.hs @@ -0,0 +1,72 @@ +{- git-annex assistant git merge thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Merger where + +import Common.Annex +import Assistant.ThreadedMonad +import Utility.DirWatcher +import Utility.Types.DirWatcher +import qualified Git +import qualified Git.Merge +import qualified Git.Branch +import qualified Command.Sync + +{- This thread watches for changes to .git/refs/heads/synced/*, + - which indicate incoming pushes. It merges those pushes into the + - currently checked out branch. -} +mergeThread :: ThreadState -> IO () +mergeThread st = do + g <- runThreadState st $ fromRepo id + let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced" + createDirectoryIfMissing True dir + let hook a = Just $ runHandler g a + let hooks = mkWatchHooks + { addHook = hook onAdd + , errHook = hook onErr + } + watchDir dir (const False) hooks id + where + +type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler g handler file filestatus = void $ do + either print (const noop) =<< tryIO go + where + go = handler g file filestatus + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr _ msg _ = error msg + +{- Called when a new branch ref is written. + - + - This relies on git's atomic method of updating branch ref files, + - which is to first write the new file to .lock, and then rename it + - over the old file. So, ignore .lock files, and the rename ensures + - the watcher sees a new file being added on each update. + - + - At startup, synthetic add events fire, causing this to run, but that's + - ok; it ensures that any changes pushed since the last time the assistant + - ran are merged in. + -} +onAdd :: Handler +onAdd g file _ + | ".lock" `isSuffixOf` file = noop + | otherwise = do + let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file + current <- Git.Branch.current g + when (Just branch == current) $ + void $ mergeBranch branch g + +mergeBranch :: Git.Ref -> Git.Repo -> IO Bool +mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs new file mode 100644 index 000000000..6a4ae7838 --- /dev/null +++ b/Assistant/Threads/Pusher.hs @@ -0,0 +1,78 @@ +{- git-annex assistant git pushing threads + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Pusher where + +import Common.Annex +import Assistant.Commits +import Assistant.Pushes +import Assistant.ThreadedMonad +import qualified Command.Sync +import Utility.ThreadScheduler +import Utility.Parallel + +import Data.Time.Clock + +{- This thread retries pushes that failed before. -} +pushRetryThread :: ThreadState -> FailedPushChan -> IO () +pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do + -- We already waited half an hour, now wait until there are failed + -- pushes to retry. + pushes <- getFailedPushes pushchan + -- Check times, to avoid repushing a push that's too new. + now <- getCurrentTime + let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes + unless (null newpushes) $ + refillFailedPushes pushchan newpushes + unless (null oldpushes) $ + pushToRemotes now st pushchan $ map failedRemote oldpushes + where + halfhour = 1800 + toorecent now time = now `diffUTCTime` time < fromIntegral halfhour + +{- This thread pushes git commits out to remotes soon after they are made. -} +pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO () +pushThread st commitchan pushchan = do + remotes <- runThreadState st $ Command.Sync.syncRemotes [] + runEvery (Seconds 2) $ do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + commits <- getCommits commitchan + -- Now see if now's a good time to push. + now <- getCurrentTime + if shouldPush now commits + then pushToRemotes now st pushchan remotes + else refillCommits commitchan commits + +{- Decide if now is a good time to push to remotes. + - + - Current strategy: Immediately push all commits. The commit machinery + - already determines batches of changes, so we can't easily determine + - batches better. + -} +shouldPush :: UTCTime -> [Commit] -> Bool +shouldPush _now commits + | not (null commits) = True + | otherwise = False + +{- Updates the local sync branch, then pushes it to all remotes, in + - parallel. + - + - Avoids running possibly long-duration commands in the Annex monad, so + - as not to block other threads. -} +pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO () +pushToRemotes now st pushchan remotes = do + (g, branch) <- runThreadState st $ + (,) <$> fromRepo id <*> Command.Sync.currentBranch + Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + failed <- map (`FailedPush` now) <$> inParallel (push g branch) remotes + unless (null failed) $ + refillFailedPushes pushchan failed + where + push g branch remote = + ifM (Command.Sync.pushBranch remote branch g) + ( exitSuccess, exitFailure) diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index e2ca9da74..4db2a61b2 100644 --- a/Assistant/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -1,9 +1,11 @@ {- git-annex assistant sanity checker - - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. -} -module Assistant.SanityChecker ( +module Assistant.Threads.SanityChecker ( sanityCheckerThread ) where @@ -13,7 +15,7 @@ import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Changes import Utility.ThreadScheduler -import qualified Assistant.Watcher +import qualified Assistant.Threads.Watcher as Watcher import Data.Time.Clock.POSIX @@ -77,5 +79,5 @@ check st status changechan = do insanity m = runThreadState st $ warning m addsymlink file s = do insanity $ "found unstaged symlink: " ++ file - Assistant.Watcher.runHandler st status changechan - Assistant.Watcher.onAddSymlink file s + Watcher.runHandler st status changechan + Watcher.onAddSymlink file s diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs index db58f01e8..1b6ec15f1 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -7,7 +7,7 @@ {-# LANGUAGE CPP #-} -module Assistant.Watcher where +module Assistant.Threads.Watcher where import Common.Annex import Assistant.ThreadedMonad @@ -27,7 +27,6 @@ import Annex.Content import Annex.CatFile import Git.Types -import Control.Concurrent.STM import Data.Bits.Utils import qualified Data.ByteString.Lazy as L @@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do case r of Left e -> print e Right Nothing -> noop - Right (Just change) -> void $ - runChangeChan $ writeTChan changechan change + Right (Just change) -> recordChange changechan change where go = runThreadState st $ handler file filestatus dstatus diff --git a/Command/Assistant.hs b/Command/Assistant.hs new file mode 100644 index 000000000..60eac5d21 --- /dev/null +++ b/Command/Assistant.hs @@ -0,0 +1,18 @@ +{- git-annex assistant + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Command.Assistant where + +import Command +import qualified Command.Watch + +def :: [Command] +def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $ + command "assistant" paramNothing seek "automatically handle changes"] + +seek :: [CommandSeek] +seek = Command.Watch.mkSeek True diff --git a/Command/Sync.hs b/Command/Sync.hs index 1da6b0b81..912ce944c 100644 --- a/Command/Sync.hs +++ b/Command/Sync.hs @@ -32,7 +32,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote)) -- syncing involves several operations, any of which can independently fail seek :: CommandSeek seek rs = do - !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + branch <- currentBranch remotes <- syncRemotes rs return $ concat [ [ commit ] @@ -42,6 +42,11 @@ seek rs = do , [ pushLocal branch ] , [ pushRemote remote branch | remote <- remotes ] ] + +currentBranch :: Annex Git.Ref +currentBranch = do + !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current + return branch where nobranch = error "no branch is checked out" @@ -91,7 +96,7 @@ mergeLocal branch = go =<< needmerge syncbranch = syncBranch branch needmerge = do unlessM (inRepo $ Git.Ref.exists syncbranch) $ - updateBranch syncbranch + inRepo $ updateBranch syncbranch inRepo $ Git.Branch.changed branch syncbranch go False = stop go True = do @@ -100,17 +105,17 @@ mergeLocal branch = go =<< needmerge pushLocal :: Git.Ref -> CommandStart pushLocal branch = do - updateBranch $ syncBranch branch + inRepo $ updateBranch $ syncBranch branch stop -updateBranch :: Git.Ref -> Annex () -updateBranch syncbranch = +updateBranch :: Git.Ref -> Git.Repo -> IO () +updateBranch syncbranch g = unlessM go $ error $ "failed to update " ++ show syncbranch where - go = inRepo $ Git.Command.runBool "branch" + go = Git.Command.runBool "branch" [ Param "-f" , Param $ show $ Git.Ref.base syncbranch - ] + ] g pullRemote :: Remote -> Git.Ref -> CommandStart pullRemote remote branch = do @@ -136,19 +141,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge) pushRemote :: Remote -> Git.Ref -> CommandStart pushRemote remote branch = go =<< needpush where - needpush = anyM (newer remote) [syncbranch, Annex.Branch.name] + needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name] go False = stop go True = do showStart "push" (Remote.name remote) next $ next $ do showOutput - inRepo $ Git.Command.runBool "push" - [ Param (Remote.name remote) - , Param (show Annex.Branch.name) - , Param refspec - ] - refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch) - syncbranch = syncBranch branch + inRepo $ pushBranch remote branch + +pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool +pushBranch remote branch g = + Git.Command.runBool "push" + [ Param (Remote.name remote) + , Param (show Annex.Branch.name) + , Param refspec + ] g + where + refspec = concat + [ show $ Git.Ref.base branch + , ":" + , show $ Git.Ref.base $ syncBranch branch + ] mergeAnnex :: CommandStart mergeAnnex = do diff --git a/Command/Watch.hs b/Command/Watch.hs index 5681b3861..744844c4d 100644 --- a/Command/Watch.hs +++ b/Command/Watch.hs @@ -1,6 +1,3 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE BangPatterns #-} - {- git-annex watch command - - Copyright 2012 Joey Hess <joey@kitenet.net> @@ -19,10 +16,13 @@ def :: [Command] def = [withOptions [foregroundOption, stopOption] $ command "watch" paramNothing seek "watch for changes"] -seek :: [CommandSeek] -seek = [withFlag stopOption $ \stopdaemon -> +mkSeek :: Bool -> [CommandSeek] +mkSeek assistant = [withFlag stopOption $ \stopdaemon -> withFlag foregroundOption $ \foreground -> - withNothing $ start foreground stopdaemon] + withNothing $ start assistant foreground stopdaemon] + +seek :: [CommandSeek] +seek = mkSeek False foregroundOption :: Option foregroundOption = Option.flag [] "foreground" "do not daemonize" @@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize" stopOption :: Option stopOption = Option.flag [] "stop" "stop daemon" -start :: Bool -> Bool -> CommandStart -start foreground stopdaemon = notBareRepo $ do +start :: Bool -> Bool -> Bool -> CommandStart +start assistant foreground stopdaemon = notBareRepo $ do if stopdaemon then stopDaemon - else startDaemon foreground -- does not return + else startDaemon assistant foreground -- does not return stop diff --git a/GitAnnex.hs b/GitAnnex.hs index a4c5eb849..ee451352f 100644 --- a/GitAnnex.hs +++ b/GitAnnex.hs @@ -59,6 +59,7 @@ import qualified Command.Map import qualified Command.Upgrade import qualified Command.Version import qualified Command.Watch +import qualified Command.Assistant cmds :: [Command] cmds = concat @@ -101,6 +102,7 @@ cmds = concat , Command.Upgrade.def , Command.Version.def , Command.Watch.def + , Command.Assistant.def ] options :: [Option] diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs new file mode 100644 index 000000000..6e4671c05 --- /dev/null +++ b/Utility/Parallel.hs @@ -0,0 +1,20 @@ +{- parallel processes + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Utility.Parallel where + +import Common + +{- Runs an action in parallel with a set of values. + - Returns values that caused the action to fail. -} +inParallel :: (v -> IO ()) -> [v] -> IO [v] +inParallel a l = do + pids <- mapM (forkProcess . a) l + statuses <- mapM (getProcessStatus True False) pids + return $ map fst $ filter (failed . snd) $ zip l statuses + where + failed v = v /= Just (Exited ExitSuccess) diff --git a/Utility/TSet.hs b/Utility/TSet.hs new file mode 100644 index 000000000..24d345477 --- /dev/null +++ b/Utility/TSet.hs @@ -0,0 +1,39 @@ +{- Transactional sets + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + -} + +module Utility.TSet where + +import Common + +import Control.Concurrent.STM + +type TSet = TChan + +runTSet :: STM a -> IO a +runTSet = atomically + +newTSet :: IO (TSet a) +newTSet = atomically newTChan + +{- Gets the contents of the TSet. Blocks until at least one item is + - present. -} +getTSet :: TSet a -> IO [a] +getTSet tset = runTSet $ do + c <- readTChan tset + go [c] + where + go l = do + v <- tryReadTChan tset + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts items into a TSet. -} +putTSet :: TSet a -> [a] -> IO () +putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs + +{- Put a single item into a TSet. -} +putTSet1 :: TSet a -> a -> IO () +putTSet1 tset v = void $ runTSet $ writeTChan tset v diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs index c828a0593..ba7eae6a1 100644 --- a/Utility/Types/DirWatcher.hs +++ b/Utility/Types/DirWatcher.hs @@ -20,3 +20,6 @@ data WatchHooks = WatchHooks , delDirHook :: Hook FilePath , errHook :: Hook String -- error message } + +mkWatchHooks :: WatchHooks +mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn index 8173457c5..a2b80eebc 100644 --- a/doc/design/assistant/syncing.mdwn +++ b/doc/design/assistant/syncing.mdwn @@ -13,8 +13,9 @@ all the other git clones, at both the git level and the key/value level. [The watching can be done with the existing inotify code! This avoids needing any special mechanism to notify a remote that it's been synced to.] **done** -1. Periodically retry pushes that failed. Also, detect if a push failed - due to not being up-to-date, pull, and repush. +1. Periodically retry pushes that failed. **done** (every half an hour) +1. Also, detect if a push failed due to not being up-to-date, pull, + and repush. 2. Use a git merge driver that adds both conflicting files, so conflicts never break a sync. 3. Investigate the XMPP approach like dvcs-autosync does, or other ways of diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index 39fad0488..965a07f0d 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -180,6 +180,10 @@ subdirectories). To not daemonize, run with --foreground ; to stop a running daemon, run with --stop +* assistant + + Like watch, but also automatically syncs changes to other remotes. + # REPOSITORY SETUP COMMANDS * init [description] |