diff options
-rw-r--r-- | Command/Watch.hs | 76 |
1 files changed, 51 insertions, 25 deletions
diff --git a/Command/Watch.hs b/Command/Watch.hs index 76f03a097..5c354cace 100644 --- a/Command/Watch.hs +++ b/Command/Watch.hs @@ -1,13 +1,36 @@ -{- git-annex command +{-# LANGUAGE CPP #-} +{-# LANGUAGE BangPatterns #-} + +{- git-annex watch daemon + - + - Overview of threads and MVars, etc: + - + - Thread 1: Parent + - The initial thread run, double forks to background, starts other + - threads, and then stops, waiting for them to terminate. + - Thread 2: inotify + - Notices new files, and calls handlers for events, queuing changes. + - Thread 3: inotify internal + - Used by haskell inotify library to ensure inotify event buffer is + - kept drained. + - Thread 4: committer + - Waits for changes to occur, and runs the git queue to update its + - index, then commits. + - + - State MVar: + - The Annex state is stored here, which allows recuscitating the + - Annex monad in IO actions run by the inotify and committer + - threads. Thus, a single state is shared amoung the threads, and + - only one at a time can access it. + - ChangeChan STM TChan: + - Changes are indicated by writing to this channel. The committer + - reads from it. - - Copyright 2012 Joey Hess <joey@kitenet.net> - - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE CPP #-} -{-# LANGUAGE BangPatterns #-} - module Command.Watch where import Common.Annex @@ -83,7 +106,7 @@ startDaemon True st = do watch :: MVar Annex.AnnexState -> IO () #if defined linux_HOST_OS watch st = withINotify $ \i -> do - changechan <- atomically newTChan + changechan <- runChangeChan newTChan let hook a = Just $ runHandler st changechan a let hooks = WatchHooks { addHook = hook onAdd @@ -131,26 +154,38 @@ withStateMVar a = do return r {- Runs an Annex action, using the state from the MVar. -} -runStateMVar :: MVar Annex.AnnexState -> Annex () -> IO () +runStateMVar :: MVar Annex.AnnexState -> Annex a -> IO a runStateMVar mvar a = do + liftIO $ putStrLn "takeMVar" startstate <- takeMVar mvar - !newstate <- Annex.exec startstate a + !(r, newstate) <- Annex.run startstate a + liftIO $ putStrLn "putMVar" putMVar mvar newstate + return r -{- Runs an action handler, inside the Annex monad. +runChangeChan :: STM a -> IO a +runChangeChan = atomically + +{- Runs an action handler, inside the Annex monad, and if there was a + - change, adds it to the ChangeChan. - - Exceptions are ignored, otherwise a whole watcher thread could be crashed. -} runHandler :: MVar Annex.AnnexState -> ChangeChan -> Handler -> FilePath -> IO () -runHandler st changechan handler file = - either (putStrLn . show) return =<< tryIO (runStateMVar st go) - where - go = maybe noop (signalChange changechan) =<< handler file +runHandler st changechan handler file = void $ do + r <- tryIO (runStateMVar st $ handler file) + case r of + Left e -> putStrLn $ show e + Right Nothing -> noop + Right (Just change) -> void $ + runChangeChan $ writeTChan changechan change {- Handlers call this when they made a change that needs to get committed. -} madeChange :: FilePath -> String -> Annex (Maybe Change) -madeChange file desc = liftIO $ - Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc) +madeChange file desc = do + -- Just in case the commit thread is not flushing the queue fast enough. + Annex.Queue.flushWhenFull + liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc) noChange :: Annex (Maybe Change) noChange = return Nothing @@ -243,19 +278,10 @@ stageSymlink file sha = Annex.Queue.addUpdateIndex =<< inRepo (Git.UpdateIndex.stageSymlink file sha) -{- Signals that a change has been made, that needs to get committed. -} -signalChange :: ChangeChan -> Change -> Annex () -signalChange chan change = do - liftIO $ atomically $ writeTChan chan change - - -- Just in case the commit thread is not flushing - -- the queue fast enough. - Annex.Queue.flushWhenFull - {- Gets all unhandled changes. - Blocks until at least one change is made. -} getChanges :: ChangeChan -> IO [Change] -getChanges chan = atomically $ do +getChanges chan = runChangeChan $ do c <- readTChan chan go [c] where @@ -268,7 +294,7 @@ getChanges chan = atomically $ do {- Puts unhandled changes back into the channel. - Note: Original order is not preserved. -} refillChanges :: ChangeChan -> [Change] -> IO () -refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs +refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs {- This thread makes git commits at appropriate times. -} commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO () |