summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs46
-rw-r--r--Assistant/Changes.hs27
-rw-r--r--Assistant/Commits.hs34
-rw-r--r--Assistant/DaemonStatus.hs2
-rw-r--r--Assistant/Pushes.hs36
-rw-r--r--Assistant/ThreadedMonad.hs5
-rw-r--r--Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs)12
-rw-r--r--Assistant/Threads/Merger.hs72
-rw-r--r--Assistant/Threads/Pusher.hs78
-rw-r--r--Assistant/Threads/SanityChecker.hs (renamed from Assistant/SanityChecker.hs)10
-rw-r--r--Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs)6
-rw-r--r--Command/Assistant.hs18
-rw-r--r--Command/Sync.hs43
-rw-r--r--Command/Watch.hs18
-rw-r--r--GitAnnex.hs2
-rw-r--r--Utility/Parallel.hs20
-rw-r--r--Utility/TSet.hs39
-rw-r--r--Utility/Types/DirWatcher.hs3
-rw-r--r--doc/design/assistant/syncing.mdwn5
-rw-r--r--doc/git-annex.mdwn4
20 files changed, 411 insertions, 69 deletions
diff --git a/Assistant.hs b/Assistant.hs
index e924d9477..c054dafd3 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -22,9 +22,20 @@
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- - Thread 6: status logger
+ - Thread 6: pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: push retryer
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch. This uses inotify
+ - on .git/refs/heads, so there are additional inotify threads
+ - associated with it, too.
+ - Thread 9: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 10: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
@@ -39,6 +50,9 @@
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
-}
module Assistant where
@@ -47,18 +61,22 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
-import Assistant.Watcher
-import Assistant.Committer
-import Assistant.SanityChecker
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.Threads.Watcher
+import Assistant.Threads.Committer
+import Assistant.Threads.Pusher
+import Assistant.Threads.Merger
+import Assistant.Threads.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
import Control.Concurrent
-startDaemon :: Bool -> Annex ()
-startDaemon foreground
+startDaemon :: Bool -> Bool -> Annex ()
+startDaemon assistant foreground
| foreground = do
- showStart "watch" "."
+ showStart (if assistant then "assistant" else "watch") "."
go id
| otherwise = do
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
@@ -70,12 +88,12 @@ startDaemon foreground
dstatus <- startDaemonStatus
liftIO $ a $ do
changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
+ commitchan <- newCommitChan
+ pushchan <- newFailedPushChan
+ _ <- forkIO $ commitThread st changechan commitchan
+ _ <- forkIO $ pushThread st commitchan pushchan
+ _ <- forkIO $ pushRetryThread st pushchan
+ _ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return.
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..eca922109 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -1,6 +1,8 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Changes where
@@ -8,14 +10,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +31,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +64,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..86fd7599f
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index e5ba3d151..c7713e7d5 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -1,6 +1,8 @@
{- git-annex assistant daemon status
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.DaemonStatus where
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f3bffbf79
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,36 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+import Utility.TSet
+
+import Data.Time.Clock
+
+type FailedPushChan = TSet FailedPush
+
+data FailedPush = FailedPush
+ { failedRemote :: Remote
+ , failedTimeStamp :: UTCTime
+ }
+
+newFailedPushChan :: IO FailedPushChan
+newFailedPushChan = newTSet
+
+{- Gets all failed pushes. Blocks until there is at least one failed push. -}
+getFailedPushes :: FailedPushChan -> IO [FailedPush]
+getFailedPushes = getTSet
+
+{- Puts failed pushes back into the channel.
+ - Note: Original order is not preserved. -}
+refillFailedPushes :: FailedPushChan -> [FailedPush] -> IO ()
+refillFailedPushes = putTSet
+
+{- Records a failed push in the channel. -}
+recordFailedPush :: FailedPushChan -> FailedPush -> IO ()
+recordFailedPush = putTSet1
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..6d3d25778 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -1,6 +1,8 @@
{- making the Annex monad available across threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
@@ -32,7 +34,8 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
runThreadState mvar a = do
startstate <- takeMVar mvar
diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs
index 63df8cafc..488056fa2 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -1,14 +1,17 @@
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.Committer where
+module Assistant.Threads.Committer where
import Common.Annex
import Assistant.Changes
+import Assistant.Commits
import Assistant.ThreadedMonad
-import Assistant.Watcher
+import Assistant.Threads.Watcher
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -26,8 +29,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
+commitThread st changechan commitchan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -40,6 +43,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..d2c8b9b76
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,72 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Git
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+
+{- This thread watches for changes to .git/refs/heads/synced/*,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ watchDir dir (const False) hooks id
+ where
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just branch == current) $
+ void $ mergeBranch branch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..6a4ae7838
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,78 @@
+{- git-annex assistant git pushing threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Common.Annex
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.ThreadedMonad
+import qualified Command.Sync
+import Utility.ThreadScheduler
+import Utility.Parallel
+
+import Data.Time.Clock
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> FailedPushChan -> IO ()
+pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ pushes <- getFailedPushes pushchan
+ -- Check times, to avoid repushing a push that's too new.
+ now <- getCurrentTime
+ let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes
+ unless (null newpushes) $
+ refillFailedPushes pushchan newpushes
+ unless (null oldpushes) $
+ pushToRemotes now st pushchan $ map failedRemote oldpushes
+ where
+ halfhour = 1800
+ toorecent now time = now `diffUTCTime` time < fromIntegral halfhour
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO ()
+pushThread st commitchan pushchan = do
+ remotes <- runThreadState st $ Command.Sync.syncRemotes []
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then pushToRemotes now st pushchan remotes
+ else refillCommits commitchan commits
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
+
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO ()
+pushToRemotes now st pushchan remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> Command.Sync.currentBranch
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ failed <- map (`FailedPush` now) <$> inParallel (push g branch) remotes
+ unless (null failed) $
+ refillFailedPushes pushchan failed
+ where
+ push g branch remote =
+ ifM (Command.Sync.pushBranch remote branch g)
+ ( exitSuccess, exitFailure)
diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index e2ca9da74..4db2a61b2 100644
--- a/Assistant/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -1,9 +1,11 @@
{- git-annex assistant sanity checker
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.SanityChecker (
+module Assistant.Threads.SanityChecker (
sanityCheckerThread
) where
@@ -13,7 +15,7 @@ import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Changes
import Utility.ThreadScheduler
-import qualified Assistant.Watcher
+import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
@@ -77,5 +79,5 @@ check st status changechan = do
insanity m = runThreadState st $ warning m
addsymlink file s = do
insanity $ "found unstaged symlink: " ++ file
- Assistant.Watcher.runHandler st status changechan
- Assistant.Watcher.onAddSymlink file s
+ Watcher.runHandler st status changechan
+ Watcher.onAddSymlink file s
diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs
index db58f01e8..1b6ec15f1 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -7,7 +7,7 @@
{-# LANGUAGE CPP #-}
-module Assistant.Watcher where
+module Assistant.Threads.Watcher where
import Common.Annex
import Assistant.ThreadedMonad
@@ -27,7 +27,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
go = runThreadState st $ handler file filestatus dstatus
diff --git a/Command/Assistant.hs b/Command/Assistant.hs
new file mode 100644
index 000000000..60eac5d21
--- /dev/null
+++ b/Command/Assistant.hs
@@ -0,0 +1,18 @@
+{- git-annex assistant
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Command.Assistant where
+
+import Command
+import qualified Command.Watch
+
+def :: [Command]
+def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $
+ command "assistant" paramNothing seek "automatically handle changes"]
+
+seek :: [CommandSeek]
+seek = Command.Watch.mkSeek True
diff --git a/Command/Sync.hs b/Command/Sync.hs
index 1da6b0b81..912ce944c 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -32,7 +32,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
- !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@@ -42,6 +42,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
+
+currentBranch :: Annex Git.Ref
+currentBranch = do
+ !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ return branch
where
nobranch = error "no branch is checked out"
@@ -91,7 +96,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
- updateBranch syncbranch
+ inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@@ -100,17 +105,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
- updateBranch $ syncBranch branch
+ inRepo $ updateBranch $ syncBranch branch
stop
-updateBranch :: Git.Ref -> Annex ()
-updateBranch syncbranch =
+updateBranch :: Git.Ref -> Git.Repo -> IO ()
+updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
- go = inRepo $ Git.Command.runBool "branch"
+ go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
- ]
+ ] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@@ -136,19 +141,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
- needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
+ needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
- inRepo $ Git.Command.runBool "push"
- [ Param (Remote.name remote)
- , Param (show Annex.Branch.name)
- , Param refspec
- ]
- refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
- syncbranch = syncBranch branch
+ inRepo $ pushBranch remote branch
+
+pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
+pushBranch remote branch g =
+ Git.Command.runBool "push"
+ [ Param (Remote.name remote)
+ , Param (show Annex.Branch.name)
+ , Param refspec
+ ] g
+ where
+ refspec = concat
+ [ show $ Git.Ref.base branch
+ , ":"
+ , show $ Git.Ref.base $ syncBranch branch
+ ]
mergeAnnex :: CommandStart
mergeAnnex = do
diff --git a/Command/Watch.hs b/Command/Watch.hs
index 5681b3861..744844c4d 100644
--- a/Command/Watch.hs
+++ b/Command/Watch.hs
@@ -1,6 +1,3 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE BangPatterns #-}
-
{- git-annex watch command
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
@@ -19,10 +16,13 @@ def :: [Command]
def = [withOptions [foregroundOption, stopOption] $
command "watch" paramNothing seek "watch for changes"]
-seek :: [CommandSeek]
-seek = [withFlag stopOption $ \stopdaemon ->
+mkSeek :: Bool -> [CommandSeek]
+mkSeek assistant = [withFlag stopOption $ \stopdaemon ->
withFlag foregroundOption $ \foreground ->
- withNothing $ start foreground stopdaemon]
+ withNothing $ start assistant foreground stopdaemon]
+
+seek :: [CommandSeek]
+seek = mkSeek False
foregroundOption :: Option
foregroundOption = Option.flag [] "foreground" "do not daemonize"
@@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize"
stopOption :: Option
stopOption = Option.flag [] "stop" "stop daemon"
-start :: Bool -> Bool -> CommandStart
-start foreground stopdaemon = notBareRepo $ do
+start :: Bool -> Bool -> Bool -> CommandStart
+start assistant foreground stopdaemon = notBareRepo $ do
if stopdaemon
then stopDaemon
- else startDaemon foreground -- does not return
+ else startDaemon assistant foreground -- does not return
stop
diff --git a/GitAnnex.hs b/GitAnnex.hs
index a4c5eb849..ee451352f 100644
--- a/GitAnnex.hs
+++ b/GitAnnex.hs
@@ -59,6 +59,7 @@ import qualified Command.Map
import qualified Command.Upgrade
import qualified Command.Version
import qualified Command.Watch
+import qualified Command.Assistant
cmds :: [Command]
cmds = concat
@@ -101,6 +102,7 @@ cmds = concat
, Command.Upgrade.def
, Command.Version.def
, Command.Watch.def
+ , Command.Assistant.def
]
options :: [Option]
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
new file mode 100644
index 000000000..6e4671c05
--- /dev/null
+++ b/Utility/Parallel.hs
@@ -0,0 +1,20 @@
+{- parallel processes
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Utility.Parallel where
+
+import Common
+
+{- Runs an action in parallel with a set of values.
+ - Returns values that caused the action to fail. -}
+inParallel :: (v -> IO ()) -> [v] -> IO [v]
+inParallel a l = do
+ pids <- mapM (forkProcess . a) l
+ statuses <- mapM (getProcessStatus True False) pids
+ return $ map fst $ filter (failed . snd) $ zip l statuses
+ where
+ failed v = v /= Just (Exited ExitSuccess)
diff --git a/Utility/TSet.hs b/Utility/TSet.hs
new file mode 100644
index 000000000..24d345477
--- /dev/null
+++ b/Utility/TSet.hs
@@ -0,0 +1,39 @@
+{- Transactional sets
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Utility.TSet where
+
+import Common
+
+import Control.Concurrent.STM
+
+type TSet = TChan
+
+runTSet :: STM a -> IO a
+runTSet = atomically
+
+newTSet :: IO (TSet a)
+newTSet = atomically newTChan
+
+{- Gets the contents of the TSet. Blocks until at least one item is
+ - present. -}
+getTSet :: TSet a -> IO [a]
+getTSet tset = runTSet $ do
+ c <- readTChan tset
+ go [c]
+ where
+ go l = do
+ v <- tryReadTChan tset
+ case v of
+ Nothing -> return l
+ Just c -> go (c:l)
+
+{- Puts items into a TSet. -}
+putTSet :: TSet a -> [a] -> IO ()
+putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs
+
+{- Put a single item into a TSet. -}
+putTSet1 :: TSet a -> a -> IO ()
+putTSet1 tset v = void $ runTSet $ writeTChan tset v
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index c828a0593..ba7eae6a1 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
+
+mkWatchHooks :: WatchHooks
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index 8173457c5..a2b80eebc 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -13,8 +13,9 @@ all the other git clones, at both the git level and the key/value level.
[The watching can be done with the existing inotify code! This avoids needing
any special mechanism to notify a remote that it's been synced to.]
**done**
-1. Periodically retry pushes that failed. Also, detect if a push failed
- due to not being up-to-date, pull, and repush.
+1. Periodically retry pushes that failed. **done** (every half an hour)
+1. Also, detect if a push failed due to not being up-to-date, pull,
+ and repush.
2. Use a git merge driver that adds both conflicting files,
so conflicts never break a sync.
3. Investigate the XMPP approach like dvcs-autosync does, or other ways of
diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn
index 39fad0488..965a07f0d 100644
--- a/doc/git-annex.mdwn
+++ b/doc/git-annex.mdwn
@@ -180,6 +180,10 @@ subdirectories).
To not daemonize, run with --foreground ; to stop a running daemon,
run with --stop
+* assistant
+
+ Like watch, but also automatically syncs changes to other remotes.
+
# REPOSITORY SETUP COMMANDS
* init [description]