From cf47bb3f509ae63ad868b66c0b6f2baecb93e4c7 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 18 Jul 2012 19:13:56 -0400 Subject: run file transfers in threads, not processes This should fix OSX/BSD issues with not noticing transfer information files with kqueue. Now that threads are used, the thread can manage the transfer slot allocation and deallocation by itself; much cleaner. --- Assistant.hs | 2 +- Assistant/ThreadedMonad.hs | 9 ++++--- Assistant/Threads/TransferWatcher.hs | 46 +++++++++++------------------------- Assistant/Threads/Transferrer.hs | 5 ++-- Assistant/TransferQueue.hs | 1 - Assistant/TransferSlots.hs | 16 ++++++------- Logs/Transfer.hs | 3 --- 7 files changed, 29 insertions(+), 53 deletions(-) diff --git a/Assistant.hs b/Assistant.hs index 91ebf2d2e..06484b086 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -123,7 +123,7 @@ startDaemon assistant foreground , pushThread st dstatus commitchan pushmap , pushRetryThread st pushmap , mergeThread st - , transferWatcherThread st dstatus transferslots + , transferWatcherThread st dstatus , transfererThread st dstatus transferqueue transferslots , daemonStatusThread st dstatus , sanityCheckerThread st dstatus transferqueue changechan diff --git a/Assistant/ThreadedMonad.hs b/Assistant/ThreadedMonad.hs index 2fc526599..1decd8e91 100644 --- a/Assistant/ThreadedMonad.hs +++ b/Assistant/ThreadedMonad.hs @@ -37,14 +37,13 @@ withThreadState a = do runThreadState :: ThreadState -> Annex a -> IO a runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a -{- Runs an Annex action in a separate thread, using a copy of the state - - from the MVar. +{- Runs an Annex action, using a copy of the state from the MVar. - - It's up to the action to perform any necessary shutdown tasks in order - for state to not be lost. And it's up to the caller to resynchronise - with any changes the action makes to eg, the git-annex branch. -} -unsafeForkIOThreadState :: ThreadState -> Annex a -> IO ThreadId -unsafeForkIOThreadState mvar a = do +unsafeRunThreadState :: ThreadState -> Annex a -> IO () +unsafeRunThreadState mvar a = do state <- readMVar mvar - forkIO $ void $ Annex.eval state a + void $ Annex.eval state a diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 766c1f89e..364ce0468 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -10,23 +10,20 @@ module Assistant.Threads.TransferWatcher where import Common.Annex import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.TransferSlots import Logs.Transfer import Utility.DirWatcher import Utility.Types.DirWatcher -import Annex.BranchState import Data.Map as M -import System.Posix.Process {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} -transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferSlots -> IO () -transferWatcherThread st dstatus transferslots = do +transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO () +transferWatcherThread st dstatus = do g <- runThreadState st $ fromRepo id let dir = gitAnnexTransferDir g createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferslots a + let hook a = Just $ runHandler st dstatus a let hooks = mkWatchHooks { addHook = hook onAdd , delHook = hook onDel @@ -34,51 +31,36 @@ transferWatcherThread st dstatus transferslots = do } void $ watchDir dir (const False) hooks id -type Handler = ThreadState -> DaemonStatusHandle -> TransferSlots -> FilePath -> Maybe FileStatus -> IO () +type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferslots handler file filestatus = void $ do +runHandler :: ThreadState -> DaemonStatusHandle -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler st dstatus handler file filestatus = void $ do either print (const noop) =<< tryIO go where - go = handler st dstatus transferslots file filestatus + go = handler st dstatus file filestatus {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ msg _ = error msg +onErr _ _ msg _ = error msg {- Called when a new transfer information file is written. -} onAdd :: Handler -onAdd st dstatus _ file _ = case parseTransferFile file of +onAdd st dstatus file _ = case parseTransferFile file of Nothing -> noop Just t -> runThreadState st $ go t =<< checkTransfer t where go _ Nothing = noop -- transfer already finished go t (Just info) = adjustTransfers dstatus $ M.insertWith' merge t info - -- preseve shouldWait flag, which is not written to disk - merge new old = new { shouldWait = shouldWait old } + -- preseve transferTid, which is not written to disk + merge new old = new { transferTid = transferTid old } -{- Called when a transfer information file is removed. - - - - When the transfer process is a child of this process, wait on it - - to avoid zombies. - -} +{- Called when a transfer information file is removed. -} onDel :: Handler -onDel st dstatus transferslots file _ = case parseTransferFile file of +onDel st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> maybe noop waitchild - =<< runThreadState st (removeTransfer dstatus t) - where - waitchild info - | shouldWait info = case transferPid info of - Nothing -> noop - Just pid -> do - void $ tryIO $ - getProcessStatus True False pid - runThreadState st invalidateCache - transferComplete transferslots - | otherwise = noop + Just t -> void $ runThreadState st $ removeTransfer dstatus t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index dd63d4d12..c439d8b7e 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -74,9 +74,8 @@ runTransfer st dstatus slots t info = case (transferRemote info, associatedFile (Nothing, _) -> noop (_, Nothing) -> noop (Just remote, Just file) -> do - tid <- inTransferSlot slots $ - unsafeForkIOThreadState st $ - transferprocess remote file + tid <- inTransferSlot slots st $ + transferprocess remote file now <- getCurrentTime runThreadState st $ adjustTransfers dstatus $ M.insertWith' const t info diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index fb7fa87cd..b0eca96c8 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -28,7 +28,6 @@ stubInfo f = TransferInfo , transferRemote = Nothing , bytesComplete = Nothing , associatedFile = f - , shouldWait = False } {- Adds pending transfers to the end of the queue for some of the known diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 1859b281b..dc077254d 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -10,6 +10,9 @@ module Assistant.TransferSlots where import Control.Exception import Control.Concurrent +import Common.Annex +import Assistant.ThreadedMonad + type TransferSlots = QSemN {- Number of concurrent transfers allowed to be run from the assistant. @@ -24,16 +27,13 @@ newTransferSlots :: IO TransferSlots newTransferSlots = newQSemN numSlots {- Waits until a transfer slot becomes available, and runs a transfer - - action in the slot. If the action throws an exception, its slot is - - freed here, otherwise it should be freed by the TransferWatcher when - - the transfer is complete. - -} -inTransferSlot :: TransferSlots -> IO a -> IO a -inTransferSlot s a = bracketOnError start abort run + - action in the slot, in its own thread. -} +inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId +inTransferSlot s st a = forkIO $ bracket_ start done run where start = waitQSemN s 1 - abort = const $ transferComplete s - run = const a + done = transferComplete s + run = unsafeRunThreadState st a {- Call when a transfer is complete. -} transferComplete :: TransferSlots -> IO () diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs index f74d128dc..1e3d0abdb 100644 --- a/Logs/Transfer.hs +++ b/Logs/Transfer.hs @@ -43,7 +43,6 @@ data TransferInfo = TransferInfo , transferRemote :: Maybe Remote , bytesComplete :: Maybe Integer , associatedFile :: Maybe FilePath - , shouldWait :: Bool } deriving (Show, Eq, Ord) @@ -87,7 +86,6 @@ transfer t file a = do <*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing <*> pure file - <*> pure False bracketIO (prep tfile mode info) (cleanup tfile) a where prep tfile mode info = do @@ -180,7 +178,6 @@ readTransferInfo pid s = <*> pure Nothing <*> pure Nothing <*> pure (if null filename then Nothing else Just filename) - <*> pure False _ -> Nothing where (bits, filebits) = splitAt 1 $ lines s -- cgit v1.2.3