summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Assistant.hs2
-rw-r--r--Assistant/Changes.hs38
-rw-r--r--Assistant/Committer.hs162
-rw-r--r--Assistant/DaemonStatus.hs10
-rw-r--r--Assistant/Watcher.hs32
5 files changed, 136 insertions, 108 deletions
diff --git a/Assistant.hs b/Assistant.hs
index 554c37290..e924d9477 100644
--- a/Assistant.hs
+++ b/Assistant.hs
@@ -75,8 +75,8 @@ startDaemon foreground
-- begin adding files and having them
-- committed, even while the startup scan
-- is taking place.
+ _ <- forkIO $ commitThread st changechan
_ <- forkIO $ daemonStatusThread st dstatus
- _ <- forkIO $ commitThread st dstatus changechan
_ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return.
watchThread st dstatus changechan
diff --git a/Assistant/Changes.hs b/Assistant/Changes.hs
index 1cad42326..173ba1922 100644
--- a/Assistant/Changes.hs
+++ b/Assistant/Changes.hs
@@ -7,20 +7,26 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
+import Types.KeySource
import Control.Concurrent.STM
import Data.Time.Clock
-data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange
+data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
type ChangeChan = TChan Change
-data Change = Change
- { changeTime :: UTCTime
- , changeFile :: FilePath
- , changeType :: ChangeType
- }
+data Change
+ = Change
+ { changeTime :: UTCTime
+ , changeFile :: FilePath
+ , changeType :: ChangeType
+ }
+ | PendingAddChange
+ { changeTime ::UTCTime
+ , keySource :: KeySource
+ }
deriving (Show)
runChangeChan :: STM a -> IO a
@@ -33,13 +39,29 @@ newChangeChan = atomically newTChan
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
madeChange f t = do
-- Just in case the commit thread is not flushing the queue fast enough.
- when (t /= PendingAddChange) $
- Annex.Queue.flushWhenFull
+ Annex.Queue.flushWhenFull
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
noChange :: Annex (Maybe Change)
noChange = return Nothing
+{- Indicates an add is in progress. -}
+pendingAddChange :: KeySource -> Annex (Maybe Change)
+pendingAddChange ks =
+ liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks)
+
+isPendingAddChange :: Change -> Bool
+isPendingAddChange (PendingAddChange {}) = True
+isPendingAddChange _ = False
+
+finishedChange :: Change -> Change
+finishedChange c@(PendingAddChange { keySource = ks }) = Change
+ { changeTime = changeTime c
+ , changeFile = keyFilename ks
+ , changeType = AddChange
+ }
+finishedChange c = c
+
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
diff --git a/Assistant/Committer.hs b/Assistant/Committer.hs
index 600034a0a..46fee1b74 100644
--- a/Assistant/Committer.hs
+++ b/Assistant/Committer.hs
@@ -7,7 +7,6 @@ module Assistant.Committer where
import Common.Annex
import Assistant.Changes
-import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Watcher
import qualified Annex
@@ -24,20 +23,25 @@ import Types.KeySource
import Data.Time.Clock
import Data.Tuple.Utils
import qualified Data.Set as S
+import Data.Either
{- This thread makes git commits at appropriate times. -}
-commitThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
-commitThread st dstatus changechan = runEvery (Seconds 1) $ do
+commitThread :: ThreadState -> ChangeChan -> IO ()
+commitThread st changechan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
- -- Next, wait until at least one change has been made.
- cs <- getChanges changechan
+ -- Next, wait until at least one change is available for
+ -- processing.
+ changes <- getChanges changechan
-- Now see if now's a good time to commit.
time <- getCurrentTime
- if shouldCommit time cs
+ if shouldCommit time changes
then do
- handleAdds st dstatus changechan cs
- void $ tryIO $ runThreadState st commitStaged
- else refillChanges changechan cs
+ readychanges <- handleAdds st changechan changes
+ if shouldCommit time readychanges
+ then do
+ void $ tryIO $ runThreadState st commitStaged
+ else refillChanges changechan readychanges
+ else refillChanges changechan changes
commitStaged :: Annex ()
commitStaged = do
@@ -83,95 +87,99 @@ shouldCommit now changes
- staged before returning, and will be committed immediately.
-
- OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
- - created and staged, if the file is not open.
+ - created and staged.
+ -
+ - Returns a list of all changes that are ready to be committed.
+ - Any pending adds that are not ready yet are put back into the ChangeChan,
+ - where they will be retried later.
-}
-handleAdds :: ThreadState -> DaemonStatusHandle -> ChangeChan -> [Change] -> IO ()
-handleAdds st dstatus changechan cs
- | null toadd = noop
- | otherwise = do
- toadd' <- safeToAdd st dstatus toadd
- unless (null toadd') $ do
- added <- filter id <$> forM toadd' add
- unless (DirWatcher.eventsCoalesce || null added) $
- handleAdds st dstatus changechan
+handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
+handleAdds st changechan cs = returnWhen (null pendingadds) $ do
+ (postponed, toadd) <- partitionEithers <$>
+ safeToAdd st pendingadds
+
+ unless (null postponed) $
+ refillChanges changechan postponed
+
+ returnWhen (null toadd) $ do
+ added <- catMaybes <$> forM toadd add
+ if (DirWatcher.eventsCoalesce || null added)
+ then return $ added ++ otherchanges
+ else do
+ r <- handleAdds st changechan
=<< getChanges changechan
+ return $ r ++ added ++ otherchanges
where
- toadd = map changeFile $ filter isPendingAdd cs
+ (pendingadds, otherchanges) = partition isPendingAddChange cs
+
+ returnWhen c a
+ | c = return otherchanges
+ | otherwise = a
- isPendingAdd (Change { changeType = PendingAddChange }) = True
- isPendingAdd _ = False
+ add :: Change -> IO (Maybe Change)
+ add change@(PendingAddChange { keySource = ks }) = do
+ r <- catchMaybeIO $ runThreadState st $ do
+ showStart "add" $ keyFilename ks
+ handle (finishedChange change) (keyFilename ks)
+ =<< Command.Add.ingest ks
+ return $ maybeMaybe r
+ add _ = return Nothing
- add keysource = catchBoolIO $ runThreadState st $ do
- showStart "add" $ keyFilename keysource
- handle (keyFilename keysource)
- =<< Command.Add.ingest keysource
+ maybeMaybe (Just j@(Just _)) = j
+ maybeMaybe _ = Nothing
- handle _ Nothing = do
+ handle _ _ Nothing = do
showEndFail
- return False
- handle file (Just key) = do
+ return Nothing
+ handle change file (Just key) = do
link <- Command.Add.link file key True
when DirWatcher.eventsCoalesce $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
showEndOk
- return True
+ return $ Just change
-{- Checks which of a set of files can safely be added.
- - Files are locked down as hard links in a temp directory,
- - with their write bits disabled. But some may still be
- - opened for write, so lsof is run on the temp directory
- - to check them.
+{- PendingAddChanges can Either be Right to be added now,
+ - or are unsafe, and must be Left for later.
+ -
+ - Check by running lsof on the temp directory, which
+ - the KeySources are locked down in.
-}
-safeToAdd :: ThreadState -> DaemonStatusHandle -> [FilePath] -> IO [KeySource]
-safeToAdd st dstatus files = do
- locked <- catMaybes <$> lockdown files
- runThreadState st $ ifM (Annex.getState Annex.force)
- ( return locked -- force bypasses lsof check
+safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
+safeToAdd st changes = runThreadState st $
+ ifM (Annex.getState Annex.force)
+ ( allRight changes -- force bypasses lsof check
, do
tmpdir <- fromRepo gitAnnexTmpDir
- open <- S.fromList . map fst3 . filter openwrite <$>
+ openfiles <- S.fromList . map fst3 . filter openwrite <$>
liftIO (Lsof.queryDir tmpdir)
- catMaybes <$> forM locked (go open)
+
+ let checked = map (check openfiles) changes
+
+ {- If new events are received when files are closed,
+ - there's no need to retry any changes that cannot
+ - be done now. -}
+ if DirWatcher.closingTracked
+ then do
+ mapM_ canceladd $ lefts checked
+ allRight $ rights checked
+ else return checked
)
where
- {- When a file is still open, it can be put into pendingAdd
- - to be checked again later. However when closingTracked
- - is supported, another event will be received once it's
- - closed, so there's no point in doing so. -}
- go open keysource
- | S.member (contentLocation keysource) open = do
- if DirWatcher.closingTracked
- then do
- warning $ keyFilename keysource
- ++ " still has writers, not adding"
- void $ liftIO $ canceladd keysource
- else void $ addpending keysource
- return Nothing
- | otherwise = return $ Just keysource
-
- canceladd keysource = tryIO $
+ check openfiles change@(PendingAddChange { keySource = ks })
+ | S.member (contentLocation ks) openfiles = Left change
+ check _ change = Right change
+
+ canceladd (PendingAddChange { keySource = ks }) = do
+ warning $ keyFilename ks
+ ++ " still has writers, not adding"
-- remove the hard link
- removeFile $ contentLocation keysource
-
- {- The same file (or a file with the same name)
- - could already be pending add; if so this KeySource
- - superscedes the old one. -}
- addpending keysource = modifyDaemonStatusM dstatus $ \s -> do
- let set = pendingAdd s
- mapM_ canceladd $ S.toList $ S.filter (== keysource) set
- return $ s { pendingAdd = S.insert keysource set }
-
- lockdown = mapM $ \file -> do
- ms <- catchMaybeIO $ getSymbolicLinkStatus file
- case ms of
- Just s
- | isRegularFile s ->
- catchMaybeIO $ runThreadState st $
- Command.Add.lockDown file
- _ -> return Nothing
-
+ void $ liftIO $ tryIO $
+ removeFile $ contentLocation ks
+ canceladd _ = noop
openwrite (_file, mode, _pid) =
mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
+
+ allRight = return . map Right
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs
index 289a97bb2..e5ba3d151 100644
--- a/Assistant/DaemonStatus.hs
+++ b/Assistant/DaemonStatus.hs
@@ -9,14 +9,12 @@ import Common.Annex
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.TempFile
-import Types.KeySource
import Control.Concurrent
import System.Posix.Types
import Data.Time.Clock.POSIX
import Data.Time
import System.Locale
-import qualified Data.Set as S
data DaemonStatus = DaemonStatus
-- False when the daemon is performing its startup scan
@@ -27,8 +25,6 @@ data DaemonStatus = DaemonStatus
, sanityCheckRunning :: Bool
-- Last time the sanity checker ran
, lastSanityCheck :: Maybe POSIXTime
- -- Files that are in the process of being added to the annex.
- , pendingAdd :: S.Set KeySource
}
deriving (Show)
@@ -40,17 +36,13 @@ newDaemonStatus = DaemonStatus
, lastRunning = Nothing
, sanityCheckRunning = False
, lastSanityCheck = Nothing
- , pendingAdd = S.empty
}
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
getDaemonStatus = liftIO . readMVar
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
-modifyDaemonStatus handle a = modifyDaemonStatusM handle (return . a)
-
-modifyDaemonStatusM :: DaemonStatusHandle -> (DaemonStatus -> IO DaemonStatus) -> Annex ()
-modifyDaemonStatusM handle a = liftIO $ modifyMVar_ handle a
+modifyDaemonStatus handle a = liftIO $ modifyMVar_ handle (return . a)
{- Load any previous daemon status file, and store it in the MVar for this
- process to use as its DaemonStatus. -}
diff --git a/Assistant/Watcher.hs b/Assistant/Watcher.hs
index cb7ede920..db58f01e8 100644
--- a/Assistant/Watcher.hs
+++ b/Assistant/Watcher.hs
@@ -15,13 +15,14 @@ import Assistant.DaemonStatus
import Assistant.Changes
import Utility.DirWatcher
import Utility.Types.DirWatcher
+import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
import qualified Git.UpdateIndex
import qualified Git.HashObject
import qualified Git.LsFiles
import qualified Backend
-import qualified Annex
+import qualified Command.Add
import Annex.Content
import Annex.CatFile
import Git.Types
@@ -110,22 +111,27 @@ runHandler st dstatus changechan handler file filestatus = void $ do
- and only one has just closed it. We want to avoid adding a file to the
- annex that is open for write, to avoid anything being able to change it.
-
- - We could run lsof on the file here to check for other writer.
- - But, that's slow. Instead, a Change is returned that indicates this file
- - still needs to be added. The committer will handle bundles of these
- - Changes at once.
+ - We could run lsof on the file here to check for other writers.
+ - But, that's slow, and even if there is currently a writer, we will want
+ - to add the file *eventually*. Instead, the file is locked down as a hard
+ - link in a temp directory, with its write bits disabled, for later
+ - checking with lsof, and a Change is returned containing a KeySource
+ - using that hard link. The committer handles running lsof and finishing
+ - the add.
-}
onAdd :: Handler
-onAdd file _filestatus dstatus = do
- ifM (scanComplete <$> getDaemonStatus dstatus)
- ( go
- , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
- ( noChange
- , go
+onAdd file filestatus dstatus
+ | maybe False isRegularFile filestatus = do
+ ifM (scanComplete <$> getDaemonStatus dstatus)
+ ( go
+ , ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
+ ( noChange
+ , go
+ )
)
- )
+ | otherwise = noChange
where
- go = madeChange file PendingAddChange
+ go = pendingAddChange =<< Command.Add.lockDown file
{- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content