diff options
author | 2012-07-05 14:34:20 -0600 | |
---|---|---|
committer | 2012-07-05 14:34:20 -0600 | |
commit | 71b5ad8398c4d86d5e9b993e175b48f2c5f0861d (patch) | |
tree | 46bd1db5caa16256b948146ecd2d61057bed44fb /Assistant | |
parent | e8df726d07657c82c012a02697282c2bc642e742 (diff) |
wrote transfer thread
finally!
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/DaemonStatus.hs | 9 | ||||
-rw-r--r-- | Assistant/Threads/TransferWatcher.hs | 20 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 102 | ||||
-rw-r--r-- | Assistant/TransferQueue.hs | 8 |
4 files changed, 124 insertions, 15 deletions
diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index a3e909904..40816bb1a 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -31,12 +31,14 @@ data DaemonStatus = DaemonStatus -- Last time the sanity checker ran , lastSanityCheck :: Maybe POSIXTime -- Currently running file content transfers - , currentTransfers :: M.Map Transfer TransferInfo + , currentTransfers :: TransferMap -- Ordered list of remotes to talk to. , knownRemotes :: [Remote] } deriving (Show) +type TransferMap = M.Map Transfer TransferInfo + type DaemonStatusHandle = MVar DaemonStatus newDaemonStatus :: DaemonStatus @@ -132,3 +134,8 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) tenMinutes :: Int tenMinutes = 10 * 60 + +{- Mutates the transfer map. -} +adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> Annex () +adjustTransfers dstatus a = modifyDaemonStatus dstatus $ + \s -> s { currentTransfers = a (currentTransfers s) } diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index 811b045a8..f18d4e3f8 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -58,21 +58,17 @@ onAdd :: Handler onAdd st dstatus file _ = case parseTransferFile file of Nothing -> noop Just t -> do - minfo <- runThreadState st $ checkTransfer t pid <- getProcessID - case minfo of - Nothing -> noop -- transfer already finished - Just info - | transferPid info == Just pid -> noop - | otherwise -> adjustTransfers st dstatus - (M.insertWith' const t info) + runThreadState st $ go t pid =<< checkTransfer t + where + go _ _ Nothing = noop -- transfer already finished + go t pid (Just info) + | transferPid info == Just pid = noop + | otherwise = adjustTransfers dstatus $ + M.insertWith' const t info {- Called when a transfer information file is removed. -} onDel :: Handler onDel st dstatus file _ = case parseTransferFile file of Nothing -> noop - Just t -> adjustTransfers st dstatus (M.delete t) - -adjustTransfers :: ThreadState -> DaemonStatusHandle -> (M.Map Transfer TransferInfo -> M.Map Transfer TransferInfo) -> IO () -adjustTransfers st dstatus a = runThreadState st $ modifyDaemonStatus dstatus $ - \s -> s { currentTransfers = a (currentTransfers s) } + Just t -> runThreadState st $ adjustTransfers dstatus $ M.delete t diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs new file mode 100644 index 000000000..0562a607c --- /dev/null +++ b/Assistant/Threads/Transferrer.hs @@ -0,0 +1,102 @@ +{- git-annex assistant data transferrer thread + - + - Copyright 2012 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.Transferrer where + +import Common.Annex +import Assistant.ThreadedMonad +import Assistant.DaemonStatus +import Assistant.TransferQueue +import Logs.Transfer +import Annex.Content +import Annex.BranchState +import Command +import qualified Command.Move + +import Control.Exception as E +import Control.Concurrent +import Data.Time.Clock +import qualified Data.Map as M + +{- Dispatches transfers from the queue. + - + - This is currently very simplistic, and runs only one transfer at a time. + -} +transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> IO () +transfererThread st dstatus transferqueue = do + mypid <- getProcessID + mytid <- myThreadId + go mypid mytid + where + go mypid mytid = do + (t, info) <- getNextTransfer transferqueue + + now <- getCurrentTime + let info' = info + { startedTime = Just now + , transferPid = Just mypid + , transferThread = Just mytid + } + + ifM (runThreadState st $ shouldtransfer t info') + ( runTransfer st t info' + , noop + ) + go mypid mytid + + -- Check if the transfer is already running, + -- and if not, add it to the TransferMap. + shouldtransfer t info = do + current <- currentTransfers <$> getDaemonStatus dstatus + if M.member t current + then ifM (validtransfer t) + ( do + adjustTransfers dstatus $ + M.insertWith' const t info + return True + , return False + ) + else return False + + validtransfer t + | transferDirection t == Download = + not <$> inAnnex (transferKey t) + | otherwise = return True + +{- A transfer is run in a separate thread, with a *copy* of the Annex + - state. This is necessary to avoid blocking the rest of the assistant + - on the transfer completing, and also to allow multiple transfers to run + - at once. + - + - However, it means that the transfer threads are responsible + - for doing any necessary shutdown cleanups, and that the parent + - thread's cache must be invalidated, as changes may have been made to the + - git-annex branch. + - + - Currently a minimal shutdown is done; the transfer threads are + - effectively running in oneshot mode, without committing changes to the + - git-annex branch, and transfers should never queue git commands to run. + - + - Note: It is unsafe to call getDaemonStatus inside the transfer thread. + -} +runTransfer :: ThreadState -> Transfer -> TransferInfo -> IO () +runTransfer st t info + | transferDirection t == Download = go Command.Move.fromStart + | otherwise = go Command.Move.toStart + where + go cmd = case (transferRemote info, associatedFile info) of + (Nothing, _) -> noop + (_, Nothing) -> noop + (Just remote, Just file) -> + inthread $ void $ doCommand $ + cmd remote False file (transferKey t) + inthread a = do + mvar <- newEmptyMVar + void $ forkIO $ + runThreadState st a `E.finally` putMVar mvar () + void $ takeMVar mvar -- wait for transfer thread + runThreadState st invalidateCache diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index f1f4882be..a35815ca1 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -25,6 +25,7 @@ stubInfo f = TransferInfo { startedTime = Nothing , transferPid = Nothing , transferThread = Nothing + , transferRemote = Nothing , bytesComplete = Nothing , associatedFile = f } @@ -33,7 +34,7 @@ stubInfo f = TransferInfo - remotes. -} queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers q daemonstatus k f direction = - mapM_ (liftIO . queueTransfer q f . gentransfer) + mapM_ (\r -> queue r $ gentransfer r) =<< sufficientremotes . knownRemotes <$> getDaemonStatus daemonstatus where @@ -53,8 +54,11 @@ queueTransfers q daemonstatus k f direction = gentransfer r = Transfer { transferDirection = direction , transferKey = k - , transferRemote = Remote.uuid r + , transferUUID = Remote.uuid r } + queue r t = liftIO $ void $ atomically $ do + let info = (stubInfo f) { transferRemote = Just r } + writeTChan q (t, info) {- Adds a pending transfer to the end of the queue. -} queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () |