summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs69
-rw-r--r--Assistant/Changes.hs27
-rw-r--r--Assistant/Commits.hs34
-rw-r--r--Assistant/DaemonStatus.hs2
-rw-r--r--Assistant/Pushes.hs46
-rw-r--r--Assistant/ThreadedMonad.hs18
-rw-r--r--Assistant/Threads/Committer.hs (renamed from Assistant/Committer.hs)12
-rw-r--r--Assistant/Threads/Merger.hs87
-rw-r--r--Assistant/Threads/Pusher.hs87
-rw-r--r--Assistant/Threads/SanityChecker.hs (renamed from Assistant/SanityChecker.hs)10
-rw-r--r--Assistant/Threads/Watcher.hs (renamed from Assistant/Watcher.hs)8
-rw-r--r--Command/Assistant.hs18
-rw-r--r--Command/Sync.hs43
-rw-r--r--Command/Watch.hs18
-rw-r--r--GitAnnex.hs2
-rw-r--r--Utility/DirWatcher.hs36
-rw-r--r--Utility/Parallel.hs22
-rw-r--r--Utility/TSet.hs39
-rw-r--r--Utility/Types/DirWatcher.hs3
-rw-r--r--doc/design/assistant/syncing.mdwn9
-rw-r--r--doc/git-annex.mdwn4
21 files changed, 497 insertions, 97 deletions
diff --git a/Assistant.hs b/Assistant.hs
index e924d9477..a077cf10f 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -22,14 +22,25 @@
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- - Thread 6: status logger
+ - Thread 6: pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: push retryer
+ - Runs every 30 minutes when there are failed pushes, and retries
+ - them.
+ - Thread 8: merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch. This uses inotify
+ - on .git/refs/heads, so there are additional inotify threads
+ - associated with it, too.
+ - Thread 9: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 7: sanity checker
+ - Thread 10: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
- - Annex monad in IO actions run by the inotify and committer
+ - Annex monad in IO actions run by the watcher and committer
- threads. Thus, a single state is shared amoung the threads, and
- only one at a time can access it.
- DaemonStatusHandle: (MVar)
@@ -39,6 +50,12 @@
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
+ - FailedPushMap (STM TMVar)
+ - Failed pushes are indicated by writing to this TMVar. The push
+ - retrier blocks until they're available.
-}
module Assistant where
@@ -47,39 +64,47 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
-import Assistant.Watcher
-import Assistant.Committer
-import Assistant.SanityChecker
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.Threads.Watcher
+import Assistant.Threads.Committer
+import Assistant.Threads.Pusher
+import Assistant.Threads.Merger
+import Assistant.Threads.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
+import Utility.ThreadScheduler
import Control.Concurrent
-startDaemon :: Bool -> Annex ()
-startDaemon foreground
+startDaemon :: Bool -> Bool -> Annex ()
+startDaemon assistant foreground
| foreground = do
- showStart "watch" "."
+ showStart (if assistant then "assistant" else "watch") "."
go id
| otherwise = do
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
pidfile <- fromRepo gitAnnexPidFile
go $ Utility.Daemon.daemonize logfd (Just pidfile) False
where
- go a = withThreadState $ \st -> do
+ go daemonize = withThreadState $ \st -> do
checkCanWatch
dstatus <- startDaemonStatus
- liftIO $ a $ do
- changechan <- newChangeChan
- -- The commit thread is started early,
- -- so that the user can immediately
- -- begin adding files and having them
- -- committed, even while the startup scan
- -- is taking place.
- _ <- forkIO $ commitThread st changechan
- _ <- forkIO $ daemonStatusThread st dstatus
- _ <- forkIO $ sanityCheckerThread st dstatus changechan
- -- Does not return.
- watchThread st dstatus changechan
+ liftIO $ daemonize $ run dstatus st
+ run dstatus st = do
+ changechan <- newChangeChan
+ commitchan <- newCommitChan
+ pushmap <- newFailedPushMap
+ mapM_ (void . forkIO)
+ [ commitThread st changechan commitchan
+ , pushThread st commitchan pushmap
+ , pushRetryThread st pushmap
+ , mergeThread st
+ , daemonStatusThread st dstatus
+ , sanityCheckerThread st dstatus changechan
+ , watchThread st dstatus changechan
+ ]
+ waitForTermination
stopDaemon :: Annex ()
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 173ba1922..eca922109 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -1,6 +1,8 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Changes where
@@ -8,14 +10,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
+import Utility.TSet
-import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
-type ChangeChan = TChan Change
+type ChangeChan = TSet Change
data Change
= Change
@@ -29,11 +31,8 @@ data Change
}
deriving (Show)
-runChangeChan :: STM a -> IO a
-runChangeChan = atomically
-
newChangeChan :: IO ChangeChan
-newChangeChan = atomically newTChan
+newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@@ -65,17 +64,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = runChangeChan $ do
- c <- readTChan chan
- go [c]
- where
- go l = do
- v <- tryReadTChan chan
- case v of
- Nothing -> return l
- Just c -> go (c:l)
+getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
+refillChanges = putTSet
+
+{- Records a change in the channel. -}
+recordChange :: ChangeChan -> Change -> IO ()
+recordChange = putTSet1
diff --git a/Assistant/Commits.hs b/Assistant/Commits.hs
new file mode 100644
index 000000000..86fd7599f
--- /dev/null
+++ b/Assistant/Commits.hs
@@ -0,0 +1,34 @@
+{- git-annex assistant commit tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Commits where
+
+import Utility.TSet
+
+import Data.Time.Clock
+
+type CommitChan = TSet Commit
+
+data Commit = Commit UTCTime
+ deriving (Show)
+
+newCommitChan :: IO CommitChan
+newCommitChan = newTSet
+
+{- Gets all unhandled commits.
+ - Blocks until at least one commit is made. -}
+getCommits :: CommitChan -> IO [Commit]
+getCommits = getTSet
+
+{- Puts unhandled commits back into the channel.
+ - Note: Original order is not preserved. -}
+refillCommits :: CommitChan -> [Commit] -> IO ()
+refillCommits = putTSet
+
+{- Records a commit in the channel. -}
+recordCommit :: CommitChan -> Commit -> IO ()
+recordCommit = putTSet1
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index e5ba3d151..c7713e7d5 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -1,6 +1,8 @@
{- git-annex assistant daemon status
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.DaemonStatus where
diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs
new file mode 100644
index 000000000..f411dda07
--- /dev/null
+++ b/Assistant/Pushes.hs
@@ -0,0 +1,46 @@
+{- git-annex assistant push tracking
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Pushes where
+
+import Common.Annex
+
+import Control.Concurrent.STM
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- Track the most recent push failure for each remote. -}
+type PushMap = M.Map Remote UTCTime
+type FailedPushMap = TMVar PushMap
+
+{- The TMVar starts empty, and is left empty when there are no
+ - failed pushes. This way we can block until there are some failed pushes.
+ -}
+newFailedPushMap :: IO FailedPushMap
+newFailedPushMap = atomically newEmptyTMVar
+
+{- Blocks until there are failed pushes.
+ - Returns Remotes whose pushes failed a given time duration or more ago.
+ - (This may be an empty list.) -}
+getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
+getFailedPushesBefore v duration = do
+ m <- atomically $ readTMVar v
+ now <- getCurrentTime
+ return $ M.keys $ M.filter (not . toorecent now) m
+ where
+ toorecent now time = now `diffUTCTime` time < duration
+
+{- Modifies the map. -}
+changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
+changeFailedPushMap v a = atomically $
+ store . a . fromMaybe M.empty =<< tryTakeTMVar v
+ where
+ {- tryTakeTMVar empties the TMVar; refill it only if
+ - the modified map is not itself empty -}
+ store m
+ | m == M.empty = noop
+ | otherwise = putTMVar v $! m
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs
index 51f579d07..7b915e12c 100644
--- a/Assistant/ThreadedMonad.hs
+++ b/Assistant/ThreadedMonad.hs
@@ -1,17 +1,17 @@
{- making the Annex monad available across threads
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE BangPatterns #-}
-
module Assistant.ThreadedMonad where
import Common.Annex
import qualified Annex
import Control.Concurrent
-import Control.Exception (throw)
+import Data.Tuple
{- The Annex state is stored in a MVar, so that threaded actions can access
- it. -}
@@ -32,13 +32,7 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- - This serializes calls by threads. -}
+ - This serializes calls by threads; only one thread can run in Annex at a
+ - time. -}
runThreadState :: ThreadState -> Annex a -> IO a
-runThreadState mvar a = do
- startstate <- takeMVar mvar
- -- catch IO errors and rethrow after restoring the MVar
- !(r, newstate) <- catchIO (Annex.run startstate a) $ \e -> do
- putMVar mvar startstate
- throw e
- putMVar mvar newstate
- return r
+runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
diff --git a/Assistant/Committer.hs b/Assistant/Threads/Committer.hs
index 63df8cafc..488056fa2 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Threads/Committer.hs
@@ -1,14 +1,17 @@
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.Committer where
+module Assistant.Threads.Committer where
import Common.Annex
import Assistant.Changes
+import Assistant.Commits
import Assistant.ThreadedMonad
-import Assistant.Watcher
+import Assistant.Threads.Watcher
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
@@ -26,8 +29,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> ChangeChan -> IO ()
-commitThread st changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
+commitThread st changechan commitchan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@@ -40,6 +43,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
+ recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes
diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs
new file mode 100644
index 000000000..c7da86a8d
--- /dev/null
+++ b/Assistant/Threads/Merger.hs
@@ -0,0 +1,87 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Merger where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Annex.Branch
+import qualified Git
+import qualified Git.Command
+import qualified Git.Merge
+import qualified Git.Branch
+import qualified Command.Sync
+import qualified Remote
+
+{- This thread watches for changes to .git/refs/heads/synced/*,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ void $ watchDir dir (const False) hooks id
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let changedbranch = Git.Ref $
+ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ when (Just changedbranch == current) $
+ void $ mergeBranch changedbranch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch
+
+{- Manually pull from remotes and merge their branches. Called by the pusher
+ - when a push fails, which can happen due to a remote not having pushed
+ - changes to us. That could be because it doesn't have us as a remote, or
+ - because the assistant is not running there, or other reasons. -}
+manualPull :: Git.Ref -> [Remote] -> Annex ()
+manualPull currentbranch remotes = do
+ forM_ remotes $ \r ->
+ inRepo $ Git.Command.runBool "fetch" [Param $ Remote.name r]
+ Annex.Branch.forceUpdate
+ forM_ remotes $ \r ->
+ Command.Sync.mergeRemote r currentbranch
diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs
new file mode 100644
index 000000000..04d343528
--- /dev/null
+++ b/Assistant/Threads/Pusher.hs
@@ -0,0 +1,87 @@
+{- git-annex assistant git pushing threads
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Threads.Pusher where
+
+import Common.Annex
+import Assistant.Commits
+import Assistant.Pushes
+import Assistant.ThreadedMonad
+import Assistant.Threads.Merger
+import qualified Command.Sync
+import Utility.ThreadScheduler
+import Utility.Parallel
+
+import Data.Time.Clock
+import qualified Data.Map as M
+
+{- This thread retries pushes that failed before. -}
+pushRetryThread :: ThreadState -> FailedPushMap -> IO ()
+pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
+ -- We already waited half an hour, now wait until there are failed
+ -- pushes to retry.
+ topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
+ unless (null topush) $ do
+ now <- getCurrentTime
+ pushToRemotes now st pushmap topush
+ where
+ halfhour = 1800
+
+{- This thread pushes git commits out to remotes soon after they are made. -}
+pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
+pushThread st commitchan pushmap = do
+ remotes <- runThreadState st $ Command.Sync.syncRemotes []
+ runEvery (Seconds 2) $ do
+ -- We already waited two seconds as a simple rate limiter.
+ -- Next, wait until at least one commit has been made
+ commits <- getCommits commitchan
+ -- Now see if now's a good time to push.
+ now <- getCurrentTime
+ if shouldPush now commits
+ then pushToRemotes now st pushmap remotes
+ else refillCommits commitchan commits
+
+{- Decide if now is a good time to push to remotes.
+ -
+ - Current strategy: Immediately push all commits. The commit machinery
+ - already determines batches of changes, so we can't easily determine
+ - batches better.
+ -}
+shouldPush :: UTCTime -> [Commit] -> Bool
+shouldPush _now commits
+ | not (null commits) = True
+ | otherwise = False
+
+{- Updates the local sync branch, then pushes it to all remotes, in
+ - parallel.
+ -
+ - Avoids running possibly long-duration commands in the Annex monad, so
+ - as not to block other threads. -}
+pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
+pushToRemotes now st pushmap remotes = do
+ (g, branch) <- runThreadState st $
+ (,) <$> fromRepo id <*> Command.Sync.currentBranch
+ go True branch g remotes
+ where
+ go shouldretry branch g rs = do
+ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
+ (succeeded, failed) <- inParallel (push g branch) rs
+ changeFailedPushMap pushmap $ \m ->
+ M.union (makemap failed) $
+ M.difference m (makemap succeeded)
+ unless (null failed || not shouldretry) $
+ retry branch g failed
+
+ makemap l = M.fromList $ zip l (repeat now)
+
+ push g branch remote =
+ ifM (Command.Sync.pushBranch remote branch g)
+ ( exitSuccess, exitFailure)
+
+ retry branch g rs = do
+ runThreadState st $ manualPull branch rs
+ go False branch g rs
diff --git a/Assistant/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index e2ca9da74..4db2a61b2 100644
--- a/Assistant/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -1,9 +1,11 @@
{- git-annex assistant sanity checker
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
-}
-module Assistant.SanityChecker (
+module Assistant.Threads.SanityChecker (
sanityCheckerThread
) where
@@ -13,7 +15,7 @@ import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Changes
import Utility.ThreadScheduler
-import qualified Assistant.Watcher
+import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
@@ -77,5 +79,5 @@ check st status changechan = do
insanity m = runThreadState st $ warning m
addsymlink file s = do
insanity $ "found unstaged symlink: " ++ file
- Assistant.Watcher.runHandler st status changechan
- Assistant.Watcher.onAddSymlink file s
+ Watcher.runHandler st status changechan
+ Watcher.onAddSymlink file s
diff --git a/Assistant/Watcher.hs b/Assistant/Threads/Watcher.hs
index db58f01e8..e250f4b4a 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -7,7 +7,7 @@
{-# LANGUAGE CPP #-}
-module Assistant.Watcher where
+module Assistant.Threads.Watcher where
import Common.Annex
import Assistant.ThreadedMonad
@@ -27,7 +27,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
-import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@@ -47,7 +46,7 @@ needLsof = error $ unlines
]
watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-watchThread st dstatus changechan = watchDir "." ignored hooks startup
+watchThread st dstatus changechan = void $ watchDir "." ignored hooks startup
where
startup = statupScan st dstatus
hook a = Just $ runHandler st dstatus changechan a
@@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
case r of
Left e -> print e
Right Nothing -> noop
- Right (Just change) -> void $
- runChangeChan $ writeTChan changechan change
+ Right (Just change) -> recordChange changechan change
where
go = runThreadState st $ handler file filestatus dstatus
diff --git a/Command/Assistant.hs b/Command/Assistant.hs
new file mode 100644
index 000000000..60eac5d21
--- /dev/null
+++ b/Command/Assistant.hs
@@ -0,0 +1,18 @@
+{- git-annex assistant
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Command.Assistant where
+
+import Command
+import qualified Command.Watch
+
+def :: [Command]
+def = [withOptions [Command.Watch.foregroundOption, Command.Watch.stopOption] $
+ command "assistant" paramNothing seek "automatically handle changes"]
+
+seek :: [CommandSeek]
+seek = Command.Watch.mkSeek True
diff --git a/Command/Sync.hs b/Command/Sync.hs
index bdb5d47a7..dfaed5949 100644
--- a/Command/Sync.hs
+++ b/Command/Sync.hs
@@ -39,7 +39,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
- !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@@ -49,6 +49,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
+
+currentBranch :: Annex Git.Ref
+currentBranch = do
+ !branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
+ return branch
where
nobranch = error "no branch is checked out"
@@ -98,7 +103,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
- updateBranch syncbranch
+ inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@@ -107,17 +112,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
- updateBranch $ syncBranch branch
+ inRepo $ updateBranch $ syncBranch branch
stop
-updateBranch :: Git.Ref -> Annex ()
-updateBranch syncbranch =
+updateBranch :: Git.Ref -> Git.Repo -> IO ()
+updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
- go = inRepo $ Git.Command.runBool "branch"
+ go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
- ]
+ ] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@@ -143,19 +148,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
- needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
+ needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
- inRepo $ Git.Command.runBool "push"
- [ Param (Remote.name remote)
- , Param (show Annex.Branch.name)
- , Param refspec
- ]
- refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
- syncbranch = syncBranch branch
+ inRepo $ pushBranch remote branch
+
+pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
+pushBranch remote branch g =
+ Git.Command.runBool "push"
+ [ Param (Remote.name remote)
+ , Param (show Annex.Branch.name)
+ , Param refspec
+ ] g
+ where
+ refspec = concat
+ [ show $ Git.Ref.base branch
+ , ":"
+ , show $ Git.Ref.base $ syncBranch branch
+ ]
mergeAnnex :: CommandStart
mergeAnnex = do
diff --git a/Command/Watch.hs b/Command/Watch.hs
index 5681b3861..744844c4d 100644
--- a/Command/Watch.hs
+++ b/Command/Watch.hs
@@ -1,6 +1,3 @@
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE BangPatterns #-}
-
{- git-annex watch command
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
@@ -19,10 +16,13 @@ def :: [Command]
def = [withOptions [foregroundOption, stopOption] $
command "watch" paramNothing seek "watch for changes"]
-seek :: [CommandSeek]
-seek = [withFlag stopOption $ \stopdaemon ->
+mkSeek :: Bool -> [CommandSeek]
+mkSeek assistant = [withFlag stopOption $ \stopdaemon ->
withFlag foregroundOption $ \foreground ->
- withNothing $ start foreground stopdaemon]
+ withNothing $ start assistant foreground stopdaemon]
+
+seek :: [CommandSeek]
+seek = mkSeek False
foregroundOption :: Option
foregroundOption = Option.flag [] "foreground" "do not daemonize"
@@ -30,9 +30,9 @@ foregroundOption = Option.flag [] "foreground" "do not daemonize"
stopOption :: Option
stopOption = Option.flag [] "stop" "stop daemon"
-start :: Bool -> Bool -> CommandStart
-start foreground stopdaemon = notBareRepo $ do
+start :: Bool -> Bool -> Bool -> CommandStart
+start assistant foreground stopdaemon = notBareRepo $ do
if stopdaemon
then stopDaemon
- else startDaemon foreground -- does not return
+ else startDaemon assistant foreground -- does not return
stop
diff --git a/GitAnnex.hs b/GitAnnex.hs
index bf1f27bfd..7b1fa5986 100644
--- a/GitAnnex.hs
+++ b/GitAnnex.hs
@@ -62,6 +62,7 @@ import qualified Command.Upgrade
import qualified Command.Version
#ifdef WITH_ASSISTANT
import qualified Command.Watch
+import qualified Command.Assistant
#endif
cmds :: [Command]
@@ -106,6 +107,7 @@ cmds = concat
, Command.Version.def
#ifdef WITH_ASSISTANT
, Command.Watch.def
+ , Command.Assistant.def
#endif
]
diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs
index 11ce7baef..213aeb50a 100644
--- a/Utility/DirWatcher.hs
+++ b/Utility/DirWatcher.hs
@@ -17,10 +17,10 @@ import Utility.Types.DirWatcher
#if WITH_INOTIFY
import qualified Utility.INotify as INotify
import qualified System.INotify as INotify
-import Utility.ThreadScheduler
#endif
#if WITH_KQUEUE
import qualified Utility.Kqueue as Kqueue
+import Control.Concurrent
#endif
type Pruner = FilePath -> Bool
@@ -72,19 +72,41 @@ closingTracked = undefined
#endif
#endif
+/* Starts a watcher thread. The runStartup action is passed a scanner action
+ * to run, that will return once the initial directory scan is complete.
+ * Once runStartup returns, the watcher thread continues running,
+ * and processing events. Returns a DirWatcherHandle that can be used
+ * to shutdown later. */
#if WITH_INOTIFY
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
-watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do
+type DirWatcherHandle = INotify.INotify
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
+watchDir dir prune hooks runstartup = do
+ i <- INotify.initINotify
runstartup $ INotify.watchDir i dir prune hooks
- waitForTermination -- Let the inotify thread run.
+ return i
#else
#if WITH_KQUEUE
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO ()
+type DirWatcherHandle = ThreadId
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO DirWatcherHandle
watchDir dir ignored hooks runstartup = do
kq <- runstartup $ Kqueue.initKqueue dir ignored
- Kqueue.runHooks kq hooks
+ forkIO $ Kqueue.runHooks kq hooks
#else
-watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
+type DirWatcherHandle = ()
+watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO DirWatcherHandle
watchDir = undefined
#endif
#endif
+
+#if WITH_INOTIFY
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = INotify.killINotify
+#else
+#if WITH_KQUEUE
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = killThread
+#else
+stopWatchDir :: DirWatcherHandle -> IO ()
+stopWatchDir = undefined
+#endif
+#endif
diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs
new file mode 100644
index 000000000..9df95ab2b
--- /dev/null
+++ b/Utility/Parallel.hs
@@ -0,0 +1,22 @@
+{- parallel processes
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Utility.Parallel where
+
+import Common
+
+{- Runs an action in parallel with a set of values.
+ - Returns the values partitioned into ones with which the action succeeded,
+ - and ones with which it failed. -}
+inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v])
+inParallel a l = do
+ pids <- mapM (forkProcess . a) l
+ statuses <- mapM (getProcessStatus True False) pids
+ return $ reduce $ partition (succeeded . snd) $ zip l statuses
+ where
+ succeeded v = v == Just (Exited ExitSuccess)
+ reduce (x,y) = (map fst x, map fst y)
diff --git a/Utility/TSet.hs b/Utility/TSet.hs
new file mode 100644
index 000000000..24d345477
--- /dev/null
+++ b/Utility/TSet.hs
@@ -0,0 +1,39 @@
+{- Transactional sets
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Utility.TSet where
+
+import Common
+
+import Control.Concurrent.STM
+
+type TSet = TChan
+
+runTSet :: STM a -> IO a
+runTSet = atomically
+
+newTSet :: IO (TSet a)
+newTSet = atomically newTChan
+
+{- Gets the contents of the TSet. Blocks until at least one item is
+ - present. -}
+getTSet :: TSet a -> IO [a]
+getTSet tset = runTSet $ do
+ c <- readTChan tset
+ go [c]
+ where
+ go l = do
+ v <- tryReadTChan tset
+ case v of
+ Nothing -> return l
+ Just c -> go (c:l)
+
+{- Puts items into a TSet. -}
+putTSet :: TSet a -> [a] -> IO ()
+putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs
+
+{- Put a single item into a TSet. -}
+putTSet1 :: TSet a -> a -> IO ()
+putTSet1 tset v = void $ runTSet $ writeTChan tset v
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index c828a0593..ba7eae6a1 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
+
+mkWatchHooks :: WatchHooks
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing
diff --git a/doc/design/assistant/syncing.mdwn b/doc/design/assistant/syncing.mdwn
index 8757c3ae2..e3fdca316 100644
--- a/doc/design/assistant/syncing.mdwn
+++ b/doc/design/assistant/syncing.mdwn
@@ -53,6 +53,12 @@ all the other git clones, at both the git level and the key/value level.
4. Add a hook, so when there's a change to sync, a program can be run
and do its own signaling.
+## misc todo
+
+* --debug will show often unnecessary work being done. Optimise.
+* It would be nice if, when a USB drive is connected,
+ syncing starts automatically. Use dbus on Linux?
+
## data syncing
There are two parts to data syncing. First, map the network and second,
@@ -111,8 +117,5 @@ complete (necessary for tracking uploads).
## other considerations
-It would be nice if, when a USB drive is connected,
-syncing starts automatically. Use dbus on Linux?
-
This assumes the network is connected. It's often not, so the
[[cloud]] needs to be used to bridge between LANs.
diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn
index c52a5f3bf..85a5a18f0 100644
--- a/doc/git-annex.mdwn
+++ b/doc/git-annex.mdwn
@@ -185,6 +185,10 @@ subdirectories).
To not daemonize, run with --foreground ; to stop a running daemon,
run with --stop
+* assistant
+
+ Like watch, but also automatically syncs changes to other remotes.
+
# REPOSITORY SETUP COMMANDS
* init [description]