diff options
author | Joey Hess <joey@kitenet.net> | 2012-07-18 19:13:56 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-07-18 19:15:34 -0400 |
commit | cf47bb3f509ae63ad868b66c0b6f2baecb93e4c7 (patch) | |
tree | 60bfdab5cf877c4de206146c65e759f82cdf2e85 /Assistant | |
parent | eea0a3616cd1cbaf033649c11a5c2b650b6b632f (diff) |
run file transfers in threads, not processes
This should fix OSX/BSD issues with not noticing transfer information
files with kqueue. Now that threads are used, the thread can manage the
transfer slot allocation and deallocation by itself; much cleaner.
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/ThreadedMonad.hs | 9 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 46 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 5 | ||||
-rw-r--r-- | Assistant/TransferQueue.hs | 1 | ||||
-rw-r--r-- | Assistant/TransferSlots.hs | 16 |
5 files changed, 28 insertions, 49 deletions
diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs index 2fc526599..1decd8e91 100644 --- a/Assistant/ThreadedMonad.hs +++ b/Assistant/ThreadedMonad.hs @@ -37,14 +37,13 @@ withThreadState a = do runThreadState :: ThreadState -> Annex a -> IO a runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a -{- Runs an Annex action in a separate thread, using a copy of the state - - from the MVar. +{- Runs an Annex action, using a copy of the state from the MVar. - - It's up to the action to perform any necessary shutdown tasks in order - for state to not be lost. And it's up to the caller to resynchronise - with any changes the action makes to eg, the git-annex branch. -} -unsafeForkIOThreadState :: ThreadState -> Annex a -> IO ThreadId -unsafeForkIOThreadState mvar a = do +unsafeRunThreadState :: ThreadState -> Annex a -> IO () +unsafeRunThreadState mvar a = do state <- readMVar mvar - forkIO $ void $ Annex.eval state a + void $ Annex.eval state a diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 766c1f89e..364ce0468 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -10,23 +10,20 @@ module Assistant.Threads.TransferWatcher where import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.TransferSlots import Logs.Transfer import Utility.DirWatcher import Utility.Types.DirWatcher -import Annex.BranchState import Data.Map as M -import System.Posix.Process {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} -transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferSlots -> IO () -transferWatcherThread st dstatus transferslots = do +transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () +transferWatcherThread st dstatus = do g <- runThreadState st $ fromRepo id let dir = gitAnnexTransferDir g createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferslots a + let hook a = Just $ runHandler st dstatus a let hooks = mkWatchHooks { addHook = hook onAdd , delHook = hook onDel @@ -34,51 +31,36 @@ transferWatcherThread st dstatus transferslots = do } void $ watchDir dir (const False) hooks id -type Handler = ThreadState -> DaemonStatusHandle -> TransferSlots -> FilePath -> Maybe FileStatus -> IO () +type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferslots handler file filestatus = void $ do +runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus handler file filestatus = void $ do either print (const noop) =<< tryIO go where - go = handler st dstatus transferslots file filestatus + go = handler st dstatus file filestatus {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ msg _ = error msg +onErr _ _ msg _ = error msg {- Called when a new transfer information file is written. -} onAdd :: Handler -onAdd st dstatus _ file _ = case parseTransferFile file of +onAdd st dstatus file _ = case parseTransferFile file of Nothing -> noop Just t -> runThreadState st $ go t =<< checkTransfer t where go _ Nothing = noop -- transfer already finished go t (Just info) = adjustTransfers dstatus $ M.insertWith' merge t info - -- preseve shouldWait flag, which is not written to disk - merge new old = new { shouldWait = shouldWait old } + -- preseve transferTid, which is not written to disk + merge new old = new { transferTid = transferTid old } -{- Called when a transfer information file is removed. - - - - When the transfer process is a child of this process, wait on it - - to avoid zombies. - -} +{- Called when a transfer information file is removed. -} onDel :: Handler -onDel st dstatus transferslots file _ = case parseTransferFile file of +onDel st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> maybe noop waitchild - =<< runThreadState st (removeTransfer dstatus t) - where - waitchild info - | shouldWait info = case transferPid info of - Nothing -> noop - Just pid -> do - void $ tryIO $ - getProcessStatus True False pid - runThreadState st invalidateCache - transferComplete transferslots - | otherwise = noop + Just t -> void $ runThreadState st $ removeTransfer dstatus t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index dd63d4d12..c439d8b7e 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -74,9 +74,8 @@ runTransfer st dstatus slots t info = case (transferRemote info, associatedFile (Nothing, _) -> noop (_, Nothing) -> noop (Just remote, Just file) -> do - tid <- inTransferSlot slots $ - unsafeForkIOThreadState st $ - transferprocess remote file + tid <- inTransferSlot slots st $ + transferprocess remote file now <- getCurrentTime runThreadState st $ adjustTransfers dstatus $ M.insertWith' const t info diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index fb7fa87cd..b0eca96c8 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -28,7 +28,6 @@ stubInfo f = TransferInfo , transferRemote = Nothing , bytesComplete = Nothing , associatedFile = f - , shouldWait = False } {- Adds pending transfers to the end of the queue for some of the known diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 1859b281b..dc077254d 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -10,6 +10,9 @@ module Assistant.TransferSlots where import Control.Exception import Control.Concurrent +import Common.Annex +import Assistant.ThreadedMonad + type TransferSlots = QSemN {- Number of concurrent transfers allowed to be run from the assistant. @@ -24,16 +27,13 @@ newTransferSlots :: IO TransferSlots newTransferSlots = newQSemN numSlots {- Waits until a transfer slot becomes available, and runs a transfer - - action in the slot. If the action throws an exception, its slot is - - freed here, otherwise it should be freed by the TransferWatcher when - - the transfer is complete. - -} -inTransferSlot :: TransferSlots -> IO a -> IO a -inTransferSlot s a = bracketOnError start abort run + - action in the slot, in its own thread. -} +inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId +inTransferSlot s st a = forkIO $ bracket_ start done run where start = waitQSemN s 1 - abort = const $ transferComplete s - run = const a + done = transferComplete s + run = unsafeRunThreadState st a {- Call when a transfer is complete. -} transferComplete :: TransferSlots -> IO () |