diff options
author | Joey Hess <joey@kitenet.net> | 2012-07-05 18:57:06 -0600 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2012-07-05 18:57:06 -0600 |
commit | a92f5589fcf5549832914fdee34596818bfdc583 (patch) | |
tree | 45712848f8a7bddf19c90fc082ee657c40243a38 | |
parent | 0c563c39dfcd515b115aa37c03551dceffb882c0 (diff) |
unfinished (and unbuildable) work toward separate transfer processes
-rw-r--r-- | Assistant.hs | 24 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 103 | ||||
-rw-r--r-- | Logs/Transfer.hs | 5 |
3 files changed, 63 insertions, 69 deletions
diff --git a/Assistant.hs b/Assistant.hs index e751b4ae8..38ed539a1 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -31,14 +31,15 @@ - them. - Thread 8: merger - Waits for pushes to be received from remotes, and merges the - - updated branches into the current branch. This uses inotify - - on .git/refs/heads, so there are additional inotify threads - - associated with it, too. + - updated branches into the current branch. + - (This uses inotify on .git/refs/heads, so there are additional + - inotify threads associated with it, too.) - Thread 9: transfer watcher - Watches for transfer information files being created and removed, - - and maintains the DaemonStatus currentTransfers map. This uses - - inotify on .git/annex/transfer/, so there are additional inotify - - threads associated with it, too. + - and maintains the DaemonStatus currentTransfers map and the + - TransferSlots QSemN. + - (This uses inotify on .git/annex/transfer/, so there are + - additional inotify threads associated with it, too.) - Thread 10: transferrer - Waits for Transfers to be queued and does them. - Thread 11: status logger @@ -66,6 +67,12 @@ - retrier blocks until they're available. - TransferQueue (STM TChan) - Transfers to make are indicated by writing to this channel. + - TransferSlots (QSemN) + - Count of the number of currently available transfer slots. + - Updated by the transfer watcher, this allows other threads + - to block until a slot is available. + - This MVar should only be manipulated from inside the Annex monad, + - which ensures it's accessed only after the ThreadState MVar. -} module Assistant where @@ -109,15 +116,16 @@ startDaemon assistant foreground commitchan <- newCommitChan pushmap <- newFailedPushMap transferqueue <- newTransferQueue + transferslots <- newTransferSlots mapM_ (void . forkIO) [ commitThread st changechan commitchan transferqueue dstatus , pushThread st dstatus commitchan pushmap , pushRetryThread st pushmap , mergeThread st - , transferWatcherThread st dstatus + , transferWatcherThread st dstatus transferslots + , transfererThread st dstatus transferqueue transferslots , daemonStatusThread st dstatus , sanityCheckerThread st dstatus transferqueue changechan - , transfererThread st dstatus transferqueue , watchThread st dstatus transferqueue changechan ] waitForTermination diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 0b47e9781..249e15cf2 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -14,6 +14,7 @@ import Assistant.TransferQueue import Logs.Transfer import Annex.Content import Annex.BranchState +import Utility.ThreadScheduler import Command import qualified Command.Move @@ -22,68 +23,58 @@ import Control.Concurrent import Data.Time.Clock import qualified Data.Map as M -{- Dispatches transfers from the queue. - - - - This is currently very simplistic, and runs only one transfer at a time. - -} +{- For now only one transfer is run at a time. -} +maxTransfers :: Int +maxTransfers = 1 + +{- Dispatches transfers from the queue. -} transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO () -transfererThread st dstatus transferqueue = do - mypid <- getProcessID - mytid <- myThreadId - go mypid mytid +transfererThread st dstatus transferqueue = runEvery (Seconds 1) $ do + (t, info) <- getNextTransfer transferqueue + go =<< runThreadState st $ shouldTransfer t where - go mypid mytid = do - (t, info) <- getNextTransfer transferqueue - - now <- getCurrentTime - let info' = info - { startedTime = Just now - , transferPid = Just mypid - , transferThread = Just mytid - } - - ifM (runThreadState st $ shouldtransfer t info') - ( runTransfer st t info' - , noop - ) - go mypid mytid + go Yes = runTransfer st t + go No = noop + go TooMany = waitTransfer >> go Yes - -- Check if the transfer is already running, - -- and if not, add it to the TransferMap. - shouldtransfer t info = do - current <- currentTransfers <$> getDaemonStatus dstatus - if M.member t current - then return False - else ifM (validtransfer t) - ( do - adjustTransfers dstatus $ - M.insertWith' const t info - return True - , return False - ) +data ShouldTransfer = Yes | Skip | TooMany - validtransfer t +{- Checks if the requested transfer is already running, or + - the file to download is already present. + - + - There also may be too many transfers already running to service this + - transfer yet. -} +shouldTransfer :: DaemonStatusHandle -> Transfer -> Annex ShouldTransfer +shouldTransfer dstatus t = go =<< currentTransfers <$> getDaemonStatus dstatus + where + go m + | M.member t m = return Skip + | M.size m > maxTransfers = return TooMany | transferDirection t == Download = - not <$> inAnnex (transferKey t) - | otherwise = return True + ifM (inAnnex $ transferKey t) (No, Yes) + | otherwise = return Yes -{- A transfer is run in a separate thread, with a *copy* of the Annex +{- Waits for any of the transfers in the map to complete. -} +waitTransfer :: IO () +waitTransfer = error "TODO" +-- getProcessStatus True False pid +-- runThreadState st invalidateCache + +{- A transfer is run in a separate process, with a *copy* of the Annex - state. This is necessary to avoid blocking the rest of the assistant - on the transfer completing, and also to allow multiple transfers to run - at once. - - - However, it means that the transfer threads are responsible + - However, it means that the transfer processes are responsible - for doing any necessary shutdown cleanups, and that the parent - - thread's cache must be invalidated, as changes may have been made to the - - git-annex branch. + - thread's cache must be invalidated once a transfer completes, as + - changes may have been made to the git-annex branch. - - - Currently a minimal shutdown is done; the transfer threads are + - Currently a minimal shutdown is done; the transfer processes are - effectively running in oneshot mode, without committing changes to the - git-annex branch, and transfers should never queue git commands to run. - - - - Note: It is unsafe to call getDaemonStatus inside the transfer thread. -} -runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO () +runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO ProcessID runTransfer st t info | transferDirection t == Download = go Command.Move.fromStart | otherwise = go Command.Move.toStart @@ -91,12 +82,12 @@ runTransfer st t info go cmd = case (transferRemote info, associatedFile info) of (Nothing, _) -> noop (_, Nothing) -> noop - (Just remote, Just file) -> - inthread $ void $ doCommand $ - cmd remote False file (transferKey t) - inthread a = do - mvar <- newEmptyMVar - void $ forkIO $ - unsafeRunThreadState st a `E.finally` putMVar mvar () - void $ takeMVar mvar -- wait for transfer thread - runThreadState st invalidateCache + (Just remote, Just file) -> do + now <- getCurrentTime + pid <- forkProcess $ unsafeRunThreadState st $ + doCommand $ cmd remote False file (transferKey t) + adjustTransfers dstatus $ + M.insertWith' const t info + { startedTime = Just now + , transferPid = Just pid + } diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs index 12ab8ff11..54f98da5c 100644 --- a/Logs/Transfer.hs +++ b/Logs/Transfer.hs @@ -14,7 +14,6 @@ import qualified Git import Types.Remote import qualified Fields -import Control.Concurrent import System.Posix.Types import Data.Time.Clock @@ -36,7 +35,6 @@ data Transfer = Transfer data TransferInfo = TransferInfo { startedTime :: Maybe UTCTime , transferPid :: Maybe ProcessID - , transferThread :: Maybe ThreadId , transferRemote :: Maybe Remote , bytesComplete :: Maybe Integer , associatedFile :: Maybe FilePath @@ -79,7 +77,6 @@ transfer t file a = do info <- liftIO $ TransferInfo <$> (Just <$> getCurrentTime) <*> pure Nothing -- pid not stored in file, so omitted for speed - <*> pure Nothing -- threadid not stored in file, so omitted for speed <*> pure Nothing -- not 0; transfer may be resuming <*> pure Nothing <*> pure file @@ -158,7 +155,6 @@ writeTransferInfo :: TransferInfo -> String writeTransferInfo info = unlines -- transferPid is not included; instead obtained by looking at -- the process that locks the file. - -- transferThread is not included; not relevant for other processes [ show $ startedTime info -- bytesComplete is not included; changes too fast , fromMaybe "" $ associatedFile info -- comes last; arbitrary content @@ -172,7 +168,6 @@ readTransferInfo pid s = <*> pure (Just pid) <*> pure Nothing <*> pure Nothing - <*> pure Nothing <*> pure (if null filename then Nothing else Just filename) _ -> Nothing where |