diff options
author | Joey Hess <joey@kitenet.net> | 2012-10-29 09:55:40 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-10-29 09:55:40 -0400 |
commit | 710dfa7e3ec897d6f02930540b10bb303e3a9c91 (patch) | |
tree | 5f046ee72ac000008b1e4c7853d5c3a788ec802e | |
parent | 579f63b6b756ca51b8f9fe53c3e668500718d91f (diff) |
convert Watcher thread to Assistant monad
This is a nice win; much less code runs in Annex, so other threads have
more chances to run concurrently.
I do notice that renaming a file has gone from 1 to 2 commits. I think this
is due to the above improvement letting the committer run more frequently,
so it commits the rm first.
-rw-r--r-- | Assistant.hs | 2 | ||||
-rw-r--r-- | Assistant/Changes.hs | 15 | ||||
-rw-r--r-- | Assistant/Threads/SanityChecker.hs | 6 | ||||
-rw-r--r-- | Assistant/Threads/Watcher.hs | 154 |
4 files changed, 89 insertions, 88 deletions
diff --git a/Assistant.hs b/Assistant.hs index 8a1be3130..07f022aa6 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -215,7 +215,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do #ifdef WITH_XMPP , assist $ pushNotifierThread st dstatus pushnotifier #endif - , watch $ watchThread st dstatus transferqueue changechan + , watch $ watchThread ] liftIO waitForTermination diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index cccc372c1..b20dce09a 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -8,7 +8,6 @@ module Assistant.Changes where import Common.Annex -import qualified Annex.Queue import Types.KeySource import Utility.TSet @@ -39,19 +38,15 @@ newChangeChan :: IO ChangeChan newChangeChan = newTSet {- Handlers call this when they made a change that needs to get committed. -} -madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) -madeChange f t = do - -- Just in case the commit thread is not flushing the queue fast enough. - Annex.Queue.flushWhenFull - liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) +madeChange :: FilePath -> ChangeType -> IO (Maybe Change) +madeChange f t = Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) -noChange :: Annex (Maybe Change) +noChange :: IO (Maybe Change) noChange = return Nothing {- Indicates an add needs to be done, but has not started yet. -} -pendingAddChange :: FilePath -> Annex (Maybe Change) -pendingAddChange f = - liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f) +pendingAddChange :: FilePath -> IO (Maybe Change) +pendingAddChange f = Just <$> (PendingAddChange <$> getCurrentTime <*> pure f) isPendingAddChange :: Change -> Bool isPendingAddChange (PendingAddChange {}) = True diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs index 6379eee46..d92c6c394 100644 --- a/Assistant/Threads/SanityChecker.hs +++ b/Assistant/Threads/SanityChecker.hs @@ -90,9 +90,5 @@ check = do dstatus <- getAssistant daemonStatusHandle liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg addsymlink file s = do - d <- getAssistant id - liftIO $ Watcher.runHandler (threadName d) - (threadState d) (daemonStatusHandle d) - (transferQueue d) (changeChan d) - Watcher.onAddSymlink file s + Watcher.runHandler Watcher.onAddSymlink file s insanity $ "found unstaged symlink: " ++ file diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs index 7ab124b14..33d4196da 100644 --- a/Assistant/Threads/Watcher.hs +++ b/Assistant/Threads/Watcher.hs @@ -15,7 +15,6 @@ module Assistant.Threads.Watcher ( ) where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Changes import Assistant.TransferQueue @@ -37,9 +36,6 @@ import Git.Types import Data.Bits.Utils import qualified Data.ByteString.Lazy as L -thisThread :: ThreadName -thisThread = "Watcher" - checkCanWatch :: Annex () checkCanWatch | canWatch = @@ -55,35 +51,42 @@ needLsof = error $ unlines , "Be warned: This can corrupt data in the annex, and make fsck complain." ] -watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread -watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do - void $ watchDir "." ignored hooks startup - brokendebug thisThread [ "watching", "."] - where - startup = startupScan st dstatus - hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a - hooks = mkWatchHooks - { addHook = hook onAdd - , delHook = hook onDel - , addSymlinkHook = hook onAddSymlink - , delDirHook = hook onDelDir - , errHook = hook onErr +watchThread :: NamedThread +watchThread = NamedThread "Watcher" $ do + startup <- asIO startupScan + addhook <- hook onAdd + delhook <- hook onDel + addsymlinkhook <- hook onAddSymlink + deldirhook <- hook onDelDir + errhook <- hook onErr + let hooks = mkWatchHooks + { addHook = addhook + , delHook = delhook + , addSymlinkHook = addsymlinkhook + , delDirHook = deldirhook + , errHook = errhook } + void $ liftIO $ watchDir "." ignored hooks startup + debug [ "watching", "."] + where + hook a = Just <$> asIO2 (runHandler a) {- Initial scartup scan. The action should return once the scan is complete. -} -startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a -startupScan st dstatus scanner = do - runThreadState st $ showAction "scanning" - alertWhile' dstatus startupScanAlert $ do - r <- scanner +startupScan :: IO a -> Assistant a +startupScan scanner = do + liftAnnex $ showAction "scanning" + dstatus <- getAssistant daemonStatusHandle + alertWhile' dstatus startupScanAlert <~> do + r <- liftIO $ scanner -- Notice any files that were deleted before -- watching was started. - runThreadState st $ do + liftAnnex $ do inRepo $ Git.Command.run "add" [Param "--update"] showAction "started" - modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True } + liftIO $ modifyDaemonStatus_ dstatus $ + \s -> s { scanComplete = True } return (True, r) @@ -95,52 +98,52 @@ ignored = ig . takeFileName ig ".gitattributes" = True ig _ = False -type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) +type Handler = FilePath -> Maybe FileStatus -> Assistant (Maybe Change) -{- Runs an action handler, inside the Annex monad, and if there was a - - change, adds it to the ChangeChan. +{- Runs an action handler, and if there was a change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. -} -runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do - r <- tryIO go +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file filestatus = void $ do + r <- tryIO <~> handler file filestatus case r of - Left e -> print e + Left e -> liftIO $ print e Right Nothing -> noop - Right (Just change) -> recordChange changechan change - where - go = runThreadState st $ handler threadname file filestatus dstatus transferqueue + Right (Just change) -> do + -- Just in case the commit thread is not + -- flushing the queue fast enough. + liftAnnex $ Annex.Queue.flushWhenFull + flip recordChange change <<~ changeChan onAdd :: Handler -onAdd _ file filestatus _ _ - | maybe False isRegularFile filestatus = pendingAddChange file - | otherwise = noChange +onAdd file filestatus + | maybe False isRegularFile filestatus = liftIO $ pendingAddChange file + | otherwise = liftIO $ noChange {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content - before adding it. -} onAddSymlink :: Handler -onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file +onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file) where go (Just (key, _)) = do - link <- calcGitLink file key + link <- liftAnnex $ calcGitLink file key ifM ((==) link <$> liftIO (readSymbolicLink file)) ( do - s <- liftIO $ getDaemonStatus dstatus + s <- daemonStatus checkcontent key s ensurestaged link s , do - liftIO $ brokendebug threadname ["fix symlink", file] liftIO $ removeFile file liftIO $ createSymbolicLink link file - checkcontent key =<< liftIO (getDaemonStatus dstatus) + checkcontent key =<< daemonStatus addlink link ) go Nothing = do -- other symlink link <- liftIO (readSymbolicLink file) - ensurestaged link =<< liftIO (getDaemonStatus dstatus) + ensurestaged link =<< daemonStatus {- This is often called on symlinks that are already - staged correctly. A symlink may have been deleted @@ -156,41 +159,47 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l ensurestaged link daemonstatus | scanComplete daemonstatus = addlink link | otherwise = case filestatus of - Just s - | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange + Just s | changedrecently s -> liftIO noChange _ -> addlink link + where + changedrecently s = not $ + afterLastDaemonRun (statusChangeTime s) daemonstatus {- For speed, tries to reuse the existing blob for symlink target. -} addlink link = do - liftIO $ brokendebug threadname ["add symlink", file] - v <- catObjectDetails $ Ref $ ':':file - case v of - Just (currlink, sha) - | s2w8 link == L.unpack currlink -> + debug ["add symlink", file] + liftAnnex $ do + v <- catObjectDetails $ Ref $ ':':file + case v of + Just (currlink, sha) + | s2w8 link == L.unpack currlink -> + stageSymlink file sha + _ -> do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link stageSymlink file sha - _ -> do - sha <- inRepo $ - Git.HashObject.hashObject BlobObject link - stageSymlink file sha - madeChange file LinkChange + liftIO $ madeChange file LinkChange {- When a new link appears, or a link is changed, after the startup - scan, handle getting or dropping the key's content. -} checkcontent key daemonstatus | scanComplete daemonstatus = do - present <- inAnnex key - unless present $ - queueTransfers Next transferqueue dstatus - key (Just file) Download - handleDrops dstatus present key (Just file) + present <- liftAnnex $ inAnnex key + dstatus <- getAssistant daemonStatusHandle + unless present $ do + transferqueue <- getAssistant transferQueue + liftAnnex $ queueTransfers Next transferqueue + dstatus key (Just file) Download + liftAnnex $ handleDrops dstatus present key (Just file) | otherwise = noop onDel :: Handler -onDel threadname file _ _dstatus _ = do - liftIO $ brokendebug threadname ["file deleted", file] - Annex.Queue.addUpdateIndex =<< - inRepo (Git.UpdateIndex.unstageFile file) - madeChange file RmChange +onDel file _ = do + debug ["file deleted", file] + liftAnnex $ + Annex.Queue.addUpdateIndex =<< + inRepo (Git.UpdateIndex.unstageFile file) + liftIO $ madeChange file RmChange {- A directory has been deleted, or moved, so tell git to remove anything - that was inside it from its cache. Since it could reappear at any time, @@ -200,18 +209,19 @@ onDel threadname file _ _dstatus _ = do - command to get the recursive list of files in the directory, so rm is - just as good. -} onDelDir :: Handler -onDelDir threadname dir _ _dstatus _ = do - liftIO $ brokendebug threadname ["directory deleted", dir] - Annex.Queue.addCommand "rm" +onDelDir dir _ = do + debug ["directory deleted", dir] + liftAnnex $ Annex.Queue.addCommand "rm" [Params "--quiet -r --cached --ignore-unmatch --"] [dir] - madeChange dir RmDirChange + liftIO $ madeChange dir RmDirChange {- Called when there's an error with inotify or kqueue. -} onErr :: Handler -onErr _ msg _ dstatus _ = do - warning msg +onErr msg _ = do + liftAnnex $ warning msg + dstatus <- getAssistant daemonStatusHandle void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg - return Nothing + liftIO noChange {- Adds a symlink to the index, without ever accessing the actual symlink - on disk. This avoids a race if git add is used, where the symlink is |