summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-10-29 09:55:40 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-10-29 09:55:40 -0400
commit710dfa7e3ec897d6f02930540b10bb303e3a9c91 (patch)
tree5f046ee72ac000008b1e4c7853d5c3a788ec802e
parent579f63b6b756ca51b8f9fe53c3e668500718d91f (diff)
convert Watcher thread to Assistant monad
This is a nice win; much less code runs in Annex, so other threads have more chances to run concurrently. I do notice that renaming a file has gone from 1 to 2 commits. I think this is due to the above improvement letting the committer run more frequently, so it commits the rm first.
-rw-r--r--Assistant.hs2
-rw-r--r--Assistant/Changes.hs15
-rw-r--r--Assistant/Threads/SanityChecker.hs6
-rw-r--r--Assistant/Threads/Watcher.hs154
4 files changed, 89 insertions, 88 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 8a1be3130..07f022aa6 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -215,7 +215,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
#ifdef WITH_XMPP
, assist $ pushNotifierThread st dstatus pushnotifier
#endif
- , watch $ watchThread st dstatus transferqueue changechan
+ , watch $ watchThread
]
liftIO waitForTermination
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index cccc372c1..b20dce09a 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -8,7 +8,6 @@
module Assistant.Changes where
import Common.Annex
-import qualified Annex.Queue
import Types.KeySource
import Utility.TSet
@@ -39,19 +38,15 @@ newChangeChan :: IO ChangeChan
newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
-madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
-madeChange f t = do
- -- Just in case the commit thread is not flushing the queue fast enough.
- Annex.Queue.flushWhenFull
- liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
+madeChange :: FilePath -> ChangeType -> IO (Maybe Change)
+madeChange f t = Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
-noChange :: Annex (Maybe Change)
+noChange :: IO (Maybe Change)
noChange = return Nothing
{- Indicates an add needs to be done, but has not started yet. -}
-pendingAddChange :: FilePath -> Annex (Maybe Change)
-pendingAddChange f =
- liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
+pendingAddChange :: FilePath -> IO (Maybe Change)
+pendingAddChange f = Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
isPendingAddChange :: Change -> Bool
isPendingAddChange (PendingAddChange {}) = True
diff --git a/Assistant/Threads/SanityChecker.hs b/Assistant/Threads/SanityChecker.hs
index 6379eee46..d92c6c394 100644
--- a/Assistant/Threads/SanityChecker.hs
+++ b/Assistant/Threads/SanityChecker.hs
@@ -90,9 +90,5 @@ check = do
dstatus <- getAssistant daemonStatusHandle
liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg
addsymlink file s = do
- d <- getAssistant id
- liftIO $ Watcher.runHandler (threadName d)
- (threadState d) (daemonStatusHandle d)
- (transferQueue d) (changeChan d)
- Watcher.onAddSymlink file s
+ Watcher.runHandler Watcher.onAddSymlink file s
insanity $ "found unstaged symlink: " ++ file
diff --git a/Assistant/Threads/Watcher.hs b/Assistant/Threads/Watcher.hs
index 7ab124b14..33d4196da 100644
--- a/Assistant/Threads/Watcher.hs
+++ b/Assistant/Threads/Watcher.hs
@@ -15,7 +15,6 @@ module Assistant.Threads.Watcher (
) where
import Assistant.Common
-import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.TransferQueue
@@ -37,9 +36,6 @@ import Git.Types
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
-thisThread :: ThreadName
-thisThread = "Watcher"
-
checkCanWatch :: Annex ()
checkCanWatch
| canWatch =
@@ -55,35 +51,42 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain."
]
-watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
-watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do
- void $ watchDir "." ignored hooks startup
- brokendebug thisThread [ "watching", "."]
- where
- startup = startupScan st dstatus
- hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
- hooks = mkWatchHooks
- { addHook = hook onAdd
- , delHook = hook onDel
- , addSymlinkHook = hook onAddSymlink
- , delDirHook = hook onDelDir
- , errHook = hook onErr
+watchThread :: NamedThread
+watchThread = NamedThread "Watcher" $ do
+ startup <- asIO startupScan
+ addhook <- hook onAdd
+ delhook <- hook onDel
+ addsymlinkhook <- hook onAddSymlink
+ deldirhook <- hook onDelDir
+ errhook <- hook onErr
+ let hooks = mkWatchHooks
+ { addHook = addhook
+ , delHook = delhook
+ , addSymlinkHook = addsymlinkhook
+ , delDirHook = deldirhook
+ , errHook = errhook
}
+ void $ liftIO $ watchDir "." ignored hooks startup
+ debug [ "watching", "."]
+ where
+ hook a = Just <$> asIO2 (runHandler a)
{- Initial scartup scan. The action should return once the scan is complete. -}
-startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
-startupScan st dstatus scanner = do
- runThreadState st $ showAction "scanning"
- alertWhile' dstatus startupScanAlert $ do
- r <- scanner
+startupScan :: IO a -> Assistant a
+startupScan scanner = do
+ liftAnnex $ showAction "scanning"
+ dstatus <- getAssistant daemonStatusHandle
+ alertWhile' dstatus startupScanAlert <~> do
+ r <- liftIO $ scanner
-- Notice any files that were deleted before
-- watching was started.
- runThreadState st $ do
+ liftAnnex $ do
inRepo $ Git.Command.run "add" [Param "--update"]
showAction "started"
- modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True }
+ liftIO $ modifyDaemonStatus_ dstatus $
+ \s -> s { scanComplete = True }
return (True, r)
@@ -95,52 +98,52 @@ ignored = ig . takeFileName
ig ".gitattributes" = True
ig _ = False
-type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change)
+type Handler = FilePath -> Maybe FileStatus -> Assistant (Maybe Change)
-{- Runs an action handler, inside the Annex monad, and if there was a
- - change, adds it to the ChangeChan.
+{- Runs an action handler, and if there was a change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
-runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
-runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do
- r <- tryIO go
+runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
+runHandler handler file filestatus = void $ do
+ r <- tryIO <~> handler file filestatus
case r of
- Left e -> print e
+ Left e -> liftIO $ print e
Right Nothing -> noop
- Right (Just change) -> recordChange changechan change
- where
- go = runThreadState st $ handler threadname file filestatus dstatus transferqueue
+ Right (Just change) -> do
+ -- Just in case the commit thread is not
+ -- flushing the queue fast enough.
+ liftAnnex $ Annex.Queue.flushWhenFull
+ flip recordChange change <<~ changeChan
onAdd :: Handler
-onAdd _ file filestatus _ _
- | maybe False isRegularFile filestatus = pendingAddChange file
- | otherwise = noChange
+onAdd file filestatus
+ | maybe False isRegularFile filestatus = liftIO $ pendingAddChange file
+ | otherwise = liftIO $ noChange
{- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content
- before adding it.
-}
onAddSymlink :: Handler
-onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file
+onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
where
go (Just (key, _)) = do
- link <- calcGitLink file key
+ link <- liftAnnex $ calcGitLink file key
ifM ((==) link <$> liftIO (readSymbolicLink file))
( do
- s <- liftIO $ getDaemonStatus dstatus
+ s <- daemonStatus
checkcontent key s
ensurestaged link s
, do
- liftIO $ brokendebug threadname ["fix symlink", file]
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
- checkcontent key =<< liftIO (getDaemonStatus dstatus)
+ checkcontent key =<< daemonStatus
addlink link
)
go Nothing = do -- other symlink
link <- liftIO (readSymbolicLink file)
- ensurestaged link =<< liftIO (getDaemonStatus dstatus)
+ ensurestaged link =<< daemonStatus
{- This is often called on symlinks that are already
- staged correctly. A symlink may have been deleted
@@ -156,41 +159,47 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
ensurestaged link daemonstatus
| scanComplete daemonstatus = addlink link
| otherwise = case filestatus of
- Just s
- | not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
+ Just s | changedrecently s -> liftIO noChange
_ -> addlink link
+ where
+ changedrecently s = not $
+ afterLastDaemonRun (statusChangeTime s) daemonstatus
{- For speed, tries to reuse the existing blob for symlink target. -}
addlink link = do
- liftIO $ brokendebug threadname ["add symlink", file]
- v <- catObjectDetails $ Ref $ ':':file
- case v of
- Just (currlink, sha)
- | s2w8 link == L.unpack currlink ->
+ debug ["add symlink", file]
+ liftAnnex $ do
+ v <- catObjectDetails $ Ref $ ':':file
+ case v of
+ Just (currlink, sha)
+ | s2w8 link == L.unpack currlink ->
+ stageSymlink file sha
+ _ -> do
+ sha <- inRepo $
+ Git.HashObject.hashObject BlobObject link
stageSymlink file sha
- _ -> do
- sha <- inRepo $
- Git.HashObject.hashObject BlobObject link
- stageSymlink file sha
- madeChange file LinkChange
+ liftIO $ madeChange file LinkChange
{- When a new link appears, or a link is changed, after the startup
- scan, handle getting or dropping the key's content. -}
checkcontent key daemonstatus
| scanComplete daemonstatus = do
- present <- inAnnex key
- unless present $
- queueTransfers Next transferqueue dstatus
- key (Just file) Download
- handleDrops dstatus present key (Just file)
+ present <- liftAnnex $ inAnnex key
+ dstatus <- getAssistant daemonStatusHandle
+ unless present $ do
+ transferqueue <- getAssistant transferQueue
+ liftAnnex $ queueTransfers Next transferqueue
+ dstatus key (Just file) Download
+ liftAnnex $ handleDrops dstatus present key (Just file)
| otherwise = noop
onDel :: Handler
-onDel threadname file _ _dstatus _ = do
- liftIO $ brokendebug threadname ["file deleted", file]
- Annex.Queue.addUpdateIndex =<<
- inRepo (Git.UpdateIndex.unstageFile file)
- madeChange file RmChange
+onDel file _ = do
+ debug ["file deleted", file]
+ liftAnnex $
+ Annex.Queue.addUpdateIndex =<<
+ inRepo (Git.UpdateIndex.unstageFile file)
+ liftIO $ madeChange file RmChange
{- A directory has been deleted, or moved, so tell git to remove anything
- that was inside it from its cache. Since it could reappear at any time,
@@ -200,18 +209,19 @@ onDel threadname file _ _dstatus _ = do
- command to get the recursive list of files in the directory, so rm is
- just as good. -}
onDelDir :: Handler
-onDelDir threadname dir _ _dstatus _ = do
- liftIO $ brokendebug threadname ["directory deleted", dir]
- Annex.Queue.addCommand "rm"
+onDelDir dir _ = do
+ debug ["directory deleted", dir]
+ liftAnnex $ Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
- madeChange dir RmDirChange
+ liftIO $ madeChange dir RmDirChange
{- Called when there's an error with inotify or kqueue. -}
onErr :: Handler
-onErr _ msg _ dstatus _ = do
- warning msg
+onErr msg _ = do
+ liftAnnex $ warning msg
+ dstatus <- getAssistant daemonStatusHandle
void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg
- return Nothing
+ liftIO noChange
{- Adds a symlink to the index, without ever accessing the actual symlink
- on disk. This avoids a race if git add is used, where the symlink is