summaryrefslogtreecommitdiff
path: root/Command/Watch.hs
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2012-06-11 15:08:04 -0400
committerGravatar Joey Hess <joey@kitenet.net>2012-06-11 15:29:11 -0400
commit7f3934520a8aa877be17e5d9a0d5dbc77b274c5e (patch)
tree3a36ed11fa528bf245a4360b9e6dee9203f9e184 /Command/Watch.hs
parentf7dbcd58ffb65258d914f1ecd6c0ff8f6bf2d3aa (diff)
avoid using STM while the MVar is held
I thought this might be a lock conflict that explains the deadlock when built with -threaded, but it seems not.. it still locks! It even locks without the committer thread. Indeed, it locks when running "git annex add"! -threaded is exposing some other problem. Still, this seems conceptually cleaner and did not add any inneficiencies. Also added some high-level documentation about the threads used.
Diffstat (limited to 'Command/Watch.hs')
-rw-r--r--Command/Watch.hs76
1 files changed, 51 insertions, 25 deletions
diff --git a/Command/Watch.hs b/Command/Watch.hs
index 76f03a097..5c354cace 100644
--- a/Command/Watch.hs
+++ b/Command/Watch.hs
@@ -1,13 +1,36 @@
-{- git-annex command
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE BangPatterns #-}
+
+{- git-annex watch daemon
+ -
+ - Overview of threads and MVars, etc:
+ -
+ - Thread 1: Parent
+ - The initial thread run, double forks to background, starts other
+ - threads, and then stops, waiting for them to terminate.
+ - Thread 2: inotify
+ - Notices new files, and calls handlers for events, queuing changes.
+ - Thread 3: inotify internal
+ - Used by haskell inotify library to ensure inotify event buffer is
+ - kept drained.
+ - Thread 4: committer
+ - Waits for changes to occur, and runs the git queue to update its
+ - index, then commits.
+ -
+ - State MVar:
+ - The Annex state is stored here, which allows recuscitating the
+ - Annex monad in IO actions run by the inotify and committer
+ - threads. Thus, a single state is shared amoung the threads, and
+ - only one at a time can access it.
+ - ChangeChan STM TChan:
+ - Changes are indicated by writing to this channel. The committer
+ - reads from it.
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE CPP #-}
-{-# LANGUAGE BangPatterns #-}
-
module Command.Watch where
import Common.Annex
@@ -83,7 +106,7 @@ startDaemon True st = do
watch :: MVar Annex.AnnexState -> IO ()
#if defined linux_HOST_OS
watch st = withINotify $ \i -> do
- changechan <- atomically newTChan
+ changechan <- runChangeChan newTChan
let hook a = Just $ runHandler st changechan a
let hooks = WatchHooks
{ addHook = hook onAdd
@@ -131,26 +154,38 @@ withStateMVar a = do
return r
{- Runs an Annex action, using the state from the MVar. -}
-runStateMVar :: MVar Annex.AnnexState -> Annex () -> IO ()
+runStateMVar :: MVar Annex.AnnexState -> Annex a -> IO a
runStateMVar mvar a = do
+ liftIO $ putStrLn "takeMVar"
startstate <- takeMVar mvar
- !newstate <- Annex.exec startstate a
+ !(r, newstate) <- Annex.run startstate a
+ liftIO $ putStrLn "putMVar"
putMVar mvar newstate
+ return r
-{- Runs an action handler, inside the Annex monad.
+runChangeChan :: STM a -> IO a
+runChangeChan = atomically
+
+{- Runs an action handler, inside the Annex monad, and if there was a
+ - change, adds it to the ChangeChan.
-
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-}
runHandler :: MVar Annex.AnnexState -> ChangeChan -> Handler -> FilePath -> IO ()
-runHandler st changechan handler file =
- either (putStrLn . show) return =<< tryIO (runStateMVar st go)
- where
- go = maybe noop (signalChange changechan) =<< handler file
+runHandler st changechan handler file = void $ do
+ r <- tryIO (runStateMVar st $ handler file)
+ case r of
+ Left e -> putStrLn $ show e
+ Right Nothing -> noop
+ Right (Just change) -> void $
+ runChangeChan $ writeTChan changechan change
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> String -> Annex (Maybe Change)
-madeChange file desc = liftIO $
- Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
+madeChange file desc = do
+ -- Just in case the commit thread is not flushing the queue fast enough.
+ Annex.Queue.flushWhenFull
+ liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
noChange :: Annex (Maybe Change)
noChange = return Nothing
@@ -243,19 +278,10 @@ stageSymlink file sha =
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.stageSymlink file sha)
-{- Signals that a change has been made, that needs to get committed. -}
-signalChange :: ChangeChan -> Change -> Annex ()
-signalChange chan change = do
- liftIO $ atomically $ writeTChan chan change
-
- -- Just in case the commit thread is not flushing
- -- the queue fast enough.
- Annex.Queue.flushWhenFull
-
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
-getChanges chan = atomically $ do
+getChanges chan = runChangeChan $ do
c <- readTChan chan
go [c]
where
@@ -268,7 +294,7 @@ getChanges chan = atomically $ do
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
-refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs
+refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
{- This thread makes git commits at appropriate times. -}
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()