summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-06-22 17:01:08 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-06-22 17:01:08 -0400
commite699ce18417729abbb9606f6a011628ad6616a64 (patch)
tree7ed4eb17763ab6ca3bf3c021625289cf77a150c9
parente9630e90decac4fe0c999af88131bd4b7c9d979f (diff)
added a merger thread
Wow! I can create a file in repo a, and it instantly* shows up in repo b! * under 1 second anyway
-rw-r--r--Assistant.hs23
-rw-r--r--Assistant/Merger.hs72
-rw-r--r--Assistant/Pusher.hs (renamed from Assistant/Syncer.hs)40
-rw-r--r--Utility/Types/DirWatcher.hs3
4 files changed, 112 insertions, 26 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 5a3fa5a9d..ce230533c 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -22,11 +22,17 @@
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- - Thread 6: syncer
- - Waits for commits to be made, and syncs the git repo to remotes.
- - Thread 7: status logger
+ - Thread 6: pusher
+ - Waits for commits to be made, and pushes updated branches to remotes,
+ - in parallel. (Forks a process for each git push.)
+ - Thread 7: merger
+ - Waits for pushes to be received from remotes, and merges the
+ - updated branches into the current branch. This uses inotify
+ - on .git/refs/heads, so there are additional inotify threads
+ - associated with it, too.
+ - Thread 8: status logger
- Wakes up periodically and records the daemon's status to disk.
- - Thread 8: sanity checker
+ - Thread 9: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
@@ -41,6 +47,9 @@
- ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer
- reads from it.
+ - CommitChan: (STM TChan)
+ - Commits are indicated by writing to this channel. The pusher reads
+ - from it.
-}
module Assistant where
@@ -52,7 +61,8 @@ import Assistant.Changes
import Assistant.Commits
import Assistant.Watcher
import Assistant.Committer
-import Assistant.Syncer
+import Assistant.Pusher
+import Assistant.Merger
import Assistant.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
@@ -76,7 +86,8 @@ startDaemon assistant foreground
changechan <- newChangeChan
commitchan <- newCommitChan
_ <- forkIO $ commitThread st changechan commitchan
- _ <- forkIO $ syncThread st commitchan
+ _ <- forkIO $ pushThread st commitchan
+ _ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return.
diff --git a/Assistant/Merger.hs b/Assistant/Merger.hs
new file mode 100644
index 000000000..660636842
--- /dev/null
+++ b/Assistant/Merger.hs
@@ -0,0 +1,72 @@
+{- git-annex assistant git merge thread
+ -
+ - Copyright 2012 Joey Hess <joey@kitenet.net>
+ -}
+
+module Assistant.Merger where
+
+import Common.Annex
+import Assistant.ThreadedMonad
+import Utility.DirWatcher
+import Utility.Types.DirWatcher
+import qualified Git
+import qualified Git.Command
+import qualified Git.Branch
+import qualified Command.Sync
+
+{- This thread watches for changes to .git/refs/heads/synced/*,
+ - which indicate incoming pushes. It merges those pushes into the
+ - currently checked out branch. -}
+mergeThread :: ThreadState -> IO ()
+mergeThread st = do
+ g <- runThreadState st $ fromRepo id
+ let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
+ createDirectoryIfMissing True dir
+ let hook a = Just $ runHandler g a
+ let hooks = mkWatchHooks
+ { addHook = hook onAdd
+ , errHook = hook onErr
+ }
+ watchDir dir (const False) hooks id
+ where
+
+type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
+
+{- Runs an action handler.
+ -
+ - Exceptions are ignored, otherwise a whole thread could be crashed.
+ -}
+runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
+runHandler g handler file filestatus = void $ do
+ either print (const noop) =<< tryIO go
+ where
+ go = handler g file filestatus
+
+{- Called when there's an error with inotify. -}
+onErr :: Handler
+onErr _ msg _ = error msg
+
+{- Called when a new branch ref is written.
+ -
+ - This relies on git's atomic method of updating branch ref files,
+ - which is to first write the new file to .lock, and then rename it
+ - over the old file. So, ignore .lock files, and the rename ensures
+ - the watcher sees a new file being added on each update.
+ -
+ - At startup, synthetic add events fire, causing this to run, but that's
+ - ok; it ensures that any changes pushed since the last time the assistant
+ - ran are merged in.
+ -}
+onAdd :: Handler
+onAdd g file _
+ | ".lock" `isSuffixOf` file = noop
+ | otherwise = do
+ let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file
+ current <- Git.Branch.current g
+ print (branch, current)
+ when (Just branch == current) $
+ void $ mergeBranch branch g
+
+mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
+mergeBranch branch = Git.Command.runBool "merge"
+ [Param $ show $ Command.Sync.syncBranch branch]
diff --git a/Assistant/Syncer.hs b/Assistant/Pusher.hs
index c579c1c28..119575b92 100644
--- a/Assistant/Syncer.hs
+++ b/Assistant/Pusher.hs
@@ -1,9 +1,9 @@
-{- git-annex assistant git syncing thread
+{- git-annex assistant git pushing thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
-module Assistant.Syncer where
+module Assistant.Pusher where
import Common.Annex
import Assistant.Commits
@@ -14,39 +14,39 @@ import Utility.Parallel
import Data.Time.Clock
-data FailedSync = FailedSync
+data FailedPush = FailedPush
{ failedRemote :: Remote
, failedTimeStamp :: UTCTime
}
-{- This thread syncs git commits out to remotes. -}
-syncThread :: ThreadState -> CommitChan -> IO ()
-syncThread st commitchan = do
+{- This thread pushes git commits out to remotes. -}
+pushThread :: ThreadState -> CommitChan -> IO ()
+pushThread st commitchan = do
remotes <- runThreadState st $ Command.Sync.syncRemotes []
- runEveryWith (Seconds 2) [] $ \failedsyncs -> do
+ runEveryWith (Seconds 2) [] $ \failedpushes -> do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
- -- Now see if now's a good time to sync.
+ -- Now see if now's a good time to push.
time <- getCurrentTime
- if shouldSync time commits failedsyncs
- then syncToRemotes time st remotes
+ if shouldPush time commits failedpushes
+ then pushToRemotes time st remotes
else do
refillCommits commitchan commits
- return failedsyncs
+ return failedpushes
-{- Decide if now is a good time to sync to remotes.
+{- Decide if now is a good time to push to remotes.
-
- - Current strategy: Immediately sync all commits. The commit machinery
+ - Current strategy: Immediately push all commits. The commit machinery
- already determines batches of changes, so we can't easily determine
- batches better.
-
- - TODO: FailedSyncs are only retried the next time there's a commit.
+ - TODO: FailedPushs are only retried the next time there's a commit.
- Should retry them periodically, or when a remote that was not available
- becomes available.
-}
-shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool
-shouldSync _now commits _failedremotes
+shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool
+shouldPush _now commits _failedremotes
| not (null commits) = True
| otherwise = False
@@ -55,13 +55,13 @@ shouldSync _now commits _failedremotes
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
-syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync]
-syncToRemotes now st remotes = do
+pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush]
+pushToRemotes now st remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
- map (`FailedSync` now) <$> inParallel (go g branch) remotes
+ map (`FailedPush` now) <$> inParallel (push g branch) remotes
where
- go g branch remote =
+ push g branch remote =
ifM (Command.Sync.pushBranch remote branch g)
( exitSuccess, exitFailure)
diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs
index c828a0593..ba7eae6a1 100644
--- a/Utility/Types/DirWatcher.hs
+++ b/Utility/Types/DirWatcher.hs
@@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
+
+mkWatchHooks :: WatchHooks
+mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing