diff options
author | Joey Hess <joey@kitenet.net> | 2012-06-25 16:10:10 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-06-25 16:10:24 -0400 |
commit | 0b146f9ecc36545478c4a2218981b376828c61db (patch) | |
tree | c7c758fb5421d61e6b286b76ec474dd9b04450df /Assistant/Threads | |
parent | 19eee6a1df2a6c724e6d6dbe842b40dc1c17f65b (diff) |
reorg threads
Diffstat (limited to 'Assistant/Threads')
-rw-r--r-- | Assistant/Threads/Committer.hs | 199 | ||||
-rw-r--r-- | Assistant/Threads/Merger.hs | 72 | ||||
-rw-r--r-- | Assistant/Threads/Pusher.hs | 69 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 83 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 218 |
5 files changed, 641 insertions, 0 deletions
diff --git a/Assistant/Threads/Committer.hs b/Assistant/Threads/Committer.hs new file mode 100644 index 000000000..488056fa2 --- /dev/null +++ b/Assistant/Threads/Committer.hs @@ -0,0 +1,199 @@ +{- git-annex assistant commit thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Committer where + +import Common.Annex +import Assistant.Changes +import Assistant.Commits +import Assistant.ThreadedMonad +import Assistant.Threads.Watcher +import qualified Annex +import qualified Annex.Queue +import qualified Git.Command +import qualified Git.HashObject +import Git.Types +import qualified Command.Add +import Utility.ThreadScheduler +import qualified Utility.Lsof as Lsof +import qualified Utility.DirWatcher as DirWatcher +import Types.KeySource + +import Data.Time.Clock +import Data.Tuple.Utils +import qualified Data.Set as S +import Data.Either + +{- This thread makes git commits at appropriate times. -} +commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO () +commitThread st changechan commitchan = runEvery (Seconds 1) $ do + -- We already waited one second as a simple rate limiter. + -- Next, wait until at least one change is available for + -- processing. + changes <- getChanges changechan + -- Now see if now's a good time to commit. 
+ time <- getCurrentTime + if shouldCommit time changes + then do + readychanges <- handleAdds st changechan changes + if shouldCommit time readychanges + then do + void $ tryIO $ runThreadState st commitStaged + recordCommit commitchan (Commit time) + else refillChanges changechan readychanges + else refillChanges changechan changes + +{- Flushes the Annex queue, then commits whatever is staged, using an + - empty commit message. -} +commitStaged :: Annex () +commitStaged = do + Annex.Queue.flush + inRepo $ Git.Command.run "commit" + [ Param "--allow-empty-message" + , Param "-m", Param "" + -- Empty commits may be made if tree changes cancel + -- each other out, etc + , Param "--allow-empty" + -- Avoid running the usual git-annex pre-commit hook; + -- watch does the same symlink fixing, and we don't want + -- to deal with unlocked files in these commits. + , Param "--no-verify" + , Param "--quiet" + ] + +{- Decide if now is a good time to make a commit. + - Note that the list of change times has an undefined order. + - + - Current strategy: If there have been 10 changes within the past second, + - a batch activity is taking place, so wait for later. + -} +shouldCommit :: UTCTime -> [Change] -> Bool +shouldCommit now changes + | len == 0 = False + | len > 10000 = True -- avoid bloating queue too much + | length (filter thisSecond changes) < 10 = True + | otherwise = False -- batch activity + where + len = length changes + thisSecond c = now `diffUTCTime` changeTime c <= 1 + +{- If there are PendingAddChanges, the files have not yet actually been + - added to the annex (probably), and that has to be done now, before + - committing. + - + - Deferring the adds to this point causes batches to be bundled together, + - which allows faster checking with lsof that the files are not still open + - for write by some other process. + - + - When a file is added, Inotify will notice the new symlink. So this waits + - for additional Changes to arrive, so that the symlink has hopefully been + - staged before returning, and will be committed immediately. 
+ - + - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly + - created and staged. + - + - Returns a list of all changes that are ready to be committed. + - Any pending adds that are not ready yet are put back into the ChangeChan, + - where they will be retried later. + -} +handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change] +handleAdds st changechan cs = returnWhen (null pendingadds) $ do + (postponed, toadd) <- partitionEithers <$> + safeToAdd st pendingadds + + unless (null postponed) $ + refillChanges changechan postponed + + returnWhen (null toadd) $ do + added <- catMaybes <$> forM toadd add + if (DirWatcher.eventsCoalesce || null added) + then return $ added ++ otherchanges + else do + r <- handleAdds st changechan + =<< getChanges changechan + return $ r ++ added ++ otherchanges + where + (pendingadds, otherchanges) = partition isPendingAddChange cs + + returnWhen c a + | c = return otherchanges + | otherwise = a + + add :: Change -> IO (Maybe Change) + add change@(PendingAddChange { keySource = ks }) = do + r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do + showStart "add" $ keyFilename ks + handle (finishedChange change) (keyFilename ks) + =<< Command.Add.ingest ks + return $ maybeMaybe r + add _ = return Nothing + + maybeMaybe (Just j@(Just _)) = j + maybeMaybe _ = Nothing + + handle _ _ Nothing = do + showEndFail + return Nothing + handle change file (Just key) = do + link <- Command.Add.link file key True + when DirWatcher.eventsCoalesce $ do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link + stageSymlink file sha + showEndOk + return $ Just change + + {- Check that the keysource's keyFilename still exists, + - and is still a hard link to its contentLocation, + - before ingesting it. 
-} + sanitycheck keysource a = do + fs <- getSymbolicLinkStatus $ keyFilename keysource + ks <- getSymbolicLinkStatus $ contentLocation keysource + if deviceID ks == deviceID fs && fileID ks == fileID fs + then a + else return Nothing + +{- PendingAddChanges can Either be Right to be added now, + - or are unsafe, and must be Left for later. + - + - Check by running lsof on the temp directory, which + - the KeySources are locked down in. + -} +safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change] +safeToAdd st changes = runThreadState st $ + ifM (Annex.getState Annex.force) + ( allRight changes -- force bypasses lsof check + , do + tmpdir <- fromRepo gitAnnexTmpDir + openfiles <- S.fromList . map fst3 . filter openwrite <$> + liftIO (Lsof.queryDir tmpdir) + + let checked = map (check openfiles) changes + + {- If new events are received when files are closed, + - there's no need to retry any changes that cannot + - be done now. -} + if DirWatcher.closingTracked + then do + mapM_ canceladd $ lefts checked + allRight $ rights checked + else return checked + ) + where + check openfiles change@(PendingAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change + + canceladd (PendingAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" + -- remove the hard link + void $ liftIO $ tryIO $ + removeFile $ contentLocation ks + canceladd _ = noop + + openwrite (_file, mode, _pid) = + mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite + + allRight = return . map Right diff --git a/Assistant/Threads/Merger.hs b/Assistant/Threads/Merger.hs new file mode 100644 index 000000000..d2c8b9b76 --- /dev/null +++ b/Assistant/Threads/Merger.hs @@ -0,0 +1,72 @@ +{- git-annex assistant git merge thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Assistant.Threads.Merger where + +import Common.Annex +import Assistant.ThreadedMonad +import Utility.DirWatcher +import Utility.Types.DirWatcher +import qualified Git +import qualified Git.Merge +import qualified Git.Branch +import qualified Command.Sync + +{- This thread watches for changes to .git/refs/heads/synced/*, + - which indicate incoming pushes. It merges those pushes into the + - currently checked out branch. -} +mergeThread :: ThreadState -> IO () +mergeThread st = do + g <- runThreadState st $ fromRepo id + let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced" + createDirectoryIfMissing True dir + let hook a = Just $ runHandler g a + let hooks = mkWatchHooks + { addHook = hook onAdd + , errHook = hook onErr + } + watchDir dir (const False) hooks id + where + +type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler g handler file filestatus = void $ do + either print (const noop) =<< tryIO go + where + go = handler g file filestatus + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr _ msg _ = error msg + +{- Called when a new branch ref is written. + - + - This relies on git's atomic method of updating branch ref files, + - which is to first write the new file to .lock, and then rename it + - over the old file. So, ignore .lock files, and the rename ensures + - the watcher sees a new file being added on each update. + - + - At startup, synthetic add events fire, causing this to run, but that's + - ok; it ensures that any changes pushed since the last time the assistant + - ran are merged in. 
+ -} +onAdd :: Handler +onAdd g file _ + | ".lock" `isSuffixOf` file = noop + | otherwise = do + let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file + current <- Git.Branch.current g + when (Just branch == current) $ + void $ mergeBranch branch g + +mergeBranch :: Git.Ref -> Git.Repo -> IO Bool +mergeBranch = Git.Merge.mergeNonInteractive . Command.Sync.syncBranch diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs new file mode 100644 index 000000000..de90d4e64 --- /dev/null +++ b/Assistant/Threads/Pusher.hs @@ -0,0 +1,69 @@ +{- git-annex assistant git pushing thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Pusher where + +import Common.Annex +import Assistant.Commits +import Assistant.ThreadedMonad +import qualified Command.Sync +import Utility.ThreadScheduler +import Utility.Parallel + +import Data.Time.Clock + +data FailedPush = FailedPush + { failedRemote :: Remote + , failedTimeStamp :: UTCTime + } + +{- This thread pushes git commits out to remotes. -} +pushThread :: ThreadState -> CommitChan -> IO () +pushThread st commitchan = do + remotes <- runThreadState st $ Command.Sync.syncRemotes [] + runEveryWith (Seconds 2) [] $ \failedpushes -> do + -- We already waited two seconds as a simple rate limiter. + -- Next, wait until at least one commit has been made + commits <- getCommits commitchan + -- Now see if now's a good time to push. + time <- getCurrentTime + if shouldPush time commits failedpushes + then pushToRemotes time st remotes + else do + refillCommits commitchan commits + return failedpushes + +{- Decide if now is a good time to push to remotes. + - + - Current strategy: Immediately push all commits. The commit machinery + - already determines batches of changes, so we can't easily determine + - batches better. + - + - TODO: FailedPushs are only retried the next time there's a commit. 
+ - Should retry them periodically, or when a remote that was not available + - becomes available. + -} +shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool +shouldPush _now commits _failedremotes + | not (null commits) = True + | otherwise = False + +{- Updates the local sync branch, then pushes it to all remotes, in + - parallel. + - + - Avoids running possibly long-duration commands in the Annex monad, so + - as not to block other threads. -} +pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush] +pushToRemotes now st remotes = do + (g, branch) <- runThreadState st $ + (,) <$> fromRepo id <*> Command.Sync.currentBranch + Command.Sync.updateBranch (Command.Sync.syncBranch branch) g + map (`FailedPush` now) <$> inParallel (push g branch) remotes + where + push g branch remote = + ifM (Command.Sync.pushBranch remote branch g) + ( exitSuccess, exitFailure) diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs new file mode 100644 index 000000000..4db2a61b2 --- /dev/null +++ b/Assistant/Threads/SanityChecker.hs @@ -0,0 +1,83 @@ +{- git-annex assistant sanity checker + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.SanityChecker ( + sanityCheckerThread +) where + +import Common.Annex +import qualified Git.LsFiles +import Assistant.DaemonStatus +import Assistant.ThreadedMonad +import Assistant.Changes +import Utility.ThreadScheduler +import qualified Assistant.Threads.Watcher as Watcher + +import Data.Time.Clock.POSIX + +{- This thread wakes up occasionally to make sure the tree is in good shape. 
-} +sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +sanityCheckerThread st status changechan = forever $ do + waitForNextCheck st status + + runThreadState st $ + modifyDaemonStatus status $ \s -> s + { sanityCheckRunning = True } + + now <- getPOSIXTime -- before check started + catchIO (check st status changechan) + (runThreadState st . warning . show) + + runThreadState st $ do + modifyDaemonStatus status $ \s -> s + { sanityCheckRunning = False + , lastSanityCheck = Just now + } + +{- Only run one check per day, from the time of the last check. + - When a day or more has already passed since the last check, does not + - wait at all. When no check has been recorded, waits a full day. -} +waitForNextCheck :: ThreadState -> DaemonStatusHandle -> IO () +waitForNextCheck st status = do + v <- runThreadState st $ + lastSanityCheck <$> getDaemonStatus status + now <- getPOSIXTime + threadDelaySeconds $ Seconds $ calcdelay now v + where + calcdelay _ Nothing = oneDay + calcdelay now (Just lastcheck) + | lastcheck < now = max 0 $ + oneDay - truncate (now - lastcheck) + | otherwise = oneDay + +oneDay :: Int +oneDay = 24 * 60 * 60 + +{- It's important to stay out of the Annex monad as much as possible while + - running potentially expensive parts of this check, since remaining in it + - will block the watcher. -} +check :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +check st status changechan = do + g <- runThreadState st $ do + showSideAction "Running daily check" + fromRepo id + -- Find old unstaged symlinks, and add them to git. 
+ unstaged <- Git.LsFiles.notInRepo False ["."] g + now <- getPOSIXTime + forM_ unstaged $ \file -> do + ms <- catchMaybeIO $ getSymbolicLinkStatus file + case ms of + Just s | toonew (statusChangeTime s) now -> noop + | isSymbolicLink s -> + addsymlink file ms + _ -> noop + where + toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime) + slop = fromIntegral tenMinutes + insanity m = runThreadState st $ warning m + addsymlink file s = do + insanity $ "found unstaged symlink: " ++ file + Watcher.runHandler st status changechan + Watcher.onAddSymlink file s diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs new file mode 100644 index 000000000..1b6ec15f1 --- /dev/null +++ b/Assistant/Threads/Watcher.hs @@ -0,0 +1,218 @@ +{- git-annex assistant tree watcher + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Assistant.Threads.Watcher where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.Changes +import Utility.DirWatcher +import Utility.Types.DirWatcher +import qualified Annex +import qualified Annex.Queue +import qualified Git.Command +import qualified Git.UpdateIndex +import qualified Git.HashObject +import qualified Git.LsFiles +import qualified Backend +import qualified Command.Add +import Annex.Content +import Annex.CatFile +import Git.Types + +import Data.Bits.Utils +import qualified Data.ByteString.Lazy as L + +checkCanWatch :: Annex () +checkCanWatch + | canWatch = + unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $ + needLsof + | otherwise = error "watch mode is not available on this system" + +needLsof :: Annex () +needLsof = error $ unlines + [ "The lsof command is needed for watch mode to be safe, and is not in PATH." 
+ , "To override lsof checks to ensure that files are not open for writing" + , "when added to the annex, you can use --force" + , "Be warned: This can corrupt data in the annex, and make fsck complain." + ] + +watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +watchThread st dstatus changechan = watchDir "." ignored hooks startup + where + startup = statupScan st dstatus + hook a = Just $ runHandler st dstatus changechan a + hooks = WatchHooks + { addHook = hook onAdd + , delHook = hook onDel + , addSymlinkHook = hook onAddSymlink + , delDirHook = hook onDelDir + , errHook = hook onErr + } + +{- Initial startup scan. The action should return once the scan is complete. -} +statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a +statupScan st dstatus scanner = do + runThreadState st $ + showAction "scanning" + r <- scanner + runThreadState st $ + modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } + + -- Notice any files that were deleted before watching was started. + runThreadState st $ do + inRepo $ Git.Command.run "add" [Param "--update"] + showAction "started" + + return r + +ignored :: FilePath -> Bool +ignored = ig . takeFileName + where + ig ".git" = True + ig ".gitignore" = True + ig ".gitattributes" = True + ig _ = False + +type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) + +{- Runs an action handler, inside the Annex monad, and if there was a + - change, adds it to the ChangeChan. + - + - Exceptions are ignored, otherwise a whole watcher thread could be crashed. 
+ -} +runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus changechan handler file filestatus = void $ do + r <- tryIO go + case r of + Left e -> print e + Right Nothing -> noop + Right (Just change) -> recordChange changechan change + where + go = runThreadState st $ handler file filestatus dstatus + +{- During initial directory scan, this will be run for any regular files + - that are already checked into git. We don't want to turn those into + - symlinks, so do a check. This is rather expensive, but only happens + - during startup. + - + - It's possible for the file to still be open for write by some process. + - This can happen in a few ways; one is if two processes had the file open + - and only one has just closed it. We want to avoid adding a file to the + - annex that is open for write, to avoid anything being able to change it. + - + - We could run lsof on the file here to check for other writers. + - But, that's slow, and even if there is currently a writer, we will want + - to add the file *eventually*. Instead, the file is locked down as a hard + - link in a temp directory, with its write bits disabled, for later + - checking with lsof, and a Change is returned containing a KeySource + - using that hard link. The committer handles running lsof and finishing + - the add. + -} +onAdd :: Handler +onAdd file filestatus dstatus + | maybe False isRegularFile filestatus = do + ifM (scanComplete <$> getDaemonStatus dstatus) + ( go + , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) + ( noChange + , go + ) + ) + | otherwise = noChange + where + go = pendingAddChange =<< Command.Add.lockDown file + +{- A symlink might be an arbitrary symlink, which is just added. + - Or, if it is a git-annex symlink, ensure it points to the content + - before adding it. 
+ -} +onAddSymlink :: Handler +onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file + where + go (Just (key, _)) = do + link <- calcGitLink file key + ifM ((==) link <$> liftIO (readSymbolicLink file)) + ( ensurestaged link =<< getDaemonStatus dstatus + , do + liftIO $ removeFile file + liftIO $ createSymbolicLink link file + addlink link + ) + go Nothing = do -- other symlink + link <- liftIO (readSymbolicLink file) + ensurestaged link =<< getDaemonStatus dstatus + + {- This is often called on symlinks that are already + - staged correctly. A symlink may have been deleted + - and is being re-added, or added when the watcher was + - not running. So they're normally restaged to make sure. + - + - As an optimisation, during the startup scan, avoid + - restaging everything. Only links that were created since + - the last time the daemon was running are staged. + - (If the daemon has never run before, avoid staging + - links too.) + -} + ensurestaged link daemonstatus + | scanComplete daemonstatus = addlink link + | otherwise = case filestatus of + Just s + | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange + _ -> addlink link + + {- For speed, tries to reuse the existing blob for + - the symlink target. -} + addlink link = do + v <- catObjectDetails $ Ref $ ':':file + case v of + Just (currlink, sha) + | s2w8 link == L.unpack currlink -> + stageSymlink file sha + _ -> do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link + stageSymlink file sha + madeChange file LinkChange + +onDel :: Handler +onDel file _ _dstatus = do + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + madeChange file RmChange + +{- A directory has been deleted, or moved, so tell git to remove anything + - that was inside it from its cache. Since it could reappear at any time, + - use --cached to only delete it from the index. 
+ - + - Note: This could use unstageFile, but would need to run another git + - command to get the recursive list of files in the directory, so rm is + - just as good. -} +onDelDir :: Handler +onDelDir dir _ _dstatus = do + Annex.Queue.addCommand "rm" + [Params "--quiet -r --cached --ignore-unmatch --"] [dir] + madeChange dir RmDirChange + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr msg _ _dstatus = do + warning msg + return Nothing + +{- Adds a symlink to the index, without ever accessing the actual symlink + - on disk. This avoids a race if git add is used, where the symlink is + - changed to something else immediately after creation. + -} +stageSymlink :: FilePath -> Sha -> Annex () +stageSymlink file sha = + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.stageSymlink file sha) |