diff options
author | Joey Hess <joey@kitenet.net> | 2013-03-19 18:46:29 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2013-03-19 18:46:29 -0400 |
commit | 4887b1b3afd8acf58602d622499664ffb777a8b1 (patch) | |
tree | 16cd101c9a61320f5fbf1ca72f1a51f602f70824 /Assistant | |
parent | 28336756f9e97173ce922d02c6eeed4e01d07e57 (diff) |
maintain pools of running transferkeys processes (untested)
Diffstat (limited to 'Assistant')
-rw-r--r-- | Assistant/Monad.hs | 3 | ||||
-rw-r--r-- | Assistant/Threads/Transferrer.hs | 103 | ||||
-rw-r--r-- | Assistant/TransferSlots.hs | 37 | ||||
-rw-r--r-- | Assistant/TransferrerPool.hs | 81 | ||||
-rw-r--r-- | Assistant/Types/TransferrerPool.hs | 23 |
5 files changed, 168 insertions, 79 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index e046c9666..ced98b395 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -35,6 +35,7 @@ import Assistant.Types.DaemonStatus import Assistant.Types.ScanRemotes import Assistant.Types.TransferQueue import Assistant.Types.TransferSlots +import Assistant.Types.TransferrerPool import Assistant.Types.Pushes import Assistant.Types.BranchChange import Assistant.Types.Commits @@ -62,6 +63,7 @@ data AssistantData = AssistantData , scanRemoteMap :: ScanRemoteMap , transferQueue :: TransferQueue , transferSlots :: TransferSlots + , transferrerPool :: TransferrerPool , failedPushMap :: FailedPushMap , commitChan :: CommitChan , changeChan :: ChangeChan @@ -78,6 +80,7 @@ newAssistantData st dstatus = AssistantData <*> newScanRemoteMap <*> newTransferQueue <*> newTransferSlots + <*> newTransferrerPool <*> newFailedPushMap <*> newCommitChan <*> newChangeChan diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index ccca4ca5e..3dcbb40cd 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -14,25 +14,23 @@ import Assistant.TransferSlots import Assistant.Alert import Assistant.Commits import Assistant.Drop +import Assistant.TransferrerPool import Logs.Transfer import Logs.Location import Annex.Content import qualified Remote import qualified Types.Remote as Remote import qualified Git -import Types.Key import Locations.UserConfig import Assistant.Threads.TransferWatcher import Annex.Wanted -import System.Process (create_group) - {- Dispatches transfers from the queue. -} transfererThread :: NamedThread transfererThread = namedThread "Transferrer" $ do program <- liftIO readProgramFile - forever $ inTransferSlot $ - maybe (return Nothing) (uncurry $ startTransfer program) + forever $ inTransferSlot program $ + maybe (return Nothing) (uncurry $ genTransfer) =<< getNextTransfer notrunning where {- Skip transfers that are already running. -} @@ -40,12 +38,8 @@ transfererThread = namedThread "Transferrer" $ do {- By the time this is called, the daemonstatus's currentTransfers map should - already have been updated to include the transfer. -} -startTransfer - :: FilePath - -> Transfer - -> TransferInfo - -> Assistant (Maybe (Transfer, TransferInfo, Assistant ())) -startTransfer program t info = case (transferRemote info, associatedFile info) of +genTransfer :: Transfer -> TransferInfo -> TransferGenerator +genTransfer t info = case (transferRemote info, associatedFile info) of (Just remote, Just file) | Git.repoIsLocalUnknown (Remote.repo remote) -> do -- optimisation for removable drives not plugged in @@ -56,7 +50,7 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o ( do debug [ "Transferring:" , describeTransfer t info ] notifyTransfer - return $ Just (t, info, transferprocess remote file) + return $ Just (t, info, go remote file) , do debug [ "Skipping unnecessary transfer:", describeTransfer t info ] @@ -69,57 +63,40 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o direction = transferDirection t isdownload = direction == Download - transferprocess remote file = void $ do - (_, _, _, pid) - <- liftIO $ createProcess - (proc program $ toCommand params) - { create_group = True } - {- Alerts are only shown for successful transfers. - - Transfers can temporarily fail for many reasons, - - so there's no point in bothering the user about - - those. The assistant should recover. - - - - After a successful upload, handle dropping it from - - here, if desired. In this case, the remote it was - - uploaded to is known to have it. - - - - Also, after a successful transfer, the location - - log has changed. Indicate that a commit has been - - made, in order to queue a push of the git-annex - - branch out to remotes that did not participate - - in the transfer. - - - - If the process failed, it could have crashed, - - so remove the transfer from the list of current - - transfers, just in case it didn't stop - - in a way that lets the TransferWatcher do its - - usual cleanup. - -} - ifM (liftIO $ (==) ExitSuccess <$> waitForProcess pid) - ( do - void $ addAlert $ makeAlertFiller True $ - transferFileAlert direction True file - unless isdownload $ - handleDrops - ("object uploaded to " ++ show remote) - True (transferKey t) - (associatedFile info) - (Just remote) - recordCommit - , void $ removeTransfer t - ) - where - params = - [ Param "transferkey" - , Param "--quiet" - , Param $ key2file $ transferKey t - , Param $ if isdownload - then "--from" - else "--to" - , Param $ Remote.name remote - , Param "--file" - , File file - ] + {- Alerts are only shown for successful transfers. + - Transfers can temporarily fail for many reasons, + - so there's no point in bothering the user about + - those. The assistant should recover. + - + - After a successful upload, handle dropping it from + - here, if desired. In this case, the remote it was + - uploaded to is known to have it. + - + - Also, after a successful transfer, the location + - log has changed. Indicate that a commit has been + - made, in order to queue a push of the git-annex + - branch out to remotes that did not participate + - in the transfer. + - + - If the process failed, it could have crashed, + - so remove the transfer from the list of current + - transfers, just in case it didn't stop + - in a way that lets the TransferWatcher do its + - usual cleanup. + -} + go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info) + ( do + void $ addAlert $ makeAlertFiller True $ + transferFileAlert direction True file + unless isdownload $ + handleDrops + ("object uploaded to " ++ show remote) + True (transferKey t) + (associatedFile info) + (Just remote) + void $ recordCommit + , void $ removeTransfer t + ) {- Called right before a transfer begins, this is a last chance to avoid - unnecessary transfers. diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 7c9f74702..81a778a0a 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -11,28 +11,30 @@ import Assistant.Common import Utility.ThreadScheduler import Assistant.Types.TransferSlots import Assistant.DaemonStatus +import Assistant.TransferrerPool +import Assistant.Types.TransferrerPool import Logs.Transfer import qualified Control.Exception as E import Control.Concurrent import qualified Control.Concurrent.MSemN as MSemN -type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Assistant ())) +type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ())) {- Waits until a transfer slot becomes available, then runs a - TransferGenerator, and then runs the transfer action in its own thread. -} -inTransferSlot :: TransferGenerator -> Assistant () -inTransferSlot gen = do +inTransferSlot :: FilePath -> TransferGenerator -> Assistant () +inTransferSlot program gen = do flip MSemN.wait 1 <<~ transferSlots - runTransferThread =<< gen + runTransferThread program =<< gen {- Runs a TransferGenerator, and its transfer action, - without waiting for a slot to become available. -} -inImmediateTransferSlot :: TransferGenerator -> Assistant () -inImmediateTransferSlot gen = do +inImmediateTransferSlot :: FilePath -> TransferGenerator -> Assistant () +inImmediateTransferSlot program gen = do flip MSemN.signal (-1) <<~ transferSlots - runTransferThread =<< gen + runTransferThread program =<< gen {- Runs a transfer action, in an already allocated transfer slot. - Once it finishes, frees the transfer slot. @@ -44,19 +46,22 @@ inImmediateTransferSlot gen = do - then pausing the thread until a ResumeTransfer exception is raised, - then rerunning the action. -} -runTransferThread :: Maybe (Transfer, TransferInfo, Assistant ()) -> Assistant () -runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots -runTransferThread (Just (t, info, a)) = do +runTransferThread :: FilePath -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant () +runTransferThread _ Nothing = flip MSemN.signal 1 <<~ transferSlots +runTransferThread program (Just (t, info, a)) = do d <- getAssistant id - aio <- asIO a - tid <- liftIO $ forkIO $ runTransferThread' d aio + aio <- asIO1 a + tid <- liftIO $ forkIO $ runTransferThread' program d aio updateTransferInfo t $ info { transferTid = Just tid } -runTransferThread' :: AssistantData -> IO () -> IO () -runTransferThread' d a = go +runTransferThread' :: FilePath -> AssistantData -> (Transferrer -> IO ()) -> IO () +runTransferThread' program d run = go where - go = catchPauseResume a - pause = catchPauseResume $ runEvery (Seconds 86400) noop + go = catchPauseResume $ + withTransferrer program (transferrerPool d) + run + pause = catchPauseResume $ + runEvery (Seconds 86400) noop {- Note: This must use E.try, rather than E.catch. - When E.catch is used, and has called go in its exception - handler, Control.Concurrent.throwTo will block sometimes diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs new file mode 100644 index 000000000..69af93773 --- /dev/null +++ b/Assistant/TransferrerPool.hs @@ -0,0 +1,81 @@ +{- A pool of "git-annex transferkeys" processes + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.TransferrerPool where + +import Assistant.Common +import Assistant.Types.TransferrerPool +import Logs.Transfer +import qualified Command.TransferKeys as T + +import Control.Concurrent.STM +import System.Process (create_group) +import Control.Exception (throw) +import Control.Concurrent +import Types.Remote (AssociatedFile) + +{- Runs an action with a Transferrer from the pool. -} +withTransferrer :: FilePath -> TransferrerPool -> (Transferrer -> IO a) -> IO a +withTransferrer program pool a = do + t <- maybe (mkTransferrer program) (checkTransferrer program) + =<< atomically (tryReadTChan pool) + v <- tryNonAsync $ a t + unlessM (putback t) $ + void $ forkIO $ stopTransferrer t + either throw return v + where + putback t = atomically $ ifM (isEmptyTChan pool) + ( do + writeTChan pool t + return True + , return False + ) + +{- Requests that a Transferrer perform a Transfer, and waits for it to + - finish. -} +performTransfer :: Transferrer -> Transfer -> AssociatedFile -> IO Bool +performTransfer transferrer t f = catchBoolIO $ do + T.sendRequest t f (transferrerWrite transferrer) + T.readResponse (transferrerRead transferrer) + +{- Starts a new git-annex transferkeys process, setting up a pipe + - that will be used to communicate with it. -} +mkTransferrer :: FilePath -> IO Transferrer +mkTransferrer program = do + (myread, twrite) <- createPipe + (tread, mywrite) <- createPipe + mapM_ (\fd -> setFdOption fd CloseOnExec True) [myread, mywrite] + let params = + [ Param "transferkeys" + , Param "--readfd", Param $ show tread + , Param "--writefd", Param $ show twrite + ] + {- It's put into its own group so that the whole group can be + - killed to stop a transfer. -} + (_, _, _, pid) <- createProcess (proc program $ toCommand params) + { create_group = True } + closeFd twrite + closeFd tread + myreadh <- fdToHandle myread + mywriteh <- fdToHandle mywrite + return $ Transferrer + { transferrerRead = myreadh + , transferrerWrite = mywriteh + , transferrerHandle = pid + } + +{- Checks if a Transferrer is still running. If not, makes a new one. -} +checkTransferrer :: FilePath -> Transferrer -> IO Transferrer +checkTransferrer program t = maybe (return t) (const $ mkTransferrer program) + =<< getProcessExitCode (transferrerHandle t) + +{- Closing the fds will stop the transferrer. -} +stopTransferrer :: Transferrer -> IO () +stopTransferrer t = do + hClose $ transferrerRead t + hClose $ transferrerWrite t + void $ waitForProcess $ transferrerHandle t diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs new file mode 100644 index 000000000..2727a6919 --- /dev/null +++ b/Assistant/Types/TransferrerPool.hs @@ -0,0 +1,23 @@ +{- A pool of "git-annex transferkeys" processes + - + - Copyright 2013 Joey Hess <joey@kitenet.net> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Types.TransferrerPool where + +import Common.Annex + +import Control.Concurrent.STM + +type TransferrerPool = TChan Transferrer + +data Transferrer = Transferrer + { transferrerRead :: Handle + , transferrerWrite :: Handle + , transferrerHandle :: ProcessHandle + } + +newTransferrerPool :: IO TransferrerPool +newTransferrerPool = newTChanIO |