summaryrefslogtreecommitdiff
path: root/Assistant
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2013-03-19 18:46:29 -0400
committerGravatar Joey Hess <joey@kitenet.net>2013-03-19 18:46:29 -0400
commit4887b1b3afd8acf58602d622499664ffb777a8b1 (patch)
tree16cd101c9a61320f5fbf1ca72f1a51f602f70824 /Assistant
parent28336756f9e97173ce922d02c6eeed4e01d07e57 (diff)
maintain pools of running transferkeys processes (untested)
Diffstat (limited to 'Assistant')
-rw-r--r--Assistant/Monad.hs3
-rw-r--r--Assistant/Threads/Transferrer.hs103
-rw-r--r--Assistant/TransferSlots.hs37
-rw-r--r--Assistant/TransferrerPool.hs81
-rw-r--r--Assistant/Types/TransferrerPool.hs23
5 files changed, 168 insertions, 79 deletions
diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs
index e046c9666..ced98b395 100644
--- a/Assistant/Monad.hs
+++ b/Assistant/Monad.hs
@@ -35,6 +35,7 @@ import Assistant.Types.DaemonStatus
import Assistant.Types.ScanRemotes
import Assistant.Types.TransferQueue
import Assistant.Types.TransferSlots
+import Assistant.Types.TransferrerPool
import Assistant.Types.Pushes
import Assistant.Types.BranchChange
import Assistant.Types.Commits
@@ -62,6 +63,7 @@ data AssistantData = AssistantData
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
+ , transferrerPool :: TransferrerPool
, failedPushMap :: FailedPushMap
, commitChan :: CommitChan
, changeChan :: ChangeChan
@@ -78,6 +80,7 @@ newAssistantData st dstatus = AssistantData
<*> newScanRemoteMap
<*> newTransferQueue
<*> newTransferSlots
+ <*> newTransferrerPool
<*> newFailedPushMap
<*> newCommitChan
<*> newChangeChan
diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs
index ccca4ca5e..3dcbb40cd 100644
--- a/Assistant/Threads/Transferrer.hs
+++ b/Assistant/Threads/Transferrer.hs
@@ -14,25 +14,23 @@ import Assistant.TransferSlots
import Assistant.Alert
import Assistant.Commits
import Assistant.Drop
+import Assistant.TransferrerPool
import Logs.Transfer
import Logs.Location
import Annex.Content
import qualified Remote
import qualified Types.Remote as Remote
import qualified Git
-import Types.Key
import Locations.UserConfig
import Assistant.Threads.TransferWatcher
import Annex.Wanted
-import System.Process (create_group)
-
{- Dispatches transfers from the queue. -}
transfererThread :: NamedThread
transfererThread = namedThread "Transferrer" $ do
program <- liftIO readProgramFile
- forever $ inTransferSlot $
- maybe (return Nothing) (uncurry $ startTransfer program)
+ forever $ inTransferSlot program $
+ maybe (return Nothing) (uncurry $ genTransfer)
=<< getNextTransfer notrunning
where
{- Skip transfers that are already running. -}
@@ -40,12 +38,8 @@ transfererThread = namedThread "Transferrer" $ do
{- By the time this is called, the daemonstatus's currentTransfers map should
- already have been updated to include the transfer. -}
-startTransfer
- :: FilePath
- -> Transfer
- -> TransferInfo
- -> Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
-startTransfer program t info = case (transferRemote info, associatedFile info) of
+genTransfer :: Transfer -> TransferInfo -> TransferGenerator
+genTransfer t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file)
| Git.repoIsLocalUnknown (Remote.repo remote) -> do
-- optimisation for removable drives not plugged in
@@ -56,7 +50,7 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
( do
debug [ "Transferring:" , describeTransfer t info ]
notifyTransfer
- return $ Just (t, info, transferprocess remote file)
+ return $ Just (t, info, go remote file)
, do
debug [ "Skipping unnecessary transfer:",
describeTransfer t info ]
@@ -69,57 +63,40 @@ startTransfer program t info = case (transferRemote info, associatedFile info) o
direction = transferDirection t
isdownload = direction == Download
- transferprocess remote file = void $ do
- (_, _, _, pid)
- <- liftIO $ createProcess
- (proc program $ toCommand params)
- { create_group = True }
- {- Alerts are only shown for successful transfers.
- - Transfers can temporarily fail for many reasons,
- - so there's no point in bothering the user about
- - those. The assistant should recover.
- -
- - After a successful upload, handle dropping it from
- - here, if desired. In this case, the remote it was
- - uploaded to is known to have it.
- -
- - Also, after a successful transfer, the location
- - log has changed. Indicate that a commit has been
- - made, in order to queue a push of the git-annex
- - branch out to remotes that did not participate
- - in the transfer.
- -
- - If the process failed, it could have crashed,
- - so remove the transfer from the list of current
- - transfers, just in case it didn't stop
- - in a way that lets the TransferWatcher do its
- - usual cleanup.
- -}
- ifM (liftIO $ (==) ExitSuccess <$> waitForProcess pid)
- ( do
- void $ addAlert $ makeAlertFiller True $
- transferFileAlert direction True file
- unless isdownload $
- handleDrops
- ("object uploaded to " ++ show remote)
- True (transferKey t)
- (associatedFile info)
- (Just remote)
- recordCommit
- , void $ removeTransfer t
- )
- where
- params =
- [ Param "transferkey"
- , Param "--quiet"
- , Param $ key2file $ transferKey t
- , Param $ if isdownload
- then "--from"
- else "--to"
- , Param $ Remote.name remote
- , Param "--file"
- , File file
- ]
+ {- Alerts are only shown for successful transfers.
+ - Transfers can temporarily fail for many reasons,
+ - so there's no point in bothering the user about
+ - those. The assistant should recover.
+ -
+ - After a successful upload, handle dropping it from
+ - here, if desired. In this case, the remote it was
+ - uploaded to is known to have it.
+ -
+ - Also, after a successful transfer, the location
+ - log has changed. Indicate that a commit has been
+ - made, in order to queue a push of the git-annex
+ - branch out to remotes that did not participate
+ - in the transfer.
+ -
+ - If the process failed, it could have crashed,
+ - so remove the transfer from the list of current
+ - transfers, just in case it didn't stop
+ - in a way that lets the TransferWatcher do its
+ - usual cleanup.
+ -}
+ go remote file transferrer = ifM (liftIO $ performTransfer transferrer t $ associatedFile info)
+ ( do
+ void $ addAlert $ makeAlertFiller True $
+ transferFileAlert direction True file
+ unless isdownload $
+ handleDrops
+ ("object uploaded to " ++ show remote)
+ True (transferKey t)
+ (associatedFile info)
+ (Just remote)
+ void $ recordCommit
+ , void $ removeTransfer t
+ )
{- Called right before a transfer begins, this is a last chance to avoid
- unnecessary transfers.
diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs
index 7c9f74702..81a778a0a 100644
--- a/Assistant/TransferSlots.hs
+++ b/Assistant/TransferSlots.hs
@@ -11,28 +11,30 @@ import Assistant.Common
import Utility.ThreadScheduler
import Assistant.Types.TransferSlots
import Assistant.DaemonStatus
+import Assistant.TransferrerPool
+import Assistant.Types.TransferrerPool
import Logs.Transfer
import qualified Control.Exception as E
import Control.Concurrent
import qualified Control.Concurrent.MSemN as MSemN
-type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Assistant ()))
+type TransferGenerator = Assistant (Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()))
{- Waits until a transfer slot becomes available, then runs a
- TransferGenerator, and then runs the transfer action in its own thread.
-}
-inTransferSlot :: TransferGenerator -> Assistant ()
-inTransferSlot gen = do
+inTransferSlot :: FilePath -> TransferGenerator -> Assistant ()
+inTransferSlot program gen = do
flip MSemN.wait 1 <<~ transferSlots
- runTransferThread =<< gen
+ runTransferThread program =<< gen
{- Runs a TransferGenerator, and its transfer action,
- without waiting for a slot to become available. -}
-inImmediateTransferSlot :: TransferGenerator -> Assistant ()
-inImmediateTransferSlot gen = do
+inImmediateTransferSlot :: FilePath -> TransferGenerator -> Assistant ()
+inImmediateTransferSlot program gen = do
flip MSemN.signal (-1) <<~ transferSlots
- runTransferThread =<< gen
+ runTransferThread program =<< gen
{- Runs a transfer action, in an already allocated transfer slot.
- Once it finishes, frees the transfer slot.
@@ -44,19 +46,22 @@ inImmediateTransferSlot gen = do
- then pausing the thread until a ResumeTransfer exception is raised,
- then rerunning the action.
-}
-runTransferThread :: Maybe (Transfer, TransferInfo, Assistant ()) -> Assistant ()
-runTransferThread Nothing = flip MSemN.signal 1 <<~ transferSlots
-runTransferThread (Just (t, info, a)) = do
+runTransferThread :: FilePath -> Maybe (Transfer, TransferInfo, Transferrer -> Assistant ()) -> Assistant ()
+runTransferThread _ Nothing = flip MSemN.signal 1 <<~ transferSlots
+runTransferThread program (Just (t, info, a)) = do
d <- getAssistant id
- aio <- asIO a
- tid <- liftIO $ forkIO $ runTransferThread' d aio
+ aio <- asIO1 a
+ tid <- liftIO $ forkIO $ runTransferThread' program d aio
updateTransferInfo t $ info { transferTid = Just tid }
-runTransferThread' :: AssistantData -> IO () -> IO ()
-runTransferThread' d a = go
+runTransferThread' :: FilePath -> AssistantData -> (Transferrer -> IO ()) -> IO ()
+runTransferThread' program d run = go
where
- go = catchPauseResume a
- pause = catchPauseResume $ runEvery (Seconds 86400) noop
+ go = catchPauseResume $
+ withTransferrer program (transferrerPool d)
+ run
+ pause = catchPauseResume $
+ runEvery (Seconds 86400) noop
{- Note: This must use E.try, rather than E.catch.
- When E.catch is used, and has called go in its exception
- handler, Control.Concurrent.throwTo will block sometimes
diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs
new file mode 100644
index 000000000..69af93773
--- /dev/null
+++ b/Assistant/TransferrerPool.hs
@@ -0,0 +1,81 @@
+{- A pool of "git-annex transferkeys" processes
+ -
+ - Copyright 2013 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.TransferrerPool where
+
+import Assistant.Common
+import Assistant.Types.TransferrerPool
+import Logs.Transfer
+import qualified Command.TransferKeys as T
+
+import Control.Concurrent.STM
+import System.Process (create_group)
+import Control.Exception (throw)
+import Control.Concurrent
+import Types.Remote (AssociatedFile)
+
+{- Runs an action with a Transferrer from the pool. -}
+withTransferrer :: FilePath -> TransferrerPool -> (Transferrer -> IO a) -> IO a
+withTransferrer program pool a = do
+ t <- maybe (mkTransferrer program) (checkTransferrer program)
+ =<< atomically (tryReadTChan pool)
+ v <- tryNonAsync $ a t
+ unlessM (putback t) $
+ void $ forkIO $ stopTransferrer t
+ either throw return v
+ where
+ putback t = atomically $ ifM (isEmptyTChan pool)
+ ( do
+ writeTChan pool t
+ return True
+ , return False
+ )
+
+{- Requests that a Transferrer perform a Transfer, and waits for it to
+ - finish. -}
+performTransfer :: Transferrer -> Transfer -> AssociatedFile -> IO Bool
+performTransfer transferrer t f = catchBoolIO $ do
+ T.sendRequest t f (transferrerWrite transferrer)
+ T.readResponse (transferrerRead transferrer)
+
+{- Starts a new git-annex transferkeys process, setting up a pipe
+ - that will be used to communicate with it. -}
+mkTransferrer :: FilePath -> IO Transferrer
+mkTransferrer program = do
+ (myread, twrite) <- createPipe
+ (tread, mywrite) <- createPipe
+ mapM_ (\fd -> setFdOption fd CloseOnExec True) [myread, mywrite]
+ let params =
+ [ Param "transferkeys"
+ , Param "--readfd", Param $ show tread
+ , Param "--writefd", Param $ show twrite
+ ]
+ {- It's put into its own group so that the whole group can be
+ - killed to stop a transfer. -}
+ (_, _, _, pid) <- createProcess (proc program $ toCommand params)
+ { create_group = True }
+ closeFd twrite
+ closeFd tread
+ myreadh <- fdToHandle myread
+ mywriteh <- fdToHandle mywrite
+ return $ Transferrer
+ { transferrerRead = myreadh
+ , transferrerWrite = mywriteh
+ , transferrerHandle = pid
+ }
+
+{- Checks if a Transferrer is still running. If not, makes a new one. -}
+checkTransferrer :: FilePath -> Transferrer -> IO Transferrer
+checkTransferrer program t = maybe (return t) (const $ mkTransferrer program)
+ =<< getProcessExitCode (transferrerHandle t)
+
+{- Closing the fds will stop the transferrer. -}
+stopTransferrer :: Transferrer -> IO ()
+stopTransferrer t = do
+ hClose $ transferrerRead t
+ hClose $ transferrerWrite t
+ void $ waitForProcess $ transferrerHandle t
diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs
new file mode 100644
index 000000000..2727a6919
--- /dev/null
+++ b/Assistant/Types/TransferrerPool.hs
@@ -0,0 +1,23 @@
+{- A pool of "git-annex transferkeys" processes
+ -
+ - Copyright 2013 Joey Hess <joey@kitenet.net>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module Assistant.Types.TransferrerPool where
+
+import Common.Annex
+
+import Control.Concurrent.STM
+
+type TransferrerPool = TChan Transferrer
+
+data Transferrer = Transferrer
+ { transferrerRead :: Handle
+ , transferrerWrite :: Handle
+ , transferrerHandle :: ProcessHandle
+ }
+
+newTransferrerPool :: IO TransferrerPool
+newTransferrerPool = newTChanIO