From 3c8a9043b6fc8fafbeac16e8f9199a0d12870549 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 12:25:20 -0400 Subject: skeleton C library for calling kqueue --- .gitignore | 2 +- Assistant/Watcher.hs | 12 ++++++++++-- Makefile | 18 ++++++++++-------- Utility/Kqueue.hs | 31 +++++++++++++++++++++++++++++++ Utility/libkqueue.c | 22 ++++++++++++++++++++++ Utility/libkqueue.h | 1 + 6 files changed, 75 insertions(+), 11 deletions(-) create mode 100644 Utility/Kqueue.hs create mode 100644 Utility/libkqueue.c create mode 100644 Utility/libkqueue.h diff --git a/.gitignore b/.gitignore index d628f23b7..afb5f314e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ html *.tix .hpc Utility/Touch.hs -Utility/libdiskfree.o +Utility/*.o dist # Sandboxed builds cabal-dev diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 1d35b5c1e..7c913d98c 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -33,12 +33,15 @@ import qualified Data.ByteString.Lazy as L import Utility.Inotify import System.INotify #endif +#ifdef WITH_KQUEUE +import Utility.Kqueue +#endif type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) checkCanWatch :: Annex () checkCanWatch = do -#ifdef WITH_INOTIFY +#if (WITH_INOTIFY || WITH_KQUEUE) unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $ needLsof #else @@ -82,8 +85,13 @@ watchThread st dstatus changechan = withINotify $ \i -> do , errHook = hook onErr } #else +#ifdef WITH_KQUEUE +watchThread st dstatus changechan = do + print =<< waitChange [stdError, stdOutput] +#else watchThread = undefined -#endif +#endif /* WITH_KQUEUE */ +#endif /* WITH_INOTIFY */ ignored :: FilePath -> Bool ignored ".git" = True diff --git a/Makefile b/Makefile index 8884b5c64..73fbc4140 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,22 @@ -OS:=$(shell uname | sed 's/[-_].*//') +bins=git-annex +mans=git-annex.1 git-annex-shell.1 +sources=Build/SysConfig.hs Utility/Touch.hs +all=$(bins) $(mans) 
docs +OS:=$(shell uname | sed 's/[-_].*//') ifeq ($(OS),Linux) BASEFLAGS_OPTS+=-DWITH_INOTIFY +clibs=Utility/libdiskfree.o +else +BASEFLAGS_OPTS+=-DWITH_KQUEUE +clibs=Utility/libdiskfree.o Utility/libkqueue.o endif PREFIX=/usr IGNORE=-ignore-package monads-fd -ignore-package monads-tf BASEFLAGS=-Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_S3 $(BASEFLAGS_OPTS) GHCFLAGS=-O2 $(BASEFLAGS) +CFLAGS=-Wall ifdef PROFILE GHCFLAGS=-prof -auto-all -rtsopts -caf-all -fforce-recomp $(BASEFLAGS) @@ -15,13 +24,6 @@ endif GHCMAKE=ghc $(GHCFLAGS) --make -bins=git-annex -mans=git-annex.1 git-annex-shell.1 -sources=Build/SysConfig.hs Utility/Touch.hs -clibs=Utility/libdiskfree.o - -all=$(bins) $(mans) docs - # Am I typing :make in vim? Do a fast build. ifdef VIM all=fast diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs new file mode 100644 index 000000000..bfc6ee9fc --- /dev/null +++ b/Utility/Kqueue.hs @@ -0,0 +1,31 @@ +{- BSD kqueue file modification notification interface + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE ForeignFunctionInterface #-} + +module Utility.Kqueue ( waitChange ) where + +import Common + +import System.Posix.Types +import Foreign.C.Types +import Foreign.C.Error +import Foreign.Ptr +import Foreign.Marshal + +foreign import ccall unsafe "libkqueue.h waitchange" c_waitchange + :: Ptr Fd -> IO Fd + +waitChange :: [Fd] -> IO (Maybe Fd) +waitChange fds = withArray fds $ \c_fds -> do + ret <- c_waitchange c_fds + ifM (safeErrno <$> getErrno) + ( return $ Just ret + , return Nothing + ) + where + safeErrno (Errno v) = v == 0 diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c new file mode 100644 index 000000000..0ef42b801 --- /dev/null +++ b/Utility/libkqueue.c @@ -0,0 +1,22 @@ +/* kqueue interface, C mini-library + * + * Copyright 2012 Joey Hess + * + * Licensed under the GNU GPL version 3 or higher. 
+ */ + +#include +#include + +/* Waits for a change event on one of the array of directory fds, + * and returns the one that changed. */ +int waitchange(const int *fds) { +// if (kqueue(blah, &fds) != 0) +// return 0; /* errno is set */ +// else + errno = 0; + + printf("in waitchange!, %i %i\n", fds[0], fds[1]); + + return fds[0]; +} diff --git a/Utility/libkqueue.h b/Utility/libkqueue.h new file mode 100644 index 000000000..75af9eeba --- /dev/null +++ b/Utility/libkqueue.h @@ -0,0 +1 @@ +int waitchange(const int *fds); -- cgit v1.2.3 From dc3d9d1e982f7342dd3e2b3fc14fbbe85e7acd3e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 12:53:57 -0400 Subject: added dirTree --- Utility/Directory.hs | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/Utility/Directory.hs b/Utility/Directory.hs index 78bb6e701..b8ed63a36 100644 --- a/Utility/Directory.hs +++ b/Utility/Directory.hs @@ -34,7 +34,7 @@ dirCruft _ = False dirContents :: FilePath -> IO [FilePath] dirContents d = map (d ) . filter (not . dirCruft) <$> getDirectoryContents d -{- Gets contents of directory, and then its subdirectories, recursively, +{- Gets files in a directory, and then its subdirectories, recursively, - and lazily. -} dirContentsRecursive :: FilePath -> IO [FilePath] dirContentsRecursive topdir = dirContentsRecursive' topdir [""] @@ -56,6 +56,33 @@ dirContentsRecursive' topdir (dir:dirs) = unsafeInterleaveIO $ do , collect (dirEntry:files) dirs' entries ) +{- Gets the subdirectories in a directory, and their subdirectories, + - recursively, and lazily. Prunes sections of the tree matching a + - condition. 
-} +dirTree :: FilePath -> (FilePath -> Bool) -> IO [FilePath] +dirTree topdir prune + | prune topdir = return [] + | otherwise = (:) topdir <$> dirTree' topdir prune [""] + +dirTree' :: FilePath -> (FilePath -> Bool) -> [FilePath] -> IO [FilePath] +dirTree' _ _ [] = return [] +dirTree' topdir prune (dir:dirs) + | prune dir = dirTree' topdir prune dirs + | otherwise = unsafeInterleaveIO $ do + subdirs <- collect [] =<< dirContents (topdir dir) + subdirs' <- dirTree' topdir prune (subdirs ++ dirs) + return $ subdirs ++ subdirs' + where + collect dirs' [] = return dirs' + collect dirs' (entry:entries) + | dirCruft entry || prune entry = collect dirs' entries + | otherwise = do + let dirEntry = dir entry + ifM (doesDirectoryExist $ topdir dirEntry) + ( collect (dirEntry:dirs') entries + , collect dirs' entries + ) + {- Moves one filename to another. - First tries a rename, but falls back to moving across devices if needed. -} moveFile :: FilePath -> FilePath -> IO () -- cgit v1.2.3 From a39b73d118c18707e6549d57a902fca9966119f8 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 13:01:58 -0400 Subject: recurse dirTree and open the directories for kqueue to watch --- Assistant/Watcher.hs | 22 +++++++++++++--------- Utility/Kqueue.hs | 23 ++++++++++++++++++----- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 7c913d98c..52c3780ab 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -37,8 +37,6 @@ import System.INotify import Utility.Kqueue #endif -type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) - checkCanWatch :: Annex () checkCanWatch = do #if (WITH_INOTIFY || WITH_KQUEUE) @@ -66,7 +64,7 @@ watchThread st dstatus changechan = withINotify $ \i -> do showAction "scanning" -- This does not return until the startup scan is done. -- That can take some time for large trees. - watchDir i "." (ignored . takeFileName) hooks + watchDir i "." 
ignored hooks runThreadState st $ modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } -- Notice any files that were deleted before inotify @@ -86,18 +84,24 @@ watchThread st dstatus changechan = withINotify $ \i -> do } #else #ifdef WITH_KQUEUE -watchThread st dstatus changechan = do - print =<< waitChange [stdError, stdOutput] +watchThread st dstatus changechan = forever $ do + dirs <- scanRecursive "." ignored + changeddir <- waitChange dirs + print $ "detected a change in " ++ show changeddir #else watchThread = undefined #endif /* WITH_KQUEUE */ #endif /* WITH_INOTIFY */ ignored :: FilePath -> Bool -ignored ".git" = True -ignored ".gitignore" = True -ignored ".gitattributes" = True -ignored _ = False +ignored = ig . takeFileName + where + ig ".git" = True + ig ".gitignore" = True + ig ".gitattributes" = True + ig _ = False + +type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change) {- Runs an action handler, inside the Annex monad, and if there was a - change, adds it to the ChangeChan. 
diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index bfc6ee9fc..aabea7d03 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -7,7 +7,10 @@ {-# LANGUAGE ForeignFunctionInterface #-} -module Utility.Kqueue ( waitChange ) where +module Utility.Kqueue ( + waitChange, + scanRecursive +) where import Common @@ -16,16 +19,26 @@ import Foreign.C.Types import Foreign.C.Error import Foreign.Ptr import Foreign.Marshal +import qualified Data.Map as M + +type DirMap = M.Map Fd FilePath foreign import ccall unsafe "libkqueue.h waitchange" c_waitchange :: Ptr Fd -> IO Fd -waitChange :: [Fd] -> IO (Maybe Fd) -waitChange fds = withArray fds $ \c_fds -> do - ret <- c_waitchange c_fds +waitChange :: DirMap -> IO (Maybe FilePath) +waitChange dirmap = withArray (M.keys dirmap) $ \c_fds -> do + changed <- c_waitchange c_fds ifM (safeErrno <$> getErrno) - ( return $ Just ret + ( return $ M.lookup changed dirmap , return Nothing ) where safeErrno (Errno v) = v == 0 + +scanRecursive :: FilePath -> (FilePath -> Bool) -> IO DirMap +scanRecursive dir prune = M.fromList <$> (mapM opendir =<< dirTree dir prune) + where + opendir d = (,) + <$> openFd d ReadOnly Nothing defaultFileFlags + <*> pure d -- cgit v1.2.3 From 89fcee03d0f542c25d1afa9962839916f70994b3 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 13:19:40 -0400 Subject: add some utility functions for later Will need to update the DirMap to add or remove subdirs. 
--- Utility/Kqueue.hs | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index aabea7d03..e8ce73b26 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -8,8 +8,10 @@ {-# LANGUAGE ForeignFunctionInterface #-} module Utility.Kqueue ( + scanRecursive, + addSubDir, + removeSubDir, waitChange, - scanRecursive ) where import Common @@ -23,9 +25,37 @@ import qualified Data.Map as M type DirMap = M.Map Fd FilePath +{- Builds a map of directories in a tree, possibly pruning some. + - Opens each directory in the tree. -} +scanRecursive :: FilePath -> (FilePath -> Bool) -> IO DirMap +scanRecursive dir prune = M.fromList <$> (mapM opendir =<< dirTree dir prune) + where + opendir d = (,) + <$> openFd d ReadOnly Nothing defaultFileFlags + <*> pure d + +{- Adds a subdirectory (and all its subdirectories, unless pruned) to a + - directory map. -} +addSubDir :: DirMap -> FilePath -> (FilePath -> Bool) -> IO DirMap +addSubDir dirmap dir prune = M.union dirmap <$> scanRecursive dir prune + +{- Removes a subdirectory (and all its subdirectories) from a directory map. -} +removeSubDir :: FilePath -> DirMap -> DirMap +removeSubDir dir = M.filter (not . dirContains dir) + foreign import ccall unsafe "libkqueue.h waitchange" c_waitchange :: Ptr Fd -> IO Fd +{- Waits for a change in a map of directories, and returns the directory + - where the change took place. + - + - The kqueue interface does not tell what type of change took place in + - the directory; it could be an added file, a deleted file, a renamed + - file, a new subdirectory, or a deleted subdirectory, or a moved + - subdirectory. + - + - Note that if subdirectories have changed, the caller will want to + - update the map before calling this again. 
-} waitChange :: DirMap -> IO (Maybe FilePath) waitChange dirmap = withArray (M.keys dirmap) $ \c_fds -> do changed <- c_waitchange c_fds @@ -35,10 +65,3 @@ waitChange dirmap = withArray (M.keys dirmap) $ \c_fds -> do ) where safeErrno (Errno v) = v == 0 - -scanRecursive :: FilePath -> (FilePath -> Bool) -> IO DirMap -scanRecursive dir prune = M.fromList <$> (mapM opendir =<< dirTree dir prune) - where - opendir d = (,) - <$> openFd d ReadOnly Nothing defaultFileFlags - <*> pure d -- cgit v1.2.3 From 90d565149abd7d752e22beb4aa57bf99522e5851 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 16:18:59 -0400 Subject: flesh out kqueue library Have not tried to build this yet. But barring minor mistakes, I think it's good. --- Utility/Kqueue.hs | 40 ++++++++++++++++++++------------- Utility/libkqueue.c | 64 ++++++++++++++++++++++++++++++++++++++++++++--------- Utility/libkqueue.h | 3 ++- 3 files changed, 81 insertions(+), 26 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index e8ce73b26..a3d8aff2d 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -25,6 +25,8 @@ import qualified Data.Map as M type DirMap = M.Map Fd FilePath +data Kqueue = Kqueue Fd DirMap + {- Builds a map of directories in a tree, possibly pruning some. - Opens each directory in the tree. -} scanRecursive :: FilePath -> (FilePath -> Bool) -> IO DirMap @@ -43,25 +45,33 @@ addSubDir dirmap dir prune = M.union dirmap <$> scanRecursive dir prune removeSubDir :: FilePath -> DirMap -> DirMap removeSubDir dir = M.filter (not . dirContains dir) -foreign import ccall unsafe "libkqueue.h waitchange" c_waitchange - :: Ptr Fd -> IO Fd +foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue + :: CInt -> Ptr Fd -> IO Fd +foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue + :: Fd -> IO Fd + +{- Initializes a Kqueue to watch a map of directories. 
-} +initKqueue :: DirMap -> IO Kqueue +initKqueue dirmap = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> + h <- c_init_kqueue (fromIntegral fdcnt) c_fds + return $ Kqueue h dirmap -{- Waits for a change in a map of directories, and returns the directory - - where the change took place. +{- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap, + - so it can be reused. -} +stopKqueue :: Kqueue -> IO +stopKqueue (Kqueue h _) = closeFd h + +{- Waits for a change on a Kqueue, and returns the directory + - or directories where a change took place. - - The kqueue interface does not tell what type of change took place in - the directory; it could be an added file, a deleted file, a renamed - file, a new subdirectory, or a deleted subdirectory, or a moved - subdirectory. - - - Note that if subdirectories have changed, the caller will want to - - update the map before calling this again. -} -waitChange :: DirMap -> IO (Maybe FilePath) -waitChange dirmap = withArray (M.keys dirmap) $ \c_fds -> do - changed <- c_waitchange c_fds - ifM (safeErrno <$> getErrno) - ( return $ M.lookup changed dirmap - , return Nothing - ) - where - safeErrno (Errno v) = v == 0 + - Note that if subdirectories have changed, the caller should re-run + - initKqueue to get them watched. -} +waitChange :: Kqueue -> IO [FilePath] +waitChange (Kqueue h dirmap) = do + changed <- c_waitchange_kqueue h + return $ M.lookup changed dirmap diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index 0ef42b801..a919a60c7 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -5,18 +5,62 @@ * Licensed under the GNU GPL version 3 or higher. */ -#include #include +#include +#include +#include +#include +#include +#include -/* Waits for a change event on one of the array of directory fds, - * and returns the one that changed. 
*/ -int waitchange(const int *fds) { -// if (kqueue(blah, &fds) != 0) -// return 0; /* errno is set */ -// else - errno = 0; +/* Initializes a kqueue, with a list of fds to watch for changes. + * Returns the kqueue's handle. */ +int init_kqueue(const int fdcnt, const int *fdlist) { + struct nodelay = {0, 0}; + int kq; - printf("in waitchange!, %i %i\n", fds[0], fds[1]); + if ((kq = kqueue()) == -1) { + perror("kqueue"); + exit(1); + } - return fds[0]; + /* Prime the pump with the list of fds, but don't wait for any + * change events. */ + helper(kq, fdcnt, fdlist, &nodelay); + + return kq; +} + +/* Waits for a change event on a kqueue. + * + * Returns the fd that changed, or -1 on error. + */ +signed int waitchange_kqueue(const int kq) { + helper(kq, 0, NULL, NULL); +} + +/* The specified fds are added to the set of fds being watched for changes. + * Fds passed to prior calls still take effect, so it's most efficient to + * not pass the same fds repeatedly. + */ +signed int helper(const int kq, const int fdcnt, const int *fdlist, cont struct *timeout) { + int i, nev; + struct kevent evlist[1]; + struct kevent chlist[fdcnt]; + + for (i = 0; i < fdcnt; i++) { + EV_SET(&chlist[i], fdlist[i], EVFILT_VNODE, + EV_ADD | EV_ENABLE | EV_CLEAR, + NOTE_WRITE, + 1, + timeout); + } + + nev = kevent(info->kq, info->chlist, info->cnt, info->evlist, + 1, NULL); + + if (nev == 1) + return evlist[0].ident; + else + return -1; } diff --git a/Utility/libkqueue.h b/Utility/libkqueue.h index 75af9eeba..1a285b8da 100644 --- a/Utility/libkqueue.h +++ b/Utility/libkqueue.h @@ -1 +1,2 @@ -int waitchange(const int *fds); +int init_kqueue(const int fdcnt, const int *fdlist); +signed int waitchange_kqueue(const int kq); -- cgit v1.2.3 From d680ff7ef06a3b0c8310836b03446e89d0ff9764 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 20:33:27 +0000 Subject: kqueue code compiles on debian kfreebsd --- Assistant/Watcher.hs | 8 +++++--- Utility/Kqueue.hs | 13 ++++++++----- 
Utility/libkqueue.c | 55 ++++++++++++++++++++++++++-------------------------- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 52c3780ab..13c27d080 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -84,10 +84,12 @@ watchThread st dstatus changechan = withINotify $ \i -> do } #else #ifdef WITH_KQUEUE -watchThread st dstatus changechan = forever $ do +watchThread st dstatus changechan = do dirs <- scanRecursive "." ignored - changeddir <- waitChange dirs - print $ "detected a change in " ++ show changeddir + kqueue <- initKqueue dirs + forever $ do + changeddir <- waitChange kqueue + print $ "detected a change in " ++ show changeddir #else watchThread = undefined #endif /* WITH_KQUEUE */ diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index a3d8aff2d..6da97d3fa 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -11,6 +11,10 @@ module Utility.Kqueue ( scanRecursive, addSubDir, removeSubDir, + + initKqueue, + stopKqueue, + waitChange, ) where @@ -18,7 +22,6 @@ import Common import System.Posix.Types import Foreign.C.Types -import Foreign.C.Error import Foreign.Ptr import Foreign.Marshal import qualified Data.Map as M @@ -52,17 +55,17 @@ foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue {- Initializes a Kqueue to watch a map of directories. -} initKqueue :: DirMap -> IO Kqueue -initKqueue dirmap = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> +initKqueue dirmap = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do h <- c_init_kqueue (fromIntegral fdcnt) c_fds return $ Kqueue h dirmap {- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap, - so it can be reused. -} -stopKqueue :: Kqueue -> IO +stopKqueue :: Kqueue -> IO () stopKqueue (Kqueue h _) = closeFd h {- Waits for a change on a Kqueue, and returns the directory - - or directories where a change took place. + - where a change took place. 
- - The kqueue interface does not tell what type of change took place in - the directory; it could be an added file, a deleted file, a renamed @@ -71,7 +74,7 @@ stopKqueue (Kqueue h _) = closeFd h - - Note that if subdirectories have changed, the caller should re-run - initKqueue to get them watched. -} -waitChange :: Kqueue -> IO [FilePath] +waitChange :: Kqueue -> IO (Maybe FilePath) waitChange (Kqueue h dirmap) = do changed <- c_waitchange_kqueue h return $ M.lookup changed dirmap diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index a919a60c7..999508f7e 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -13,10 +13,35 @@ #include #include +/* The specified fds are added to the set of fds being watched for changes. + * Fds passed to prior calls still take effect, so it's most efficient to + * not pass the same fds repeatedly. + */ +signed int helper(const int kq, const int fdcnt, const int *fdlist, + struct timespec *timeout) { + int i, nev; + struct kevent evlist[1]; + struct kevent chlist[fdcnt]; + + for (i = 0; i < fdcnt; i++) { + EV_SET(&chlist[i], fdlist[i], EVFILT_VNODE, + EV_ADD | EV_ENABLE | EV_CLEAR, + NOTE_WRITE, + 0, 0); + } + + nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout); + + if (nev == 1) + return evlist[0].ident; + else + return -1; +} + /* Initializes a kqueue, with a list of fds to watch for changes. * Returns the kqueue's handle. */ int init_kqueue(const int fdcnt, const int *fdlist) { - struct nodelay = {0, 0}; + struct timespec nodelay = {0, 0}; int kq; if ((kq = kqueue()) == -1) { @@ -36,31 +61,5 @@ int init_kqueue(const int fdcnt, const int *fdlist) { * Returns the fd that changed, or -1 on error. */ signed int waitchange_kqueue(const int kq) { - helper(kq, 0, NULL, NULL); -} - -/* The specified fds are added to the set of fds being watched for changes. - * Fds passed to prior calls still take effect, so it's most efficient to - * not pass the same fds repeatedly. 
- */ -signed int helper(const int kq, const int fdcnt, const int *fdlist, cont struct *timeout) { - int i, nev; - struct kevent evlist[1]; - struct kevent chlist[fdcnt]; - - for (i = 0; i < fdcnt; i++) { - EV_SET(&chlist[i], fdlist[i], EVFILT_VNODE, - EV_ADD | EV_ENABLE | EV_CLEAR, - NOTE_WRITE, - 1, - timeout); - } - - nev = kevent(info->kq, info->chlist, info->cnt, info->evlist, - 1, NULL); - - if (nev == 1) - return evlist[0].ident; - else - return -1; + return helper(kq, 0, NULL, NULL); } -- cgit v1.2.3 From a11825a1f153fc7fe9aa469055b3935f254a4e9d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 20:55:06 +0000 Subject: add test stub --- Utility/libkqueue.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index 999508f7e..cc001045b 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -63,3 +63,13 @@ int init_kqueue(const int fdcnt, const int *fdlist) { signed int waitchange_kqueue(const int kq) { return helper(kq, 0, NULL, NULL); } + +/* +main () { + int list[1]; + int kq; + list[0]=open(".", O_RDONLY); + kq = init_kqueue(1, list); + printf("change: %i\n", waitchange_kqueue(kq)); +} +*/ -- cgit v1.2.3 From b141d9bcc8b5a4647e3c5f106115c8bf5a67f6cd Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 22:02:57 +0000 Subject: retry interrupted kevent calls Many thanks to geekosaur in #haskell for help with this. --- Utility/libkqueue.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index cc001045b..5b38cdd33 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -12,6 +12,7 @@ #include #include #include +#include /* The specified fds are added to the set of fds being watched for changes. 
* Fds passed to prior calls still take effect, so it's most efficient to @@ -30,7 +31,11 @@ signed int helper(const int kq, const int fdcnt, const int *fdlist, 0, 0); } - nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout); + while ((nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout))) { + if (!(nev == -1 && errno == EINTR)) { + break; + } + } if (nev == 1) return evlist[0].ident; -- cgit v1.2.3 From 1f6d80007c0cb9bca21bc744c8e2388e2f0fa8bc Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 18:07:29 -0400 Subject: blog for the day --- .../assistant/blog/day_12__freebsd_redux.mdwn | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 doc/design/assistant/blog/day_12__freebsd_redux.mdwn diff --git a/doc/design/assistant/blog/day_12__freebsd_redux.mdwn b/doc/design/assistant/blog/day_12__freebsd_redux.mdwn new file mode 100644 index 000000000..ba397788a --- /dev/null +++ b/doc/design/assistant/blog/day_12__freebsd_redux.mdwn @@ -0,0 +1,23 @@ +Followed my plan from yesterday, and wrote a simple C library to interface +to `kqueue`, and Haskell code to use that library. By now I think I +understand kqueue fairly well -- there are some very tricky parts to the +interface. + +But... it still didn't work. After building all this, my code was +failing the same way that the +[haskell kqueue library failed](https://github.com/hesselink/kqueue/issues/1) +yesterday. I filed a [bug report with a testcase](). + +Then I thought to ask on #haskell. Got sorted out in quick order! The +problem turns out to be that haskell's runtime has a periodic SIGALRM, +that is interrupting my kevent call. It can be worked around with `+RTS -V0`, +but I put in a fix to retry the kevent call when it's interrupted. + +And now `git-annex watch` can detect changes to directories on BSD and OSX! + +Note: I said "detect", not "do something useful in response to". 
Getting +from the limited kqueue events to actually staging changes in the git repo +is going to be another day's work. Still, brave FreeBSD or OSX users +might want to check out the `watch` branch from git and see if +`git annex watch` will at least *say* it sees changes you make to your +repository. -- cgit v1.2.3 From ae7d07ddcb5768cf477410e019d42601d8c2b744 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 19:14:58 -0400 Subject: close fds --- Utility/Kqueue.hs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 6da97d3fa..d0b3c8a99 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -45,8 +45,12 @@ addSubDir :: DirMap -> FilePath -> (FilePath -> Bool) -> IO DirMap addSubDir dirmap dir prune = M.union dirmap <$> scanRecursive dir prune {- Removes a subdirectory (and all its subdirectories) from a directory map. -} -removeSubDir :: FilePath -> DirMap -> DirMap -removeSubDir dir = M.filter (not . dirContains dir) +removeSubDir :: FilePath -> DirMap -> IO DirMap +removeSubDir dir dirmap = do + mapM_ closeFd $ M.keys toremove) $ closeFd + return rest + where + (toremove, rest) = M.partition (dirContains dir) dirmap foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue :: CInt -> Ptr Fd -> IO Fd -- cgit v1.2.3 From 2bfcc0b09c5dd37c5e0ab65cb089232bfcc31934 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 21:29:30 -0400 Subject: kqueue: add directory content tracking, and change determination This *may* now return Add or Delete Changes as appropriate. All I know for sure is that it compiles. I had hoped to avoid maintaining my own state about the content of the directory tree, and rely on git to check what was changed. But I can't; I need to know about new and deleted subdirectories to add them to the watch list, and git doesn't deal with (empty) directories. 
So, wrote all the code to scan directories, remember their past contents, compare with current contents, generate appropriate Change events, and update bookkeeping info appropriately. --- Utility/Directory.hs | 27 --------- Utility/Kqueue.hs | 162 ++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 121 insertions(+), 68 deletions(-) diff --git a/Utility/Directory.hs b/Utility/Directory.hs index b8ed63a36..2f2960a9d 100644 --- a/Utility/Directory.hs +++ b/Utility/Directory.hs @@ -56,33 +56,6 @@ dirContentsRecursive' topdir (dir:dirs) = unsafeInterleaveIO $ do , collect (dirEntry:files) dirs' entries ) -{- Gets the subdirectories in a directory, and their subdirectories, - - recursively, and lazily. Prunes sections of the tree matching a - - condition. -} -dirTree :: FilePath -> (FilePath -> Bool) -> IO [FilePath] -dirTree topdir prune - | prune topdir = return [] - | otherwise = (:) topdir <$> dirTree' topdir prune [""] - -dirTree' :: FilePath -> (FilePath -> Bool) -> [FilePath] -> IO [FilePath] -dirTree' _ _ [] = return [] -dirTree' topdir prune (dir:dirs) - | prune dir = dirTree' topdir prune dirs - | otherwise = unsafeInterleaveIO $ do - subdirs <- collect [] =<< dirContents (topdir dir) - subdirs' <- dirTree' topdir prune (subdirs ++ dirs) - return $ subdirs ++ subdirs' - where - collect dirs' [] = return dirs' - collect dirs' (entry:entries) - | dirCruft entry || prune entry = collect dirs' entries - | otherwise = do - let dirEntry = dir entry - ifM (doesDirectoryExist $ topdir dirEntry) - ( collect (dirEntry:dirs') entries - , collect dirs' entries - ) - {- Moves one filename to another. - First tries a rename, but falls back to moving across devices if needed. 
-} moveFile :: FilePath -> FilePath -> IO () diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index d0b3c8a99..911eb71a9 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -8,14 +8,10 @@ {-# LANGUAGE ForeignFunctionInterface #-} module Utility.Kqueue ( - scanRecursive, - addSubDir, - removeSubDir, - initKqueue, stopKqueue, - waitChange, + Change(..), ) where import Common @@ -25,60 +21,144 @@ import Foreign.C.Types import Foreign.Ptr import Foreign.Marshal import qualified Data.Map as M +import qualified Data.Set as S + +data Change + = Deleted FilePath + | Added FilePath + deriving (Show) + +isAdd :: Change -> Bool +isAdd (Added _) = True +isAdd (Deleted _) = False + +isDelete :: Change -> Bool +isDelete = not . isAdd + +changedFile :: Change -> FilePath +changedFile (Added f) = f +changedFile (Deleted f) = f -type DirMap = M.Map Fd FilePath +data Kqueue = Kqueue Fd DirMap Pruner -data Kqueue = Kqueue Fd DirMap +type Pruner = FilePath -> Bool + +type DirMap = M.Map Fd DirInfo + +{- A directory, and its last known contents (with filenames relative to it) -} +data DirInfo = DirInfo + { dirName :: FilePath + , dirCache :: S.Set FilePath + } + deriving (Show) + +getDirInfo :: FilePath -> IO DirInfo +getDirInfo dir = do + contents <- S.fromList . filter (not . dirCruft) + <$> getDirectoryContents dir + return $ DirInfo dir contents + +{- Difference between the dirCaches of two DirInfos. -} +(//) :: DirInfo -> DirInfo -> [Change] +old // new = deleted ++ added + where + deleted = calc Deleted old new + added = calc Added new old + calc a x y = map a . map (dirName x ) $ + S.toList $ S.difference (dirCache x) (dirCache y) {- Builds a map of directories in a tree, possibly pruning some. - - Opens each directory in the tree. -} -scanRecursive :: FilePath -> (FilePath -> Bool) -> IO DirMap -scanRecursive dir prune = M.fromList <$> (mapM opendir =<< dirTree dir prune) + - Opens each directory in the tree, and records its current contents. 
-} +scanRecursive :: FilePath -> Pruner -> IO DirMap +scanRecursive topdir prune = M.fromList <$> walk [] [topdir] where - opendir d = (,) - <$> openFd d ReadOnly Nothing defaultFileFlags - <*> pure d - -{- Adds a subdirectory (and all its subdirectories, unless pruned) to a - - directory map. -} -addSubDir :: DirMap -> FilePath -> (FilePath -> Bool) -> IO DirMap -addSubDir dirmap dir prune = M.union dirmap <$> scanRecursive dir prune - -{- Removes a subdirectory (and all its subdirectories) from a directory map. -} -removeSubDir :: FilePath -> DirMap -> IO DirMap -removeSubDir dir dirmap = do - mapM_ closeFd $ M.keys toremove) $ closeFd + walk c [] = return c + walk c (dir:rest) + | prune dir = walk c rest + | otherwise = do + info <- getDirInfo dir + fd <- openFd dir ReadOnly Nothing defaultFileFlags + dirs <- filterM (\d -> doesDirectoryExist $ dir d) + (S.toList $ dirCache info) + walk ((fd, info):c) (dirs++rest) + +{- Adds a list of subdirectories (and all their children), unless pruned to a + - directory map. Adding a subdirectory that's already in the map will + - cause its contents to be refreshed. -} +addSubDirs :: DirMap -> Pruner -> [FilePath] -> IO DirMap +addSubDirs dirmap prune dirs = do + newmap <- foldr M.union M.empty <$> + mapM (\d -> scanRecursive d prune) dirs + return $ M.union newmap dirmap -- prefer newmap + +{- Removes a subdirectory (and all its children) from a directory map. -} +removeSubDir :: DirMap -> FilePath -> IO DirMap +removeSubDir dirmap dir = do + mapM_ closeFd $ M.keys toremove return rest where - (toremove, rest) = M.partition (dirContains dir) dirmap + (toremove, rest) = M.partition (dirContains dir . dirName) dirmap foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue :: CInt -> Ptr Fd -> IO Fd foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue :: Fd -> IO Fd -{- Initializes a Kqueue to watch a map of directories. 
-} -initKqueue :: DirMap -> IO Kqueue -initKqueue dirmap = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do - h <- c_init_kqueue (fromIntegral fdcnt) c_fds - return $ Kqueue h dirmap +{- Initializes a Kqueue to watch a directory, and all its subdirectories. -} +initKqueue :: FilePath -> Pruner -> IO Kqueue +initKqueue dir pruned = do + dirmap <- scanRecursive dir pruned + withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do + h <- c_init_kqueue (fromIntegral fdcnt) c_fds + return $ Kqueue h dirmap pruned {- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap, - so it can be reused. -} stopKqueue :: Kqueue -> IO () -stopKqueue (Kqueue h _) = closeFd h +stopKqueue (Kqueue h _ _) = closeFd h -{- Waits for a change on a Kqueue, and returns the directory - - where a change took place. - - - - The kqueue interface does not tell what type of change took place in +{- Waits for a change on a Kqueue. + - May update the Kqueue. + -} +waitChange :: Kqueue -> IO (Kqueue, [Change]) +waitChange kq@(Kqueue h dirmap _) = do + changedfd <- c_waitchange_kqueue h + case M.lookup changedfd dirmap of + Nothing -> return (kq, []) + Just info -> handleChange kq changedfd info + +{- The kqueue interface does not tell what type of change took place in - the directory; it could be an added file, a deleted file, a renamed - file, a new subdirectory, or a deleted subdirectory, or a moved - - subdirectory. + - subdirectory. - - - Note that if subdirectories have changed, the caller should re-run - - initKqueue to get them watched. -} -waitChange :: Kqueue -> IO (Maybe FilePath) -waitChange (Kqueue h dirmap) = do - changed <- c_waitchange_kqueue h - return $ M.lookup changed dirmap + - So to determine this, the contents of the directory are compared + - with its last cached contents. The Kqueue is updated to watch new + - directories as necessary. 
+ -} +handleChange :: Kqueue -> Fd -> DirInfo -> IO (Kqueue, [Change]) +handleChange kq@(Kqueue h dirmap pruner) fd olddirinfo = + go =<< catchMaybeIO (getDirInfo $ dirName olddirinfo) + where + go (Just newdirinfo) = do + let changes = olddirinfo // newdirinfo + let (added, deleted) = partition isAdd changes + + -- Scan newly added directories to add to the map. + -- (Newly added files will fail getDirInfo.) + newdirinfos <- catMaybes <$> + mapM (catchMaybeIO . getDirInfo . changedFile) added + newmap <- addSubDirs dirmap pruner $ map dirName newdirinfos + + -- Remove deleted directories from the map. + newmap' <- foldM removeSubDir newmap (map changedFile deleted) + + -- Update the cached dirinfo just looked up. + let newmap'' = M.insertWith' const fd newdirinfo newmap' + ret (newmap'', changes) + go Nothing = do + -- The directory has been moved or deleted, so + -- remove it from our map. + newmap <- removeSubDir dirmap (dirName olddirinfo) + ret (newmap, []) + ret (newmap, changes) = return $ (Kqueue h newmap pruner, changes) -- cgit v1.2.3 From 5e9fdac92fe91a60d8f4570ec5d785976fe6c3ee Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 21:46:04 -0400 Subject: update kqueue when new directories are added --- Utility/Kqueue.hs | 27 +++++++++++++++++++++------ Utility/libkqueue.c | 25 ++++++++++++------------- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 911eb71a9..08029d703 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -100,7 +100,9 @@ removeSubDir dirmap dir = do (toremove, rest) = M.partition (dirContains dir . 
dirName) dirmap foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue - :: CInt -> Ptr Fd -> IO Fd + :: IO Fd +foreign import ccall unsafe "libkqueue.h addfds_kqueue" c_addfds_kqueue + :: Fd -> CInt -> Ptr Fd -> IO () foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue :: Fd -> IO Fd @@ -108,9 +110,16 @@ foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue initKqueue :: FilePath -> Pruner -> IO Kqueue initKqueue dir pruned = do dirmap <- scanRecursive dir pruned + h <- c_init_kqueue + let kq = Kqueue h dirmap pruned + updateKqueue kq + return kq + +{- Updates a Kqueue, adding watches for its map. -} +updateKqueue :: Kqueue -> IO () +updateKqueue (Kqueue h dirmap _) = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do - h <- c_init_kqueue (fromIntegral fdcnt) c_fds - return $ Kqueue h dirmap pruned + c_addfds_kqueue h (fromIntegral fdcnt) c_fds {- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap, - so it can be reused. -} @@ -155,10 +164,16 @@ handleChange kq@(Kqueue h dirmap pruner) fd olddirinfo = -- Update the cached dirinfo just looked up. let newmap'' = M.insertWith' const fd newdirinfo newmap' - ret (newmap'', changes) + + -- When new directories were added, need to update + -- the kqueue to watch them. + let kq' = Kqueue h newmap'' pruner + unless (null newdirinfos) $ + updateKqueue kq' + + return (kq', changes) go Nothing = do -- The directory has been moved or deleted, so -- remove it from our map. newmap <- removeSubDir dirmap (dirName olddirinfo) - ret (newmap, []) - ret (newmap, changes) = return $ (Kqueue h newmap pruner, changes) + return (Kqueue h newmap pruner, []) diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index 5b38cdd33..b7f9595dc 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -18,11 +18,12 @@ * Fds passed to prior calls still take effect, so it's most efficient to * not pass the same fds repeatedly. 
*/ -signed int helper(const int kq, const int fdcnt, const int *fdlist, - struct timespec *timeout) { +signed int helper(const int kq, const int fdcnt, const int *fdlist, int nodelay) { int i, nev; struct kevent evlist[1]; struct kevent chlist[fdcnt]; + struct timespec avoiddelay = {0, 0}; + struct timespec *timeout = nodelay ? &avoiddelay : NULL; for (i = 0; i < fdcnt; i++) { EV_SET(&chlist[i], fdlist[i], EVFILT_VNODE, @@ -43,30 +44,27 @@ signed int helper(const int kq, const int fdcnt, const int *fdlist, return -1; } -/* Initializes a kqueue, with a list of fds to watch for changes. - * Returns the kqueue's handle. */ +/* Initializes a new, empty kqueue. */ int init_kqueue(const int fdcnt, const int *fdlist) { - struct timespec nodelay = {0, 0}; int kq; - if ((kq = kqueue()) == -1) { perror("kqueue"); exit(1); } - - /* Prime the pump with the list of fds, but don't wait for any - * change events. */ - helper(kq, fdcnt, fdlist, &nodelay); - return kq; } +/* Adds fds to the set that should be watched. */ +void addfds_kqueue(const int kq, const int fdcnt, const int *fdlist) { + helper(kq, fdcnt, fdlist, 1); +} + /* Waits for a change event on a kqueue. * * Returns the fd that changed, or -1 on error. 
*/ signed int waitchange_kqueue(const int kq) { - return helper(kq, 0, NULL, NULL); + return helper(kq, 0, NULL, 0); } /* @@ -74,7 +72,8 @@ main () { int list[1]; int kq; list[0]=open(".", O_RDONLY); - kq = init_kqueue(1, list); + kq = init_kqueue(); + addfds_kqueue(kq, 1, list) printf("change: %i\n", waitchange_kqueue(kq)); } */ -- cgit v1.2.3 From 3d163f5ff9cae58e8a1e27215e58dab91180faff Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 01:52:07 +0000 Subject: fix build --- Assistant/Watcher.hs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 13c27d080..e2dd5cd2a 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -34,7 +34,7 @@ import Utility.Inotify import System.INotify #endif #ifdef WITH_KQUEUE -import Utility.Kqueue +import qualified Utility.Kqueue as Kqueue #endif checkCanWatch :: Annex () @@ -84,12 +84,12 @@ watchThread st dstatus changechan = withINotify $ \i -> do } #else #ifdef WITH_KQUEUE -watchThread st dstatus changechan = do - dirs <- scanRecursive "." ignored - kqueue <- initKqueue dirs - forever $ do - changeddir <- waitChange kqueue - print $ "detected a change in " ++ show changeddir +watchThread st dstatus changechan = go =<< Kqueue.initKqueue "." 
ignored + where + go kq = do + (kq', changes) <- Kqueue.waitChange kq + print $ "detected a change in " ++ show changes + go kq' #else watchThread = undefined #endif /* WITH_KQUEUE */ -- cgit v1.2.3 From 2d457bf8dfa9e69050d213df664d0407072304ad Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 01:52:07 +0000 Subject: fix build --- Assistant/Watcher.hs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 13c27d080..e2dd5cd2a 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -34,7 +34,7 @@ import Utility.Inotify import System.INotify #endif #ifdef WITH_KQUEUE -import Utility.Kqueue +import qualified Utility.Kqueue as Kqueue #endif checkCanWatch :: Annex () @@ -84,12 +84,12 @@ watchThread st dstatus changechan = withINotify $ \i -> do } #else #ifdef WITH_KQUEUE -watchThread st dstatus changechan = do - dirs <- scanRecursive "." ignored - kqueue <- initKqueue dirs - forever $ do - changeddir <- waitChange kqueue - print $ "detected a change in " ++ show changeddir +watchThread st dstatus changechan = go =<< Kqueue.initKqueue "." ignored + where + go kq = do + (kq', changes) <- Kqueue.waitChange kq + print $ "detected a change in " ++ show changes + go kq' #else watchThread = undefined #endif /* WITH_KQUEUE */ -- cgit v1.2.3 From e16455327247656bc47e331be710d6bd58b2675f Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 02:13:26 +0000 Subject: robustness fixes --- Utility/Kqueue.hs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 08029d703..5b4920f2f 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -12,6 +12,9 @@ module Utility.Kqueue ( stopKqueue, waitChange, Change(..), + changedFile, + isAdd, + isDelete, ) where import Common @@ -60,10 +63,10 @@ getDirInfo dir = do {- Difference between the dirCaches of two DirInfos. 
-} (//) :: DirInfo -> DirInfo -> [Change] -old // new = deleted ++ added +oldc // newc = deleted ++ added where - deleted = calc Deleted old new - added = calc Added new old + deleted = calc Deleted oldc newc + added = calc Added newc oldc calc a x y = map a . map (dirName x ) $ S.toList $ S.difference (dirCache x) (dirCache y) @@ -76,11 +79,18 @@ scanRecursive topdir prune = M.fromList <$> walk [] [topdir] walk c (dir:rest) | prune dir = walk c rest | otherwise = do - info <- getDirInfo dir - fd <- openFd dir ReadOnly Nothing defaultFileFlags - dirs <- filterM (\d -> doesDirectoryExist $ dir d) - (S.toList $ dirCache info) - walk ((fd, info):c) (dirs++rest) + minfo <- catchMaybeIO $ getDirInfo dir + case minfo of + Nothing -> walk c rest + Just info -> do + mfd <- catchMaybeIO $ + openFd dir ReadOnly Nothing defaultFileFlags + case mfd of + Nothing -> walk c rest + Just fd -> do + let subdirs = map (dir ) $ + S.toList $ dirCache info + walk ((fd, info):c) (subdirs ++ rest) {- Adds a list of subdirectories (and all their children), unless pruned to a - directory map. Adding a subdirectory that's already in the map will @@ -146,7 +156,7 @@ waitChange kq@(Kqueue h dirmap _) = do - directories as necessary. 
-} handleChange :: Kqueue -> Fd -> DirInfo -> IO (Kqueue, [Change]) -handleChange kq@(Kqueue h dirmap pruner) fd olddirinfo = +handleChange (Kqueue h dirmap pruner) fd olddirinfo = go =<< catchMaybeIO (getDirInfo $ dirName olddirinfo) where go (Just newdirinfo) = do -- cgit v1.2.3 From 22b563408bf08158872ddb8e65c16f36b0ab712d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 02:13:39 +0000 Subject: refactor --- Assistant/Watcher.hs | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index e2dd5cd2a..7e0a16f40 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -60,18 +60,9 @@ needLsof = error $ unlines watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () #ifdef WITH_INOTIFY watchThread st dstatus changechan = withINotify $ \i -> do - runThreadState st $ - showAction "scanning" - -- This does not return until the startup scan is done. - -- That can take some time for large trees. - watchDir i "." ignored hooks - runThreadState st $ - modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } - -- Notice any files that were deleted before inotify - -- was started. - runThreadState st $ do - inRepo $ Git.Command.run "add" [Param "--update"] - showAction "started" + statupScan st dstatus $ + watchDir i "." ignored hooks + -- Let the inotify thread run. waitForTermination where hook a = Just $ runHandler st dstatus changechan a @@ -84,7 +75,10 @@ watchThread st dstatus changechan = withINotify $ \i -> do } #else #ifdef WITH_KQUEUE -watchThread st dstatus changechan = go =<< Kqueue.initKqueue "." ignored +watchThread st dstatus changechan = do + kq <- statupScan st dstatus $ + Kqueue.initKqueue "." ignored + go kq where go kq = do (kq', changes) <- Kqueue.waitChange kq @@ -95,6 +89,22 @@ watchThread = undefined #endif /* WITH_KQUEUE */ #endif /* WITH_INOTIFY */ +{- Initial scartup scan. 
The action should return once the scan is complete. -} +statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a +statupScan st dstatus scanner = do + runThreadState st $ + showAction "scanning" + r <- scanner + runThreadState st $ + modifyDaemonStatus dstatus $ \s -> s { scanComplete = True } + + -- Notice any files that were deleted before watching was started. + runThreadState st $ do + inRepo $ Git.Command.run "add" [Param "--update"] + showAction "started" + + return r + ignored :: FilePath -> Bool ignored = ig . takeFileName where -- cgit v1.2.3 From 7a09d74319c0e68dddfa2cf1979731a030e8881e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 18 Jun 2012 23:47:48 -0400 Subject: lifted out the kqueue and inotify to a generic DirWatcher interface Kqueue code for dispatching events is not tested and probably doesn't build. --- Assistant/Watcher.hs | 49 +++--------- Utility/DirWatcher.hs | 53 +++++++++++++ Utility/INotify.hs | 171 +++++++++++++++++++++++++++++++++++++++++ Utility/Inotify.hs | 180 -------------------------------------------- Utility/Kqueue.hs | 25 ++++++ Utility/Types/DirWatcher.hs | 22 ++++++ debian/changelog | 7 +- 7 files changed, 284 insertions(+), 223 deletions(-) create mode 100644 Utility/DirWatcher.hs create mode 100644 Utility/INotify.hs delete mode 100644 Utility/Inotify.hs create mode 100644 Utility/Types/DirWatcher.hs diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index 7e0a16f40..a2ca2396e 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -13,7 +13,8 @@ import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.Committer -import Utility.ThreadScheduler +import Utility.DirWatcher +import Utility.Types.DirWatcher import qualified Annex.Queue import qualified Git.Command import qualified Git.UpdateIndex @@ -29,25 +30,12 @@ import Control.Concurrent.STM import Data.Bits.Utils import qualified Data.ByteString.Lazy as L -#ifdef WITH_INOTIFY -import Utility.Inotify -import 
System.INotify -#endif -#ifdef WITH_KQUEUE -import qualified Utility.Kqueue as Kqueue -#endif - checkCanWatch :: Annex () -checkCanWatch = do -#if (WITH_INOTIFY || WITH_KQUEUE) - unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $ - needLsof -#else -#if defined linux_HOST_OS -#warning "Building without inotify support; watch mode will be disabled." -#endif - error "watch mode is not available on this system" -#endif +checkCanWatch + | canWatch = + unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $ + needLsof + | otherwise = error "watch mode is not available on this system" needLsof :: Annex () needLsof = error $ unlines @@ -58,13 +46,9 @@ needLsof = error $ unlines ] watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -#ifdef WITH_INOTIFY -watchThread st dstatus changechan = withINotify $ \i -> do - statupScan st dstatus $ - watchDir i "." ignored hooks - -- Let the inotify thread run. - waitForTermination +watchThread st dstatus changechan = watchDir "." ignored hooks startup where + startup = statupScan st dstatus hook a = Just $ runHandler st dstatus changechan a hooks = WatchHooks { addHook = hook onAdd @@ -73,21 +57,6 @@ watchThread st dstatus changechan = withINotify $ \i -> do , delDirHook = hook onDelDir , errHook = hook onErr } -#else -#ifdef WITH_KQUEUE -watchThread st dstatus changechan = do - kq <- statupScan st dstatus $ - Kqueue.initKqueue "." ignored - go kq - where - go kq = do - (kq', changes) <- Kqueue.waitChange kq - print $ "detected a change in " ++ show changes - go kq' -#else -watchThread = undefined -#endif /* WITH_KQUEUE */ -#endif /* WITH_INOTIFY */ {- Initial scartup scan. The action should return once the scan is complete. 
-} statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs new file mode 100644 index 000000000..575036190 --- /dev/null +++ b/Utility/DirWatcher.hs @@ -0,0 +1,53 @@ +{- generic directory watching interface + - + - Uses either inotify or kqueue to watch a directory (and subdirectories) + - for changes, and runs hooks for different sorts of events as they occur. + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Utility.DirWatcher where + +import Utility.Types.DirWatcher + +#if WITH_INOTIFY +import qualified Utility.INotify as INotify +import qualified System.INotify as INotify +import Utility.ThreadScheduler +#endif +#if WITH_KQUEUE +import qualified Utility.Kqueue as Kqueue +#endif + +type Pruner = FilePath -> Bool + +canWatch :: Bool +#if (WITH_INOTIFY || WITH_KQUEUE) +canWatch = True +#else +#if defined linux_HOST_OS +#warning "Building without inotify support" +#endif +canWatch = False +#endif + +#if WITH_INOTIFY +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () +watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do + runstartup $ INotify.watchDir i dir prune hooks + waitForTermination -- Let the inotify thread run. +#else +#if WITH_KQUEUE +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO () +watchDir dir ignored hooks runstartup = do + kq <- runstartup $ Kqueue.initKqueue dir ignored + Kqueue.runHooks kq hooks +#else +watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () +watchDir = undefined +#endif +#endif diff --git a/Utility/INotify.hs b/Utility/INotify.hs new file mode 100644 index 000000000..bf87f4e71 --- /dev/null +++ b/Utility/INotify.hs @@ -0,0 +1,171 @@ +{- higher-level inotify interface + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. 
+ -} + +module Utility.INotify where + +import Common hiding (isDirectory) +import Utility.ThreadLock +import Utility.Types.DirWatcher + +import System.INotify +import qualified System.Posix.Files as Files +import System.IO.Error +import Control.Exception (throw) + +{- Watches for changes to files in a directory, and all its subdirectories + - that are not ignored, using inotify. This function returns after + - its initial scan is complete, leaving a thread running. Callbacks are + - made for different events. + - + - Inotify is weak at recursive directory watching; the whole directory + - tree must be scanned and watches set explicitly for each subdirectory. + - + - To notice newly created subdirectories, inotify is used, and + - watches are registered for those directories. There is a race there; + - things can be added to a directory before the watch gets registered. + - + - To close the inotify race, each time a new directory is found, it also + - recursively scans it, assuming all files in it were just added, + - and registering each subdirectory. + - + - Note: Due to the race amelioration, multiple add events may occur + - for the same file. + - + - Note: Moving a file will cause events deleting it from its old location + - and adding it to the new location. + - + - Note: Modification of files is not detected, and it's assumed that when + - a file that was open for write is closed, it's finished being written + - to, and can be added. + - + - Note: inotify has a limit to the number of watches allowed, + - /proc/sys/fs/inotify/max_user_watches (default 8192). + - So this will fail if there are too many subdirectories. The + - errHook is called when this happens. + -} +watchDir :: INotify -> FilePath -> (FilePath -> Bool) -> WatchHooks -> IO () +watchDir i dir ignored hooks + | ignored dir = noop + | otherwise = do + -- Use a lock to make sure events generated during initial + -- scan come before real inotify events. 
+ lock <- newLock + let handler event = withLock lock (void $ go event) + void (addWatch i watchevents dir handler) + `catchIO` failedaddwatch + withLock lock $ + mapM_ scan =<< filter (not . dirCruft) <$> + getDirectoryContents dir + where + recurse d = watchDir i d ignored hooks + + -- Select only inotify events required by the enabled + -- hooks, but always include Create so new directories can + -- be scanned. + watchevents = Create : addevents ++ delevents + addevents + | hashook addHook || hashook addSymlinkHook = [MoveIn, CloseWrite] + | otherwise = [] + delevents + | hashook delHook || hashook delDirHook = [MoveOut, Delete] + | otherwise = [] + + scan f = unless (ignored f) $ do + ms <- getstatus f + case ms of + Nothing -> return () + Just s + | Files.isDirectory s -> + recurse $ indir f + | Files.isSymbolicLink s -> + runhook addSymlinkHook f ms + | Files.isRegularFile s -> + runhook addHook f ms + | otherwise -> + noop + + -- Ignore creation events for regular files, which won't be + -- done being written when initially created, but handle for + -- directories and symlinks. + go (Created { isDirectory = isd, filePath = f }) + | isd = recurse $ indir f + | hashook addSymlinkHook = + checkfiletype Files.isSymbolicLink addSymlinkHook f + | otherwise = noop + -- Closing a file is assumed to mean it's done being written. + go (Closed { isDirectory = False, maybeFilePath = Just f }) = + checkfiletype Files.isRegularFile addHook f + -- When a file or directory is moved in, scan it to add new + -- stuff. + go (MovedIn { filePath = f }) = scan f + go (MovedOut { isDirectory = isd, filePath = f }) + | isd = runhook delDirHook f Nothing + | otherwise = runhook delHook f Nothing + -- Verify that the deleted item really doesn't exist, + -- since there can be spurious deletion events for items + -- in a directory that has been moved out, but is still + -- being watched. 
+ go (Deleted { isDirectory = isd, filePath = f }) + | isd = guarded $ runhook delDirHook f Nothing + | otherwise = guarded $ runhook delHook f Nothing + where + guarded = unlessM (filetype (const True) f) + go _ = noop + + hashook h = isJust $ h hooks + + runhook h f s + | ignored f = noop + | otherwise = maybe noop (\a -> a (indir f) s) (h hooks) + + indir f = dir f + + getstatus f = catchMaybeIO $ getSymbolicLinkStatus $ indir f + checkfiletype check h f = do + ms <- getstatus f + case ms of + Just s + | check s -> runhook h f ms + _ -> noop + filetype t f = catchBoolIO $ t <$> getSymbolicLinkStatus (indir f) + + -- Inotify fails when there are too many watches with a + -- disk full error. + failedaddwatch e + | isFullError e = + case errHook hooks of + Nothing -> throw e + Just hook -> tooManyWatches hook dir + | otherwise = throw e + +tooManyWatches :: (String -> Maybe FileStatus -> IO ()) -> FilePath -> IO () +tooManyWatches hook dir = do + sysctlval <- querySysctl [Param maxwatches] :: IO (Maybe Integer) + hook (unlines $ basewarning : maybe withoutsysctl withsysctl sysctlval) Nothing + where + maxwatches = "fs.inotify.max_user_watches" + basewarning = "Too many directories to watch! 
(Not watching " ++ dir ++")" + withoutsysctl = ["Increase the value in /proc/sys/fs/inotify/max_user_watches"] + withsysctl n = let new = n * 10 in + [ "Increase the limit permanently by running:" + , " echo " ++ maxwatches ++ "=" ++ show new ++ + " | sudo tee -a /etc/sysctl.conf; sudo sysctl -p" + , "Or temporarily by running:" + , " sudo sysctl -w " ++ maxwatches ++ "=" ++ show new + ] + +querySysctl :: Read a => [CommandParam] -> IO (Maybe a) +querySysctl ps = do + v <- catchMaybeIO $ hPipeFrom "sysctl" $ toCommand ps + case v of + Nothing -> return Nothing + Just (pid, h) -> do + val <- parsesysctl <$> hGetContentsStrict h + void $ getProcessStatus True False $ processID pid + return val + where + parsesysctl s = readish =<< lastMaybe (words s) diff --git a/Utility/Inotify.hs b/Utility/Inotify.hs deleted file mode 100644 index 9ad947f31..000000000 --- a/Utility/Inotify.hs +++ /dev/null @@ -1,180 +0,0 @@ -{- higher-level inotify interface - - - - Copyright 2012 Joey Hess - - - - Licensed under the GNU GPL version 3 or higher. - -} - -module Utility.Inotify where - -import Common hiding (isDirectory) -import Utility.ThreadLock - -import System.INotify -import qualified System.Posix.Files as Files -import System.IO.Error -import Control.Exception (throw) - -type Hook a = Maybe (a -> Maybe FileStatus -> IO ()) - -data WatchHooks = WatchHooks - { addHook :: Hook FilePath - , addSymlinkHook :: Hook FilePath - , delHook :: Hook FilePath - , delDirHook :: Hook FilePath - , errHook :: Hook String -- error message - } - -{- Watches for changes to files in a directory, and all its subdirectories - - that are not ignored, using inotify. This function returns after - - its initial scan is complete, leaving a thread running. Callbacks are - - made for different events. - - - - Inotify is weak at recursive directory watching; the whole directory - - tree must be scanned and watches set explicitly for each subdirectory. 
- - - - To notice newly created subdirectories, inotify is used, and - - watches are registered for those directories. There is a race there; - - things can be added to a directory before the watch gets registered. - - - - To close the inotify race, each time a new directory is found, it also - - recursively scans it, assuming all files in it were just added, - - and registering each subdirectory. - - - - Note: Due to the race amelioration, multiple add events may occur - - for the same file. - - - - Note: Moving a file will cause events deleting it from its old location - - and adding it to the new location. - - - - Note: Modification of files is not detected, and it's assumed that when - - a file that was open for write is closed, it's finished being written - - to, and can be added. - - - - Note: inotify has a limit to the number of watches allowed, - - /proc/sys/fs/inotify/max_user_watches (default 8192). - - So this will fail if there are too many subdirectories. The - - errHook is called when this happens. - -} -watchDir :: INotify -> FilePath -> (FilePath -> Bool) -> WatchHooks -> IO () -watchDir i dir ignored hooks - | ignored dir = noop - | otherwise = do - -- Use a lock to make sure events generated during initial - -- scan come before real inotify events. - lock <- newLock - let handler event = withLock lock (void $ go event) - void (addWatch i watchevents dir handler) - `catchIO` failedaddwatch - withLock lock $ - mapM_ scan =<< filter (not . dirCruft) <$> - getDirectoryContents dir - where - recurse d = watchDir i d ignored hooks - - -- Select only inotify events required by the enabled - -- hooks, but always include Create so new directories can - -- be scanned. 
- watchevents = Create : addevents ++ delevents - addevents - | hashook addHook || hashook addSymlinkHook = [MoveIn, CloseWrite] - | otherwise = [] - delevents - | hashook delHook || hashook delDirHook = [MoveOut, Delete] - | otherwise = [] - - scan f = unless (ignored f) $ do - ms <- getstatus f - case ms of - Nothing -> return () - Just s - | Files.isDirectory s -> - recurse $ indir f - | Files.isSymbolicLink s -> - runhook addSymlinkHook f ms - | Files.isRegularFile s -> - runhook addHook f ms - | otherwise -> - noop - - -- Ignore creation events for regular files, which won't be - -- done being written when initially created, but handle for - -- directories and symlinks. - go (Created { isDirectory = isd, filePath = f }) - | isd = recurse $ indir f - | hashook addSymlinkHook = - checkfiletype Files.isSymbolicLink addSymlinkHook f - | otherwise = noop - -- Closing a file is assumed to mean it's done being written. - go (Closed { isDirectory = False, maybeFilePath = Just f }) = - checkfiletype Files.isRegularFile addHook f - -- When a file or directory is moved in, scan it to add new - -- stuff. - go (MovedIn { filePath = f }) = scan f - go (MovedOut { isDirectory = isd, filePath = f }) - | isd = runhook delDirHook f Nothing - | otherwise = runhook delHook f Nothing - -- Verify that the deleted item really doesn't exist, - -- since there can be spurious deletion events for items - -- in a directory that has been moved out, but is still - -- being watched. 
- go (Deleted { isDirectory = isd, filePath = f }) - | isd = guarded $ runhook delDirHook f Nothing - | otherwise = guarded $ runhook delHook f Nothing - where - guarded = unlessM (filetype (const True) f) - go _ = noop - - hashook h = isJust $ h hooks - - runhook h f s - | ignored f = noop - | otherwise = maybe noop (\a -> a (indir f) s) (h hooks) - - indir f = dir f - - getstatus f = catchMaybeIO $ getSymbolicLinkStatus $ indir f - checkfiletype check h f = do - ms <- getstatus f - case ms of - Just s - | check s -> runhook h f ms - _ -> noop - filetype t f = catchBoolIO $ t <$> getSymbolicLinkStatus (indir f) - - -- Inotify fails when there are too many watches with a - -- disk full error. - failedaddwatch e - | isFullError e = - case errHook hooks of - Nothing -> throw e - Just hook -> tooManyWatches hook dir - | otherwise = throw e - -tooManyWatches :: (String -> Maybe FileStatus -> IO ()) -> FilePath -> IO () -tooManyWatches hook dir = do - sysctlval <- querySysctl [Param maxwatches] :: IO (Maybe Integer) - hook (unlines $ basewarning : maybe withoutsysctl withsysctl sysctlval) Nothing - where - maxwatches = "fs.inotify.max_user_watches" - basewarning = "Too many directories to watch! 
(Not watching " ++ dir ++")" - withoutsysctl = ["Increase the value in /proc/sys/fs/inotify/max_user_watches"] - withsysctl n = let new = n * 10 in - [ "Increase the limit permanently by running:" - , " echo " ++ maxwatches ++ "=" ++ show new ++ - " | sudo tee -a /etc/sysctl.conf; sudo sysctl -p" - , "Or temporarily by running:" - , " sudo sysctl -w " ++ maxwatches ++ "=" ++ show new - ] - -querySysctl :: Read a => [CommandParam] -> IO (Maybe a) -querySysctl ps = do - v <- catchMaybeIO $ hPipeFrom "sysctl" $ toCommand ps - case v of - Nothing -> return Nothing - Just (pid, h) -> do - val <- parsesysctl <$> hGetContentsStrict h - void $ getProcessStatus True False $ processID pid - return val - where - parsesysctl s = readish =<< lastMaybe (words s) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 5b4920f2f..30218bc29 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -15,9 +15,11 @@ module Utility.Kqueue ( changedFile, isAdd, isDelete, + runHooks, ) where import Common +import Utility.Types.DirWatcher import System.Posix.Types import Foreign.C.Types @@ -187,3 +189,26 @@ handleChange (Kqueue h dirmap pruner) fd olddirinfo = -- remove it from our map. newmap <- removeSubDir dirmap (dirName olddirinfo) return (Kqueue h newmap pruner, []) + +{- Processes changes on the Kqueue, calling the hooks as appropriate. + - Never returns. -} +runHooks :: Kqueue -> WatchHooks -> IO () +runHooks kq hooks = do + (kq', changes) <- Kqueue.waitChange kq + forM_ changes $ dispatch kq' + runHooks kq' hooks + where + -- Kqueue returns changes for both whole directories + -- being added and deleted, and individual files being + -- added and deleted. 
+ dispatch q change status + | isAdd change = withstatus s (dispatchadd q) + | isDelete change = callhook delDirHook change + dispatchadd q change s + | Files.isSymbolicLink = callhook addSymlinkHook change + | Files.isDirectory = print $ "TODO: recursive directory add: " ++ show change + | Files.isRegularFile = callhook addHook change + | otherwise = noop + callhook h change = hooks h $ changedFile change + withstatus change a = maybe noop (a change) =<< + (catchMaybeIO (getSymbolicLinkStatus (changedFile change) diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs new file mode 100644 index 000000000..c828a0593 --- /dev/null +++ b/Utility/Types/DirWatcher.hs @@ -0,0 +1,22 @@ +{- generic directory watching types + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE CPP #-} + +module Utility.Types.DirWatcher where + +import Common + +type Hook a = Maybe (a -> Maybe FileStatus -> IO ()) + +data WatchHooks = WatchHooks + { addHook :: Hook FilePath + , addSymlinkHook :: Hook FilePath + , delHook :: Hook FilePath + , delDirHook :: Hook FilePath + , errHook :: Hook String -- error message + } diff --git a/debian/changelog b/debian/changelog index 9a47447ce..f756a8538 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,9 @@ git-annex (3.20120616) UNRELEASED; urgency=low - * watch: New subcommand, which uses inotify to watch for changes to - files and automatically annexes new files, etc, so you don't need - to manually run git commands when manipulating files. + * watch: New subcommand, a daemon which notices changes to + files and automatically annexes new files, etc, so you don't + need to manually run git commands when manipulating files. + Available on Linux, BSDs, and OSX! * Enable diskfree on kfreebsd, using statvfs. 
-- Joey Hess Tue, 12 Jun 2012 11:35:59 -0400 -- cgit v1.2.3 From 02e9fdb0a5940a1c059445c616338dc147a32544 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 04:04:40 +0000 Subject: kqueue build fix new event dispatch seems a bit broken though --- Utility/Kqueue.hs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 30218bc29..da43a2d86 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -8,6 +8,7 @@ {-# LANGUAGE ForeignFunctionInterface #-} module Utility.Kqueue ( + Kqueue, initKqueue, stopKqueue, waitChange, @@ -27,6 +28,7 @@ import Foreign.Ptr import Foreign.Marshal import qualified Data.Map as M import qualified Data.Set as S +import qualified System.Posix.Files as Files data Change = Deleted FilePath @@ -194,21 +196,25 @@ handleChange (Kqueue h dirmap pruner) fd olddirinfo = - Never returns. -} runHooks :: Kqueue -> WatchHooks -> IO () runHooks kq hooks = do - (kq', changes) <- Kqueue.waitChange kq - forM_ changes $ dispatch kq' + (kq', changes) <- waitChange kq + forM_ changes $ \c -> do + print c + dispatch kq' c runHooks kq' hooks where -- Kqueue returns changes for both whole directories -- being added and deleted, and individual files being -- added and deleted. 
- dispatch q change status - | isAdd change = withstatus s (dispatchadd q) - | isDelete change = callhook delDirHook change + dispatch q change + | isAdd change = withstatus change $ dispatchadd q + | otherwise = callhook delDirHook Nothing change dispatchadd q change s - | Files.isSymbolicLink = callhook addSymlinkHook change - | Files.isDirectory = print $ "TODO: recursive directory add: " ++ show change - | Files.isRegularFile = callhook addHook change + | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change + | Files.isDirectory s = print $ "TODO: recursive directory add: " ++ show change + | Files.isRegularFile s = callhook addHook (Just s) change | otherwise = noop - callhook h change = hooks h $ changedFile change + callhook h s change = case h hooks of + Nothing -> noop + Just a -> a (changedFile change) s withstatus change a = maybe noop (a change) =<< - (catchMaybeIO (getSymbolicLinkStatus (changedFile change) + (catchMaybeIO (getSymbolicLinkStatus (changedFile change))) -- cgit v1.2.3 From a5cceb7d4ff83b11da95cac204e99d1bfdbaecc9 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 00:23:14 -0400 Subject: make --force really bypass lsof check --- Assistant/Committer.hs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index a2b65dae5..b482e5e7a 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -7,6 +7,7 @@ module Assistant.Committer where import Common.Annex import Assistant.ThreadedMonad +import qualified Annex import qualified Annex.Queue import qualified Git.Command import qualified Command.Add @@ -153,18 +154,21 @@ handleAdds st changechan cs {- Checks which of a set of files can safely be added. - Files are locked down as hard links in a temp directory, - - with their write bits disabled. But some may have already - - been opened for write, so lsof is run on the temp directory + - with their write bits disabled. 
But some may still be + - opened for write, so lsof is run on the temp directory - to check them. -} safeToAdd :: ThreadState -> [FilePath] -> IO [KeySource] safeToAdd st files = do locked <- catMaybes <$> lockdown files - runThreadState st $ do - tmpdir <- fromRepo gitAnnexTmpDir - open <- S.fromList . map fst3 . filter openwrite <$> - liftIO (Lsof.queryDir tmpdir) - catMaybes <$> forM locked (go open) + runThreadState st $ ifM (Annex.getState Annex.force) + ( return locked -- force bypasses lsof check + , do + tmpdir <- fromRepo gitAnnexTmpDir + open <- S.fromList . map fst3 . filter openwrite <$> + liftIO (Lsof.queryDir tmpdir) + catMaybes <$> forM locked (go open) + ) where go open keysource | S.member (contentLocation keysource) open = do -- cgit v1.2.3 From 03b9341356c8d4eabfec5864957a4e49e7fcac67 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 04:52:55 +0000 Subject: fix scheduling Handle kevent interruptions in the haskell code, so it can yield to other threads --- Utility/Kqueue.hs | 17 ++++++++++++----- Utility/libkqueue.c | 14 ++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index da43a2d86..1f65b2dba 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -24,11 +24,13 @@ import Utility.Types.DirWatcher import System.Posix.Types import Foreign.C.Types +import Foreign.C.Error import Foreign.Ptr import Foreign.Marshal import qualified Data.Map as M import qualified Data.Set as S import qualified System.Posix.Files as Files +import Control.Concurrent data Change = Deleted FilePath @@ -146,9 +148,14 @@ stopKqueue (Kqueue h _ _) = closeFd h waitChange :: Kqueue -> IO (Kqueue, [Change]) waitChange kq@(Kqueue h dirmap _) = do changedfd <- c_waitchange_kqueue h - case M.lookup changedfd dirmap of - Nothing -> return (kq, []) - Just info -> handleChange kq changedfd info + if changedfd == -1 + then ifM ((==) eINTR <$> getErrno) + (yield >> waitChange kq, nochange) + else case 
M.lookup changedfd dirmap of + Nothing -> nochange + Just info -> handleChange kq changedfd info + where + nochange = return (kq, []) {- The kqueue interface does not tell what type of change took place in - the directory; it could be an added file, a deleted file, a renamed @@ -212,9 +219,9 @@ runHooks kq hooks = do | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change | Files.isDirectory s = print $ "TODO: recursive directory add: " ++ show change | Files.isRegularFile s = callhook addHook (Just s) change - | otherwise = noop + | otherwise = print "not a file??" callhook h s change = case h hooks of - Nothing -> noop + Nothing -> print "missing hook??" Just a -> a (changedFile change) s withstatus change a = maybe noop (a change) =<< (catchMaybeIO (getSymbolicLinkStatus (changedFile change))) diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index b7f9595dc..643a63b97 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -17,6 +17,8 @@ /* The specified fds are added to the set of fds being watched for changes. * Fds passed to prior calls still take effect, so it's most efficient to * not pass the same fds repeatedly. + * + * Returns the fd that changed, or -1 on error. */ signed int helper(const int kq, const int fdcnt, const int *fdlist, int nodelay) { int i, nev; @@ -32,12 +34,7 @@ signed int helper(const int kq, const int fdcnt, const int *fdlist, int nodelay) 0, 0); } - while ((nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout))) { - if (!(nev == -1 && errno == EINTR)) { - break; - } - } - + nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout); if (nev == 1) return evlist[0].ident; else @@ -59,10 +56,7 @@ void addfds_kqueue(const int kq, const int fdcnt, const int *fdlist) { helper(kq, fdcnt, fdlist, 1); } -/* Waits for a change event on a kqueue. - * - * Returns the fd that changed, or -1 on error. - */ +/* Waits for a change event on a kqueue. 
*/ signed int waitchange_kqueue(const int kq) { return helper(kq, 0, NULL, 0); } -- cgit v1.2.3 From fd3e94593224fe0e656c7bb1dc117db057575f4e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 01:56:36 -0400 Subject: fix prototype --- Utility/libkqueue.c | 2 +- Utility/libkqueue.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Utility/libkqueue.c b/Utility/libkqueue.c index 643a63b97..b5a19a135 100644 --- a/Utility/libkqueue.c +++ b/Utility/libkqueue.c @@ -42,7 +42,7 @@ signed int helper(const int kq, const int fdcnt, const int *fdlist, int nodelay) } /* Initializes a new, empty kqueue. */ -int init_kqueue(const int fdcnt, const int *fdlist) { +int init_kqueue() { int kq; if ((kq = kqueue()) == -1) { perror("kqueue"); diff --git a/Utility/libkqueue.h b/Utility/libkqueue.h index 1a285b8da..692b47f14 100644 --- a/Utility/libkqueue.h +++ b/Utility/libkqueue.h @@ -1,2 +1,3 @@ -int init_kqueue(const int fdcnt, const int *fdlist); +int init_kqueue(); +void addfds_kqueue(const int kq, const int fdcnt, const int *fdlist); signed int waitchange_kqueue(const int kq); -- cgit v1.2.3 From 4ab9449cee0cb1377a768b44fe832282ac1f88b9 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 02:23:45 -0400 Subject: add eventsCoalesce --- Utility/DirWatcher.hs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs index 575036190..bf184ff8a 100644 --- a/Utility/DirWatcher.hs +++ b/Utility/DirWatcher.hs @@ -35,6 +35,24 @@ canWatch = True canWatch = False #endif +/* With inotify, discrete events will be received when making multiple changes + * to the same filename. For example, adding it, deleting it, and adding it + * again will be three events. + * + * OTOH, with kqueue, often only one event is received, indicating the most + * recent state of the file. 
+ */ +eventsCoalesce :: Bool +#if WITH_INOTIFY +eventsCoalesce = False +#else +#if WITH_KQUEUE +eventsCoalesce = True +#else +eventsCoalesce = undefined +#endif +#endif + #if WITH_INOTIFY watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do -- cgit v1.2.3 From 57cf65eb6d811ba7fd19eb62a54e3b83a0c2dfa7 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 02:40:21 -0400 Subject: fix kevent symlink creation --- Assistant.hs | 1 + Assistant/Changes.hs | 59 ++++++++++++++++++++++++++++++++++++++ Assistant/Committer.hs | 70 +++++++++++----------------------------------- Assistant/SanityChecker.hs | 2 +- Assistant/Watcher.hs | 2 +- Command/Add.hs | 6 ++-- 6 files changed, 83 insertions(+), 57 deletions(-) create mode 100644 Assistant/Changes.hs diff --git a/Assistant.hs b/Assistant.hs index 880d3eb5e..e924d9477 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -46,6 +46,7 @@ module Assistant where import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus +import Assistant.Changes import Assistant.Watcher import Assistant.Committer import Assistant.SanityChecker diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs new file mode 100644 index 000000000..1cad42326 --- /dev/null +++ b/Assistant/Changes.hs @@ -0,0 +1,59 @@ +{- git-annex assistant change tracking + - + - Copyright 2012 Joey Hess + -} + +module Assistant.Changes where + +import Common.Annex +import qualified Annex.Queue + +import Control.Concurrent.STM +import Data.Time.Clock + +data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange + deriving (Show, Eq) + +type ChangeChan = TChan Change + +data Change = Change + { changeTime :: UTCTime + , changeFile :: FilePath + , changeType :: ChangeType + } + deriving (Show) + +runChangeChan :: STM a -> IO a +runChangeChan = atomically + +newChangeChan :: IO ChangeChan +newChangeChan = atomically newTChan + +{- Handlers call this when 
they made a change that needs to get committed. -} +madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) +madeChange f t = do + -- Just in case the commit thread is not flushing the queue fast enough. + when (t /= PendingAddChange) $ + Annex.Queue.flushWhenFull + liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) + +noChange :: Annex (Maybe Change) +noChange = return Nothing + +{- Gets all unhandled changes. + - Blocks until at least one change is made. -} +getChanges :: ChangeChan -> IO [Change] +getChanges chan = runChangeChan $ do + c <- readTChan chan + go [c] + where + go l = do + v <- tryReadTChan chan + case v of + Nothing -> return l + Just c -> go (c:l) + +{- Puts unhandled changes back into the channel. + - Note: Original order is not preserved. -} +refillChanges :: ChangeChan -> [Change] -> IO () +refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index b482e5e7a..d3f7f15c5 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -1,4 +1,4 @@ -{- git-annex assistant change tracking and committing +{- git-annex assistant commit thread - - Copyright 2012 Joey Hess -} @@ -6,67 +6,24 @@ module Assistant.Committer where import Common.Annex +import Assistant.Changes import Assistant.ThreadedMonad +import Assistant.Watcher import qualified Annex import qualified Annex.Queue import qualified Git.Command +import qualified Git.HashObject +import Git.Types import qualified Command.Add import Utility.ThreadScheduler import qualified Utility.Lsof as Lsof +import qualified Utility.DirWatcher as DirWatcher import Types.Backend -import Control.Concurrent.STM import Data.Time.Clock import Data.Tuple.Utils import qualified Data.Set as S -data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange - deriving (Show, Eq) - -type ChangeChan = TChan Change - -data Change = Change - { changeTime :: UTCTime - , changeFile :: FilePath - , changeType :: 
ChangeType - } - deriving (Show) - -runChangeChan :: STM a -> IO a -runChangeChan = atomically - -newChangeChan :: IO ChangeChan -newChangeChan = atomically newTChan - -{- Handlers call this when they made a change that needs to get committed. -} -madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) -madeChange f t = do - -- Just in case the commit thread is not flushing the queue fast enough. - when (t /= PendingAddChange) $ - Annex.Queue.flushWhenFull - liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) - -noChange :: Annex (Maybe Change) -noChange = return Nothing - -{- Gets all unhandled changes. - - Blocks until at least one change is made. -} -getChanges :: ChangeChan -> IO [Change] -getChanges chan = runChangeChan $ do - c <- readTChan chan - go [c] - where - go l = do - v <- tryReadTChan chan - case v of - Nothing -> return l - Just c -> go (c:l) - -{- Puts unhandled changes back into the channel. - - Note: Original order is not preserved. -} -refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs - {- This thread makes git commits at appropriate times. -} commitThread :: ThreadState -> ChangeChan -> IO () commitThread st changechan = runEvery (Seconds 1) $ do @@ -122,7 +79,9 @@ shouldCommit now changes - - When a file is added, Inotify will notice the new symlink. So this waits - for additional Changes to arrive, so that the symlink has hopefully been - - staged before returning, and will be committed. + - staged before returning, and will be committed immediately. OTOH, for + - kqueue, eventsCoalesce, so instead the symlink is directly created and + - staged. 
-} handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO () handleAdds st changechan cs @@ -131,8 +90,9 @@ handleAdds st changechan cs toadd' <- safeToAdd st toadd unless (null toadd') $ do added <- filter id <$> forM toadd' add - unless (null added) $ - handleAdds st changechan =<< getChanges changechan + when (DirWatcher.eventsCoalesce && not (null added)) $ + handleAdds st changechan + =<< getChanges changechan where toadd = map changeFile $ filter isPendingAdd cs @@ -148,7 +108,11 @@ handleAdds st changechan cs showEndFail return False handle file (Just key) = do - Command.Add.link file key True + link <- Command.Add.link file key True + when DirWatcher.eventsCoalesce $ do + sha <- inRepo $ + Git.HashObject.hashObject BlobObject link + stageSymlink file sha showEndOk return True diff --git a/Assistant/SanityChecker.hs b/Assistant/SanityChecker.hs index a5f138024..e2ca9da74 100644 --- a/Assistant/SanityChecker.hs +++ b/Assistant/SanityChecker.hs @@ -11,7 +11,7 @@ import Common.Annex import qualified Git.LsFiles import Assistant.DaemonStatus import Assistant.ThreadedMonad -import Assistant.Committer +import Assistant.Changes import Utility.ThreadScheduler import qualified Assistant.Watcher diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index a2ca2396e..cb7ede920 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -12,7 +12,7 @@ module Assistant.Watcher where import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.Committer +import Assistant.Changes import Utility.DirWatcher import Utility.Types.DirWatcher import qualified Annex.Queue diff --git a/Command/Add.hs b/Command/Add.hs index 7a6696063..43f186fbf 100644 --- a/Command/Add.hs +++ b/Command/Add.hs @@ -97,8 +97,8 @@ undo file key e = do src <- inRepo $ gitAnnexLocation key liftIO $ moveFile src file -{- Creates the symlink to the annexed content. 
-} -link :: FilePath -> Key -> Bool -> Annex () +{- Creates the symlink to the annexed content, returns the link target. -} +link :: FilePath -> Key -> Bool -> Annex String link file key hascontent = handle (undo file key) $ do l <- calcGitLink file key liftIO $ createSymbolicLink l file @@ -112,6 +112,8 @@ link file key hascontent = handle (undo file key) $ do mtime <- modificationTime <$> getFileStatus file touch file (TimeSpec mtime) False + return l + {- Note: Several other commands call this, and expect it to - create the symlink and add it. -} cleanup :: FilePath -> Key -> Bool -> CommandCleanup -- cgit v1.2.3 From 627504744c80c8a7b3f4b43e3646a5ad5c35d92f Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 09:17:06 -0400 Subject: inverted logic --- Assistant/Committer.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index d3f7f15c5..74f0922b7 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -90,7 +90,7 @@ handleAdds st changechan cs toadd' <- safeToAdd st toadd unless (null toadd') $ do added <- filter id <$> forM toadd' add - when (DirWatcher.eventsCoalesce && not (null added)) $ + unless (DirWatcher.eventsCoalesce || null added) $ handleAdds st changechan =<< getChanges changechan where -- cgit v1.2.3 From 2a61df23e72ed4880f8927e6094acd9b256bb13b Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 09:56:03 -0400 Subject: kqueue recursive directory adding --- Utility/Kqueue.hs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index 1f65b2dba..a0edcb5a9 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -48,7 +48,11 @@ changedFile :: Change -> FilePath changedFile (Added f) = f changedFile (Deleted f) = f -data Kqueue = Kqueue Fd DirMap Pruner +data Kqueue = Kqueue + { kqueueFd :: Fd + , kqueueMap :: DirMap + , kqueuePruner :: Pruner + } type 
Pruner = FilePath -> Bool @@ -115,6 +119,13 @@ removeSubDir dirmap dir = do where (toremove, rest) = M.partition (dirContains dir . dirName) dirmap +findDirContents :: DirMap -> FilePath -> [FilePath] +findDirContents dirmap dir = concatMap absolutecontents $ search + where + absolutecontents i = map (dirName i ) (S.toList $ dirCache i) + search = map snd $ M.toList $ + M.filter (\i -> dirName i == dir) dirmap + foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue :: IO Fd foreign import ccall unsafe "libkqueue.h addfds_kqueue" c_addfds_kqueue @@ -140,7 +151,7 @@ updateKqueue (Kqueue h dirmap _) = {- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap, - so it can be reused. -} stopKqueue :: Kqueue -> IO () -stopKqueue (Kqueue h _ _) = closeFd h +stopKqueue = closeFd . kqueueFd {- Waits for a change on a Kqueue. - May update the Kqueue. @@ -206,22 +217,30 @@ runHooks kq hooks = do (kq', changes) <- waitChange kq forM_ changes $ \c -> do print c - dispatch kq' c + dispatch (kqueueMap kq') c runHooks kq' hooks where -- Kqueue returns changes for both whole directories -- being added and deleted, and individual files being -- added and deleted. - dispatch q change - | isAdd change = withstatus change $ dispatchadd q + dispatch dirmap change + | isAdd change = withstatus change $ dispatchadd dirmap | otherwise = callhook delDirHook Nothing change - dispatchadd q change s - | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change - | Files.isDirectory s = print $ "TODO: recursive directory add: " ++ show change - | Files.isRegularFile s = callhook addHook (Just s) change - | otherwise = print "not a file??" + dispatchadd dirmap change s + | Files.isSymbolicLink s = + callhook addSymlinkHook (Just s) change + | Files.isDirectory s = do + -- Recursively add directory contents. 
+ let contents = findDirContents dirmap $ + changedFile change + forM_ contents $ \f -> + withstatus (Added f) $ + dispatchadd dirmap + | Files.isRegularFile s = + callhook addHook (Just s) change + | otherwise = noop callhook h s change = case h hooks of - Nothing -> print "missing hook??" + Nothing -> noop Just a -> a (changedFile change) s withstatus change a = maybe noop (a change) =<< (catchMaybeIO (getSymbolicLinkStatus (changedFile change))) -- cgit v1.2.3 From e68b3c99f44a00cb6e5c405115746b6bbad1e2cc Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 10:08:06 -0400 Subject: kqueue synthetic add events on startup --- Utility/Kqueue.hs | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/Utility/Kqueue.hs b/Utility/Kqueue.hs index a0edcb5a9..7e7e653ec 100644 --- a/Utility/Kqueue.hs +++ b/Utility/Kqueue.hs @@ -50,8 +50,9 @@ changedFile (Deleted f) = f data Kqueue = Kqueue { kqueueFd :: Fd + , kqueueTop :: FilePath , kqueueMap :: DirMap - , kqueuePruner :: Pruner + , _kqueuePruner :: Pruner } type Pruner = FilePath -> Bool @@ -138,13 +139,13 @@ initKqueue :: FilePath -> Pruner -> IO Kqueue initKqueue dir pruned = do dirmap <- scanRecursive dir pruned h <- c_init_kqueue - let kq = Kqueue h dirmap pruned + let kq = Kqueue h dir dirmap pruned updateKqueue kq return kq {- Updates a Kqueue, adding watches for its map. -} updateKqueue :: Kqueue -> IO () -updateKqueue (Kqueue h dirmap _) = +updateKqueue (Kqueue h _ dirmap _) = withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do c_addfds_kqueue h (fromIntegral fdcnt) c_fds @@ -157,7 +158,7 @@ stopKqueue = closeFd . kqueueFd - May update the Kqueue. -} waitChange :: Kqueue -> IO (Kqueue, [Change]) -waitChange kq@(Kqueue h dirmap _) = do +waitChange kq@(Kqueue h _ dirmap _) = do changedfd <- c_waitchange_kqueue h if changedfd == -1 then ifM ((==) eINTR <$> getErrno) @@ -178,7 +179,7 @@ waitChange kq@(Kqueue h dirmap _) = do - directories as necessary. 
-} handleChange :: Kqueue -> Fd -> DirInfo -> IO (Kqueue, [Change]) -handleChange (Kqueue h dirmap pruner) fd olddirinfo = +handleChange kq@(Kqueue _ _ dirmap pruner) fd olddirinfo = go =<< catchMaybeIO (getDirInfo $ dirName olddirinfo) where go (Just newdirinfo) = do @@ -199,7 +200,7 @@ handleChange (Kqueue h dirmap pruner) fd olddirinfo = -- When new directories were added, need to update -- the kqueue to watch them. - let kq' = Kqueue h newmap'' pruner + let kq' = kq { kqueueMap = newmap'' } unless (null newdirinfos) $ updateKqueue kq' @@ -208,18 +209,21 @@ handleChange (Kqueue h dirmap pruner) fd olddirinfo = -- The directory has been moved or deleted, so -- remove it from our map. newmap <- removeSubDir dirmap (dirName olddirinfo) - return (Kqueue h newmap pruner, []) + return (kq { kqueueMap = newmap }, []) {- Processes changes on the Kqueue, calling the hooks as appropriate. - Never returns. -} runHooks :: Kqueue -> WatchHooks -> IO () runHooks kq hooks = do - (kq', changes) <- waitChange kq - forM_ changes $ \c -> do - print c - dispatch (kqueueMap kq') c - runHooks kq' hooks + -- First, synthetic add events for the whole directory tree contents, + -- to catch any files created beforehand. + recursiveadd (kqueueMap kq) (Added $ kqueueTop kq) + loop kq where + loop q = do + (q', changes) <- waitChange q + forM_ changes $ dispatch (kqueueMap q') + loop q' -- Kqueue returns changes for both whole directories -- being added and deleted, and individual files being -- added and deleted. @@ -229,16 +233,14 @@ runHooks kq hooks = do dispatchadd dirmap change s | Files.isSymbolicLink s = callhook addSymlinkHook (Just s) change - | Files.isDirectory s = do - -- Recursively add directory contents. 
- let contents = findDirContents dirmap $ - changedFile change - forM_ contents $ \f -> - withstatus (Added f) $ - dispatchadd dirmap + | Files.isDirectory s = recursiveadd dirmap change | Files.isRegularFile s = callhook addHook (Just s) change | otherwise = noop + recursiveadd dirmap change = do + let contents = findDirContents dirmap $ changedFile change + forM_ contents $ \f -> + withstatus (Added f) $ dispatchadd dirmap callhook h s change = case h hooks of Nothing -> noop Just a -> a (changedFile change) s -- cgit v1.2.3 From 5580af5789427fc5fd7cd74fd4a2529668621a68 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 19 Jun 2012 10:22:36 -0400 Subject: add closingTracked flag --- Utility/DirWatcher.hs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs index bf184ff8a..baab244c7 100644 --- a/Utility/DirWatcher.hs +++ b/Utility/DirWatcher.hs @@ -53,6 +53,25 @@ eventsCoalesce = undefined #endif #endif +/* With inotify, file closing is tracked to some extent, so an add event + * will always be received for a file once its writer closes it, and + * (typically) not before. This may mean multiple add events for the same file. + * + * OTOH, with kqueue, add events will often be received while a file is + * still being written to, and then no add event will be received once the + * writer closes it. 
+ */ +closingTracked :: Bool +#if WITH_INOTIFY +closingTracked = True +#else +#if WITH_KQUEUE +closingTracked = False +#else +eventsCoalesce = undefined +#endif +#endif + #if WITH_INOTIFY watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO () watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do -- cgit v1.2.3 From 88e26046d7916e306e819c33aba1d6365167b6c5 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 20 Jun 2012 15:27:54 -0400 Subject: typo --- debian/changelog | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debian/changelog b/debian/changelog index a0e15946f..5d8d973c6 100644 --- a/debian/changelog +++ b/debian/changelog @@ -4,7 +4,7 @@ git-annex (3.20120616) UNRELEASED; urgency=low files and automatically annexes new files, etc, so you don't need to manually run git commands when manipulating files. Available on Linux, BSDs, and OSX! - * Enable diskfree on kfreebsd, using statvfs. + * Enable diskfree on kfreebsd, using kqueue. * unused: Fix crash when key names contain invalid utf8. -- Joey Hess Tue, 12 Jun 2012 11:35:59 -0400 -- cgit v1.2.3 From ad11de94e54d17c765d980bfe249eca1c9b6cabd Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 20 Jun 2012 15:53:56 -0400 Subject: typo --- Utility/DirWatcher.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Utility/DirWatcher.hs b/Utility/DirWatcher.hs index baab244c7..11ce7baef 100644 --- a/Utility/DirWatcher.hs +++ b/Utility/DirWatcher.hs @@ -68,7 +68,7 @@ closingTracked = True #if WITH_KQUEUE closingTracked = False #else -eventsCoalesce = undefined +closingTracked = undefined #endif #endif -- cgit v1.2.3 From e0fdfb2e706da2cb1451193c658dc676b0530968 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 20 Jun 2012 16:07:14 -0400 Subject: maintain set of files pendingAdd Kqueue needs to remember which files failed to be added due to being open, and retry them. This commit gets the data in place for such a retry thread. 
Broke KeySource out into its own file, and added Eq and Ord instances so it can be stored in a Set. --- Assistant.hs | 2 +- Assistant/Committer.hs | 55 +++++++++++++++++++++++++++++++---------------- Assistant/DaemonStatus.hs | 10 ++++++++- Backend.hs | 6 +++--- Backend/SHA.hs | 1 + Backend/WORM.hs | 1 + Command/Add.hs | 1 + Command/AddUrl.hs | 1 + Command/Migrate.hs | 1 + Types/Backend.hs | 8 +------ Types/KeySource.hs | 33 ++++++++++++++++++++++++++++ 11 files changed, 89 insertions(+), 30 deletions(-) create mode 100644 Types/KeySource.hs diff --git a/Assistant.hs b/Assistant.hs index e924d9477..554c37290 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -75,8 +75,8 @@ startDaemon foreground -- begin adding files and having them -- committed, even while the startup scan -- is taking place. - _ <- forkIO $ commitThread st changechan _ <- forkIO $ daemonStatusThread st dstatus + _ <- forkIO $ commitThread st dstatus changechan _ <- forkIO $ sanityCheckerThread st dstatus changechan -- Does not return. watchThread st dstatus changechan diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index 74f0922b7..600034a0a 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -7,6 +7,7 @@ module Assistant.Committer where import Common.Annex import Assistant.Changes +import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Watcher import qualified Annex @@ -18,15 +19,15 @@ import qualified Command.Add import Utility.ThreadScheduler import qualified Utility.Lsof as Lsof import qualified Utility.DirWatcher as DirWatcher -import Types.Backend +import Types.KeySource import Data.Time.Clock import Data.Tuple.Utils import qualified Data.Set as S {- This thread makes git commits at appropriate times. 
-} -commitThread :: ThreadState -> ChangeChan -> IO () -commitThread st changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () +commitThread st dstatus changechan = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. -- Next, wait until at least one change has been made. cs <- getChanges changechan @@ -34,7 +35,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do time <- getCurrentTime if shouldCommit time cs then do - handleAdds st changechan cs + handleAdds st dstatus changechan cs void $ tryIO $ runThreadState st commitStaged else refillChanges changechan cs @@ -79,19 +80,20 @@ shouldCommit now changes - - When a file is added, Inotify will notice the new symlink. So this waits - for additional Changes to arrive, so that the symlink has hopefully been - - staged before returning, and will be committed immediately. OTOH, for - - kqueue, eventsCoalesce, so instead the symlink is directly created and - - staged. + - staged before returning, and will be committed immediately. + - + - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly + - created and staged, if the file is not open. -} -handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO () -handleAdds st changechan cs +handleAdds :: ThreadState -> DaemonStatusHandle -> ChangeChan -> [Change] -> IO () +handleAdds st dstatus changechan cs | null toadd = noop | otherwise = do - toadd' <- safeToAdd st toadd + toadd' <- safeToAdd st dstatus toadd unless (null toadd') $ do added <- filter id <$> forM toadd' add unless (DirWatcher.eventsCoalesce || null added) $ - handleAdds st changechan + handleAdds st dstatus changechan =<< getChanges changechan where toadd = map changeFile $ filter isPendingAdd cs @@ -122,8 +124,8 @@ handleAdds st changechan cs - opened for write, so lsof is run on the temp directory - to check them. 
-} -safeToAdd :: ThreadState -> [FilePath] -> IO [KeySource] -safeToAdd st files = do +safeToAdd :: ThreadState -> DaemonStatusHandle -> [FilePath] -> IO [KeySource] +safeToAdd st dstatus files = do locked <- catMaybes <$> lockdown files runThreadState st $ ifM (Annex.getState Annex.force) ( return locked -- force bypasses lsof check @@ -134,16 +136,33 @@ safeToAdd st files = do catMaybes <$> forM locked (go open) ) where + {- When a file is still open, it can be put into pendingAdd + - to be checked again later. However when closingTracked + - is supported, another event will be received once it's + - closed, so there's no point in doing so. -} go open keysource | S.member (contentLocation keysource) open = do - warning $ keyFilename keysource - ++ " still has writers, not adding" - -- remove the hard link - --_ <- liftIO $ tryIO $ - -- removeFile $ contentLocation keysource + if DirWatcher.closingTracked + then do + warning $ keyFilename keysource + ++ " still has writers, not adding" + void $ liftIO $ canceladd keysource + else void $ addpending keysource return Nothing | otherwise = return $ Just keysource + canceladd keysource = tryIO $ + -- remove the hard link + removeFile $ contentLocation keysource + + {- The same file (or a file with the same name) + - could already be pending add; if so this KeySource + - superscedes the old one. 
-} + addpending keysource = modifyDaemonStatusM dstatus $ \s -> do + let set = pendingAdd s + mapM_ canceladd $ S.toList $ S.filter (== keysource) set + return $ s { pendingAdd = S.insert keysource set } + lockdown = mapM $ \file -> do ms <- catchMaybeIO $ getSymbolicLinkStatus file case ms of diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index e5ba3d151..289a97bb2 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -9,12 +9,14 @@ import Common.Annex import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.TempFile +import Types.KeySource import Control.Concurrent import System.Posix.Types import Data.Time.Clock.POSIX import Data.Time import System.Locale +import qualified Data.Set as S data DaemonStatus = DaemonStatus -- False when the daemon is performing its startup scan @@ -25,6 +27,8 @@ data DaemonStatus = DaemonStatus , sanityCheckRunning :: Bool -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime + -- Files that are in the process of being added to the annex. + , pendingAdd :: S.Set KeySource } deriving (Show) @@ -36,13 +40,17 @@ newDaemonStatus = DaemonStatus , lastRunning = Nothing , sanityCheckRunning = False , lastSanityCheck = Nothing + , pendingAdd = S.empty } getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus getDaemonStatus = liftIO . readMVar modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () -modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a) +modifyDaemonStatus handle a = modifyDaemonStatusM handle (return . a) + +modifyDaemonStatusM :: DaemonStatusHandle -> (DaemonStatus -> IO DaemonStatus) -> Annex () +modifyDaemonStatusM handle a = liftIO $ modifyMVar_ handle a {- Load any previous daemon status file, and store it in the MVar for this - process to use as its DaemonStatus. 
-} diff --git a/Backend.hs b/Backend.hs index bde1aad78..d1dfdef3c 100644 --- a/Backend.hs +++ b/Backend.hs @@ -6,7 +6,6 @@ -} module Backend ( - B.KeySource(..), list, orderedList, genKey, @@ -23,6 +22,7 @@ import Config import qualified Annex import Annex.CheckAttr import Types.Key +import Types.KeySource import qualified Types.Backend as B -- When adding a new backend, import it here and add it to the list. @@ -54,12 +54,12 @@ orderedList = do {- Generates a key for a file, trying each backend in turn until one - accepts it. -} -genKey :: B.KeySource -> Maybe Backend -> Annex (Maybe (Key, Backend)) +genKey :: KeySource -> Maybe Backend -> Annex (Maybe (Key, Backend)) genKey source trybackend = do bs <- orderedList let bs' = maybe bs (: bs) trybackend genKey' bs' source -genKey' :: [Backend] -> B.KeySource -> Annex (Maybe (Key, Backend)) +genKey' :: [Backend] -> KeySource -> Annex (Maybe (Key, Backend)) genKey' [] _ = return Nothing genKey' (b:bs) source = do r <- B.getKey b source diff --git a/Backend/SHA.hs b/Backend/SHA.hs index df613bbcd..838a97ab8 100644 --- a/Backend/SHA.hs +++ b/Backend/SHA.hs @@ -11,6 +11,7 @@ import Common.Annex import qualified Annex import Types.Backend import Types.Key +import Types.KeySource import qualified Build.SysConfig as SysConfig type SHASize = Int diff --git a/Backend/WORM.hs b/Backend/WORM.hs index 630000fa2..523203713 100644 --- a/Backend/WORM.hs +++ b/Backend/WORM.hs @@ -10,6 +10,7 @@ module Backend.WORM (backends) where import Common.Annex import Types.Backend import Types.Key +import Types.KeySource backends :: [Backend] backends = [backend] diff --git a/Command/Add.hs b/Command/Add.hs index 43f186fbf..73edb5eaa 100644 --- a/Command/Add.hs +++ b/Command/Add.hs @@ -12,6 +12,7 @@ import Annex.Exception import Command import qualified Annex import qualified Annex.Queue +import Types.KeySource import Backend import Logs.Location import Annex.Content diff --git a/Command/AddUrl.hs b/Command/AddUrl.hs index 
369940bdf..bef1d6875 100644 --- a/Command/AddUrl.hs +++ b/Command/AddUrl.hs @@ -20,6 +20,7 @@ import Annex.Content import Logs.Web import qualified Option import Types.Key +import Types.KeySource import Config def :: [Command] diff --git a/Command/Migrate.hs b/Command/Migrate.hs index 29e664ce2..c7c0d7af3 100644 --- a/Command/Migrate.hs +++ b/Command/Migrate.hs @@ -11,6 +11,7 @@ import Common.Annex import Command import Backend import qualified Types.Key +import Types.KeySource import Annex.Content import qualified Command.ReKey diff --git a/Types/Backend.hs b/Types/Backend.hs index 97f7cef90..d79787c27 100644 --- a/Types/Backend.hs +++ b/Types/Backend.hs @@ -10,13 +10,7 @@ module Types.Backend where import Types.Key - -{- The source used to generate a key. The location of the content - - may be different from the filename associated with the key. -} -data KeySource = KeySource - { keyFilename :: FilePath - , contentLocation :: FilePath - } +import Types.KeySource data BackendA a = Backend { name :: String diff --git a/Types/KeySource.hs b/Types/KeySource.hs new file mode 100644 index 000000000..9d1fa173f --- /dev/null +++ b/Types/KeySource.hs @@ -0,0 +1,33 @@ +{- KeySource data type + - + - Copyright 2012 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Types.KeySource where + +import Data.Ord + +{- When content is in the process of being added to the annex, + - and a Key generated from it, this data type is used. + - + - The contentLocation may be different from the filename + - associated with the key. For example, the add command + - temporarily puts the content into a lockdown directory + - for checking. The migrate command uses the content + - of a different Key. -} +data KeySource = KeySource + { keyFilename :: FilePath + , contentLocation :: FilePath + } + deriving (Show) + +{- KeySources are assumed to be equal when the same filename is associated + - with the key. The contentLocation can be a random temp file. 
+ -} +instance Eq KeySource where + x == y = keyFilename x == keyFilename y + +instance Ord KeySource where + compare = comparing keyFilename -- cgit v1.2.3 From 33b914bcf1f277aecccb4194e296f17f4708e434 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 20 Jun 2012 19:04:16 -0400 Subject: pending adds now retried for kqueue Rethought how to keep track of pending adds that need to be retried later. The commit thread already wakes up every second when there are changes, so let's keep pending adds queued as changes until they're safe to add. Also, the committer is now smarter about avoiding empty commits when all the adds are currently unsafe, or in the rare case that an add event for a symlink is not received in time. It may avoid them entirely. This seems to work as before for inotify, and is untested for kqueue. (Actually commit batching seems to be improved for inotify, although I'm not sure why. I'm seeing only two commits made during large batch operations, and the first of those is the non-batch mode commit.) --- Assistant.hs | 2 +- Assistant/Changes.hs | 38 ++++++++--- Assistant/Committer.hs | 162 ++++++++++++++++++++++++---------------------- Assistant/DaemonStatus.hs | 10 +-- Assistant/Watcher.hs | 32 +++++---- 5 files changed, 136 insertions(+), 108 deletions(-) diff --git a/Assistant.hs b/Assistant.hs index 554c37290..e924d9477 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -75,8 +75,8 @@ startDaemon foreground -- begin adding files and having them -- committed, even while the startup scan -- is taking place. + _ <- forkIO $ commitThread st changechan _ <- forkIO $ daemonStatusThread st dstatus - _ <- forkIO $ commitThread st dstatus changechan _ <- forkIO $ sanityCheckerThread st dstatus changechan -- Does not return. 
watchThread st dstatus changechan diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs index 1cad42326..173ba1922 100644 --- a/Assistant/Changes.hs +++ b/Assistant/Changes.hs @@ -7,20 +7,26 @@ module Assistant.Changes where import Common.Annex import qualified Annex.Queue +import Types.KeySource import Control.Concurrent.STM import Data.Time.Clock -data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange +data ChangeType = AddChange | LinkChange | RmChange | RmDirChange deriving (Show, Eq) type ChangeChan = TChan Change -data Change = Change - { changeTime :: UTCTime - , changeFile :: FilePath - , changeType :: ChangeType - } +data Change + = Change + { changeTime :: UTCTime + , changeFile :: FilePath + , changeType :: ChangeType + } + | PendingAddChange + { changeTime ::UTCTime + , keySource :: KeySource + } deriving (Show) runChangeChan :: STM a -> IO a @@ -33,13 +39,29 @@ newChangeChan = atomically newTChan madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) madeChange f t = do -- Just in case the commit thread is not flushing the queue fast enough. - when (t /= PendingAddChange) $ - Annex.Queue.flushWhenFull + Annex.Queue.flushWhenFull liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t) noChange :: Annex (Maybe Change) noChange = return Nothing +{- Indicates an add is in progress. -} +pendingAddChange :: KeySource -> Annex (Maybe Change) +pendingAddChange ks = + liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks) + +isPendingAddChange :: Change -> Bool +isPendingAddChange (PendingAddChange {}) = True +isPendingAddChange _ = False + +finishedChange :: Change -> Change +finishedChange c@(PendingAddChange { keySource = ks }) = Change + { changeTime = changeTime c + , changeFile = keyFilename ks + , changeType = AddChange + } +finishedChange c = c + {- Gets all unhandled changes. - Blocks until at least one change is made. 
-} getChanges :: ChangeChan -> IO [Change] diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index 600034a0a..46fee1b74 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -7,7 +7,6 @@ module Assistant.Committer where import Common.Annex import Assistant.Changes -import Assistant.DaemonStatus import Assistant.ThreadedMonad import Assistant.Watcher import qualified Annex @@ -24,20 +23,25 @@ import Types.KeySource import Data.Time.Clock import Data.Tuple.Utils import qualified Data.Set as S +import Data.Either {- This thread makes git commits at appropriate times. -} -commitThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO () -commitThread st dstatus changechan = runEvery (Seconds 1) $ do +commitThread :: ThreadState -> ChangeChan -> IO () +commitThread st changechan = runEvery (Seconds 1) $ do -- We already waited one second as a simple rate limiter. - -- Next, wait until at least one change has been made. - cs <- getChanges changechan + -- Next, wait until at least one change is available for + -- processing. + changes <- getChanges changechan -- Now see if now's a good time to commit. time <- getCurrentTime - if shouldCommit time cs + if shouldCommit time changes then do - handleAdds st dstatus changechan cs - void $ tryIO $ runThreadState st commitStaged - else refillChanges changechan cs + readychanges <- handleAdds st changechan changes + if shouldCommit time readychanges + then do + void $ tryIO $ runThreadState st commitStaged + else refillChanges changechan readychanges + else refillChanges changechan changes commitStaged :: Annex () commitStaged = do @@ -83,95 +87,99 @@ shouldCommit now changes - staged before returning, and will be committed immediately. - - OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly - - created and staged, if the file is not open. + - created and staged. + - + - Returns a list of all changes that are ready to be committed. 
+ - Any pending adds that are not ready yet are put back into the ChangeChan, + - where they will be retried later. -} -handleAdds :: ThreadState -> DaemonStatusHandle -> ChangeChan -> [Change] -> IO () -handleAdds st dstatus changechan cs - | null toadd = noop - | otherwise = do - toadd' <- safeToAdd st dstatus toadd - unless (null toadd') $ do - added <- filter id <$> forM toadd' add - unless (DirWatcher.eventsCoalesce || null added) $ - handleAdds st dstatus changechan +handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change] +handleAdds st changechan cs = returnWhen (null pendingadds) $ do + (postponed, toadd) <- partitionEithers <$> + safeToAdd st pendingadds + + unless (null postponed) $ + refillChanges changechan postponed + + returnWhen (null toadd) $ do + added <- catMaybes <$> forM toadd add + if (DirWatcher.eventsCoalesce || null added) + then return $ added ++ otherchanges + else do + r <- handleAdds st changechan =<< getChanges changechan + return $ r ++ added ++ otherchanges where - toadd = map changeFile $ filter isPendingAdd cs + (pendingadds, otherchanges) = partition isPendingAddChange cs + + returnWhen c a + | c = return otherchanges + | otherwise = a - isPendingAdd (Change { changeType = PendingAddChange }) = True - isPendingAdd _ = False + add :: Change -> IO (Maybe Change) + add change@(PendingAddChange { keySource = ks }) = do + r <- catchMaybeIO $ runThreadState st $ do + showStart "add" $ keyFilename ks + handle (finishedChange change) (keyFilename ks) + =<< Command.Add.ingest ks + return $ maybeMaybe r + add _ = return Nothing - add keysource = catchBoolIO $ runThreadState st $ do - showStart "add" $ keyFilename keysource - handle (keyFilename keysource) - =<< Command.Add.ingest keysource + maybeMaybe (Just j@(Just _)) = j + maybeMaybe _ = Nothing - handle _ Nothing = do + handle _ _ Nothing = do showEndFail - return False - handle file (Just key) = do + return Nothing + handle change file (Just key) = do link <- Command.Add.link 
file key True when DirWatcher.eventsCoalesce $ do sha <- inRepo $ Git.HashObject.hashObject BlobObject link stageSymlink file sha showEndOk - return True + return $ Just change -{- Checks which of a set of files can safely be added. - - Files are locked down as hard links in a temp directory, - - with their write bits disabled. But some may still be - - opened for write, so lsof is run on the temp directory - - to check them. +{- PendingAddChanges can Either be Right to be added now, + - or are unsafe, and must be Left for later. + - + - Check by running lsof on the temp directory, which + - the KeySources are locked down in. -} -safeToAdd :: ThreadState -> DaemonStatusHandle -> [FilePath] -> IO [KeySource] -safeToAdd st dstatus files = do - locked <- catMaybes <$> lockdown files - runThreadState st $ ifM (Annex.getState Annex.force) - ( return locked -- force bypasses lsof check +safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change] +safeToAdd st changes = runThreadState st $ + ifM (Annex.getState Annex.force) + ( allRight changes -- force bypasses lsof check , do tmpdir <- fromRepo gitAnnexTmpDir - open <- S.fromList . map fst3 . filter openwrite <$> + openfiles <- S.fromList . map fst3 . filter openwrite <$> liftIO (Lsof.queryDir tmpdir) - catMaybes <$> forM locked (go open) + + let checked = map (check openfiles) changes + + {- If new events are received when files are closed, + - there's no need to retry any changes that cannot + - be done now. -} + if DirWatcher.closingTracked + then do + mapM_ canceladd $ lefts checked + allRight $ rights checked + else return checked ) where - {- When a file is still open, it can be put into pendingAdd - - to be checked again later. However when closingTracked - - is supported, another event will be received once it's - - closed, so there's no point in doing so. 
-} - go open keysource - | S.member (contentLocation keysource) open = do - if DirWatcher.closingTracked - then do - warning $ keyFilename keysource - ++ " still has writers, not adding" - void $ liftIO $ canceladd keysource - else void $ addpending keysource - return Nothing - | otherwise = return $ Just keysource - - canceladd keysource = tryIO $ + check openfiles change@(PendingAddChange { keySource = ks }) + | S.member (contentLocation ks) openfiles = Left change + check _ change = Right change + + canceladd (PendingAddChange { keySource = ks }) = do + warning $ keyFilename ks + ++ " still has writers, not adding" -- remove the hard link - removeFile $ contentLocation keysource - - {- The same file (or a file with the same name) - - could already be pending add; if so this KeySource - - superscedes the old one. -} - addpending keysource = modifyDaemonStatusM dstatus $ \s -> do - let set = pendingAdd s - mapM_ canceladd $ S.toList $ S.filter (== keysource) set - return $ s { pendingAdd = S.insert keysource set } - - lockdown = mapM $ \file -> do - ms <- catchMaybeIO $ getSymbolicLinkStatus file - case ms of - Just s - | isRegularFile s -> - catchMaybeIO $ runThreadState st $ - Command.Add.lockDown file - _ -> return Nothing - + void $ liftIO $ tryIO $ + removeFile $ contentLocation ks + canceladd _ = noop openwrite (_file, mode, _pid) = mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite + + allRight = return . 
map Right diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 289a97bb2..e5ba3d151 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -9,14 +9,12 @@ import Common.Annex import Assistant.ThreadedMonad import Utility.ThreadScheduler import Utility.TempFile -import Types.KeySource import Control.Concurrent import System.Posix.Types import Data.Time.Clock.POSIX import Data.Time import System.Locale -import qualified Data.Set as S data DaemonStatus = DaemonStatus -- False when the daemon is performing its startup scan @@ -27,8 +25,6 @@ data DaemonStatus = DaemonStatus , sanityCheckRunning :: Bool -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime - -- Files that are in the process of being added to the annex. - , pendingAdd :: S.Set KeySource } deriving (Show) @@ -40,17 +36,13 @@ newDaemonStatus = DaemonStatus , lastRunning = Nothing , sanityCheckRunning = False , lastSanityCheck = Nothing - , pendingAdd = S.empty } getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus getDaemonStatus = liftIO . readMVar modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex () -modifyDaemonStatus handle a = modifyDaemonStatusM handle (return . a) - -modifyDaemonStatusM :: DaemonStatusHandle -> (DaemonStatus -> IO DaemonStatus) -> Annex () -modifyDaemonStatusM handle a = liftIO $ modifyMVar_ handle a +modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a) {- Load any previous daemon status file, and store it in the MVar for this - process to use as its DaemonStatus. 
-} diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs index cb7ede920..db58f01e8 100644 --- a/Assistant/Watcher.hs +++ b/Assistant/Watcher.hs @@ -15,13 +15,14 @@ import Assistant.DaemonStatus import Assistant.Changes import Utility.DirWatcher import Utility.Types.DirWatcher +import qualified Annex import qualified Annex.Queue import qualified Git.Command import qualified Git.UpdateIndex import qualified Git.HashObject import qualified Git.LsFiles import qualified Backend -import qualified Annex +import qualified Command.Add import Annex.Content import Annex.CatFile import Git.Types @@ -110,22 +111,27 @@ runHandler st dstatus changechan handler file filestatus = void $ do - and only one has just closed it. We want to avoid adding a file to the - annex that is open for write, to avoid anything being able to change it. - - - We could run lsof on the file here to check for other writer. - - But, that's slow. Instead, a Change is returned that indicates this file - - still needs to be added. The committer will handle bundles of these - - Changes at once. + - We could run lsof on the file here to check for other writers. + - But, that's slow, and even if there is currently a writer, we will want + - to add the file *eventually*. Instead, the file is locked down as a hard + - link in a temp directory, with its write bits disabled, for later + - checking with lsof, and a Change is returned containing a KeySource + - using that hard link. The committer handles running lsof and finishing + - the add. 
-} onAdd :: Handler -onAdd file _filestatus dstatus = do - ifM (scanComplete <$> getDaemonStatus dstatus) - ( go - , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) - ( noChange - , go +onAdd file filestatus dstatus + | maybe False isRegularFile filestatus = do + ifM (scanComplete <$> getDaemonStatus dstatus) + ( go + , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file])) + ( noChange + , go + ) ) - ) + | otherwise = noChange where - go = madeChange file PendingAddChange + go = pendingAddChange =<< Command.Add.lockDown file {- A symlink might be an arbitrary symlink, which is just added. - Or, if it is a git-annex symlink, ensure it points to the content -- cgit v1.2.3 From 75dba7f7bc50b75e08ee49cb4b758a375ef70d68 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 20 Jun 2012 20:05:40 -0400 Subject: belt and suspenders check It's possible for there to be multiple queued changes all adding the same file, and for those changes to be reordered. Maybe. This check will guard against that ending up adding the wrong version of the file last. --- Assistant/Committer.hs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs index 46fee1b74..63df8cafc 100644 --- a/Assistant/Committer.hs +++ b/Assistant/Committer.hs @@ -118,7 +118,7 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do add :: Change -> IO (Maybe Change) add change@(PendingAddChange { keySource = ks }) = do - r <- catchMaybeIO $ runThreadState st $ do + r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do showStart "add" $ keyFilename ks handle (finishedChange change) (keyFilename ks) =<< Command.Add.ingest ks @@ -140,6 +140,16 @@ handleAdds st changechan cs = returnWhen (null pendingadds) $ do showEndOk return $ Just change + {- Check that the keysource's keyFilename still exists, + - and is still a hard link to its contentLocation, + - before ingesting it. 
-} + sanitycheck keysource a = do + fs <- getSymbolicLinkStatus $ keyFilename keysource + ks <- getSymbolicLinkStatus $ contentLocation keysource + if deviceID ks == deviceID fs && fileID ks == fileID fs + then a + else return Nothing + {- PendingAddChanges can Either be Right to be added now, - or are unsafe, and must be Left for later. - -- cgit v1.2.3 From 7db83a1b0ff49ddbc316556d416ce67418428d13 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 21 Jun 2012 00:28:56 -0400 Subject: demote lsof not available on kfreebsd, and only used by watch --- debian/control | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/debian/control b/debian/control index 3b142dc5f..bcecbec3d 100644 --- a/debian/control +++ b/debian/control @@ -41,8 +41,8 @@ Depends: ${misc:Depends}, ${shlibs:Depends}, uuid, rsync, wget | curl, - openssh-client (>= 1:5.6p1), - lsof + openssh-client (>= 1:5.6p1) +Recommends: lsof Suggests: graphviz, bup, gnupg Description: manage files with git, without checking their contents into git git-annex allows managing files with git, without checking the file -- cgit v1.2.3