From f42a64198f18708623fc7a6f7e53cfb3b9e5642b Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 30 Sep 2016 14:29:02 -0400 Subject: allow multiple concurrent external special remote processes Multiple external special remote processes for the same remote will be started as needed when using -J. This should not beak any existing external special remotes, because running multiple git-annex commands at the same time could already start multiple processes for the same external special remotes. --- CHANGELOG | 5 + Remote/External.hs | 198 +++++++++++---------- Remote/External/Types.hs | 28 +-- ...ecial_remote_not_used_concurrently_with_-J.mdwn | 2 + 4 files changed, 115 insertions(+), 118 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 888e413e9..6f2627c98 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -9,6 +9,11 @@ git-annex (6.20160924) UNRELEASED; urgency=medium * Add "total-size" field to --json-progress output. * Make --json-progress output be shown even when the size of a object is not known. + * Multiple external special remote processes for the same remote will be + started as needed when using -J. This should not beak any existing + external special remotes, because running multiple git-annex commands + at the same time could already start multiple processes for the same + external special remotes. -- Joey Hess Mon, 26 Sep 2016 16:46:19 -0400 diff --git a/Remote/External.hs b/Remote/External.hs index e7365991b..fb87f4473 100644 --- a/Remote/External.hs +++ b/Remote/External.hs @@ -1,6 +1,6 @@ {- External special remote interface. - - - Copyright 2013-2015 Joey Hess + - Copyright 2013-2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} @@ -126,9 +126,8 @@ externalSetup mu _ c gc = do INITREMOTE_SUCCESS -> Just noop INITREMOTE_FAILURE errmsg -> Just $ error errmsg _ -> Nothing - withExternalLock external $ \lck -> - fromExternal lck external externalConfig $ - liftIO . atomically . readTMVar + withExternalState external $ + liftIO . atomically . readTMVar . externalConfig gitConfigSpecialRemote u c'' "externaltype" externaltype return (c'', u) @@ -203,27 +202,28 @@ safely a = go =<< tryNonAsync a - While the external remote is processing the Request, it may send - any number of RemoteRequests, that are handled here. - - - Only one request can be made at a time, so locking is used. + - An external remote process can only handle one request at a time. + - Concurrent requests will start up additional processes. - - May throw exceptions, for example on protocol errors, or - when the repository cannot be used. -} handleRequest :: External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a handleRequest external req mp responsehandler = - withExternalLock external $ \lck -> - handleRequest' lck external req mp responsehandler + withExternalState external $ \st -> + handleRequest' st external req mp responsehandler -handleRequest' :: ExternalLock -> External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a -handleRequest' lck external req mp responsehandler +handleRequest' :: ExternalState -> External -> Request -> Maybe MeterUpdate -> (Response -> Maybe (Annex a)) -> Annex a +handleRequest' st external req mp responsehandler | needsPREPARE req = do - checkPrepared lck external + checkPrepared st external go | otherwise = go where go = do - sendMessage lck external req + sendMessage st external req loop - loop = receiveMessage lck external responsehandler + loop = receiveMessage st external responsehandler (\rreq -> Just $ handleRemoteRequest rreq >> loop) (\msg -> Just $ handleAsyncMessage msg >> loop) @@ -234,26 +234,24 @@ handleRequest' lck external req mp responsehandler handleRemoteRequest (DIRHASH_LOWER k) = send $ VALUE $ hashDirLower def k handleRemoteRequest (SETCONFIG setting value) = - fromExternal lck external externalConfig $ \v -> - liftIO $ atomically $ do - m <- takeTMVar v - putTMVar v $ M.insert setting value m + liftIO $ atomically $ do + let v = externalConfig st + m <- takeTMVar v + putTMVar v $ M.insert setting value m handleRemoteRequest (GETCONFIG setting) = do - value <- fromExternal lck external externalConfig $ \v -> - fromMaybe "" . M.lookup setting - <$> liftIO (atomically $ readTMVar v) + value <- fromMaybe "" . M.lookup setting + <$> liftIO (atomically $ readTMVar $ externalConfig st) send $ VALUE value handleRemoteRequest (SETCREDS setting login password) = do - fromExternal lck external externalConfig $ \v -> do - c <- liftIO $ atomically $ readTMVar v - let gc = externalGitConfig external - c' <- setRemoteCredPair encryptionAlreadySetup c gc - (credstorage setting) - (Just (login, password)) - void $ liftIO $ atomically $ swapTMVar v c' + let v = externalConfig st + c <- liftIO $ atomically $ readTMVar v + let gc = externalGitConfig external + c' <- setRemoteCredPair encryptionAlreadySetup c gc + (credstorage setting) + (Just (login, password)) + void $ liftIO $ atomically $ swapTMVar v c' handleRemoteRequest (GETCREDS setting) = do - c <- fromExternal lck external externalConfig $ - liftIO . atomically . readTMVar + c <- liftIO $ atomically $ readTMVar $ externalConfig st let gc = externalGitConfig external creds <- fromMaybe ("", "") <$> getRemoteCredPair c gc (credstorage setting) @@ -286,11 +284,11 @@ handleRequest' lck external req mp responsehandler send (VALUE "") -- end of list handleRemoteRequest (DEBUG msg) = liftIO $ debugM "external" msg handleRemoteRequest (VERSION _) = - sendMessage lck external $ ERROR "too late to send VERSION" + sendMessage st external (ERROR "too late to send VERSION") handleAsyncMessage (ERROR err) = error $ "external special remote error: " ++ err - send = sendMessage lck external + send = sendMessage st external credstorage setting = CredPairStorage { credPairFile = base @@ -303,30 +301,28 @@ handleRequest' lck external req mp responsehandler withurl mk uri = handleRemoteRequest $ mk $ setDownloader (show uri) OtherDownloader -sendMessage :: Sendable m => ExternalLock -> External -> m -> Annex () -sendMessage lck external m = - fromExternal lck external externalSend $ \h -> - liftIO $ do - protocolDebug external True line - hPutStrLn h line - hFlush h +sendMessage :: Sendable m => ExternalState -> External -> m -> Annex () +sendMessage st external m = liftIO $ do + protocolDebug external True line + hPutStrLn h line + hFlush h where line = unwords $ formatMessage m + h = externalSend st {- Waits for a message from the external remote, and passes it to the - apppropriate handler. - - If the handler returns Nothing, this is a protocol error.-} receiveMessage - :: ExternalLock + :: ExternalState -> External -> (Response -> Maybe (Annex a)) -> (RemoteRequest -> Maybe (Annex a)) -> (AsyncMessage -> Maybe (Annex a)) -> Annex a -receiveMessage lck external handleresponse handlerequest handleasync = - go =<< fromExternal lck external externalReceive - (liftIO . catchMaybeIO . hGetLine) +receiveMessage st external handleresponse handlerequest handleasync = + go =<< liftIO (catchMaybeIO $ hGetLine $ externalReceive st) where go Nothing = protocolError False "" go (Just s) = do @@ -348,39 +344,43 @@ protocolDebug external sendto line = debugM "external" $ unwords , line ] -{- Starts up the external remote if it's not yet running, - - and passes a value extracted from its state to an action. - -} -fromExternal :: ExternalLock -> External -> (ExternalState -> v) -> (v -> Annex a) -> Annex a -fromExternal lck external extractor a = - go =<< liftIO (atomically (tryReadTMVar v)) +{- While the action is running, the ExternalState provided to it will not + - be available to any other calls. + - + - Starts up a new process if no ExternalStates are available. -} +withExternalState :: External -> (ExternalState -> Annex a) -> Annex a +withExternalState external = bracket alloc dealloc where - go (Just st) = run st - go Nothing = do - st <- startExternal external - void $ liftIO $ atomically $ do - void $ tryReadTMVar v - putTMVar v st - - {- Handle initial protocol startup; check the VERSION - - the remote sends. -} - receiveMessage lck external - (const Nothing) - (checkVersion lck external) - (const Nothing) - - run st - - run st = a $ extractor st v = externalState external -{- Starts an external remote process running, but does not handle checking - - VERSION, etc. -} + alloc = do + ms <- liftIO $ atomically $ do + l <- takeTMVar v + case l of + [] -> do + putTMVar v l + return Nothing + (st:rest) -> do + putTMVar v rest + return (Just st) + maybe (startExternal external) return ms + + dealloc st = liftIO $ atomically $ do + l <- takeTMVar v + putTMVar v (st:l) + +{- Starts an external remote process running, and checks VERSION. -} startExternal :: External -> Annex ExternalState startExternal external = do errrelayer <- mkStderrRelayer - g <- Annex.gitRepo - liftIO $ do + st <- start errrelayer =<< Annex.gitRepo + receiveMessage st external + (const Nothing) + (checkVersion st external) + (const Nothing) + return st + where + start errrelayer g = liftIO $ do (cmd, ps) <- findShellCommand basecmd let basep = (proc cmd (toCommand ps)) { std_in = CreatePipe @@ -395,17 +395,18 @@ startExternal external = do fileEncoding herr stderrelay <- async $ errrelayer herr checkearlytermination =<< getProcessExitCode pid - cv <- atomically $ newTMVar $ externalDefaultConfig external + cv <- newTMVarIO $ externalDefaultConfig external + pv <- newTMVarIO Unprepared return $ ExternalState { externalSend = hin , externalReceive = hout , externalShutdown = do cancel stderrelay void $ waitForProcess pid - , externalPrepared = Unprepared + , externalPrepared = pv , externalConfig = cv } - where + basecmd = externalRemoteProgram $ externalType external propgit g p = do @@ -422,12 +423,17 @@ startExternal external = do error $ basecmd ++ " is not installed in PATH (" ++ path ++ ")" ) +-- Note: Does not stop any externals that have a withExternalState +-- action currently running. stopExternal :: External -> Annex () -stopExternal external = liftIO $ stop =<< atomically (tryReadTMVar v) +stopExternal external = liftIO $ do + l <- atomically $ do + l <- takeTMVar v + putTMVar v [] + return l + mapM_ stop l where - stop Nothing = noop - stop (Just st) = do - void $ atomically $ tryTakeTMVar v + stop st = do hClose $ externalSend st hClose $ externalReceive st externalShutdown st @@ -436,37 +442,35 @@ stopExternal external = liftIO $ stop =<< atomically (tryReadTMVar v) externalRemoteProgram :: ExternalType -> String externalRemoteProgram externaltype = "git-annex-remote-" ++ externaltype -checkVersion :: ExternalLock -> External -> RemoteRequest -> Maybe (Annex ()) -checkVersion lck external (VERSION v) = Just $ +checkVersion :: ExternalState -> External -> RemoteRequest -> Maybe (Annex ()) +checkVersion st external (VERSION v) = Just $ if v `elem` supportedProtocolVersions then noop - else sendMessage lck external (ERROR "unsupported VERSION") + else sendMessage st external (ERROR "unsupported VERSION") checkVersion _ _ _ = Nothing {- If repo has not been prepared, sends PREPARE. - - If the repo fails to prepare, or failed before, throws an exception with - the error message. -} -checkPrepared :: ExternalLock -> External -> Annex () -checkPrepared lck external = - fromExternal lck external externalPrepared $ \prepared -> - case prepared of - Prepared -> noop - FailedPrepare errmsg -> error errmsg - Unprepared -> - handleRequest' lck external PREPARE Nothing $ \resp -> - case resp of - PREPARE_SUCCESS -> Just $ - setprepared Prepared - PREPARE_FAILURE errmsg -> Just $ do - setprepared $ FailedPrepare errmsg - error errmsg - _ -> Nothing +checkPrepared :: ExternalState -> External -> Annex () +checkPrepared st external = do + v <- liftIO $ atomically $ readTMVar $ externalPrepared st + case v of + Prepared -> noop + FailedPrepare errmsg -> error errmsg + Unprepared -> + handleRequest' st external PREPARE Nothing $ \resp -> + case resp of + PREPARE_SUCCESS -> Just $ + setprepared Prepared + PREPARE_FAILURE errmsg -> Just $ do + setprepared $ FailedPrepare errmsg + error errmsg + _ -> Nothing where - setprepared status = liftIO . atomically $ do - let v = externalState external - st <- takeTMVar v - void $ putTMVar v $ st { externalPrepared = status } + setprepared status = liftIO $ atomically $ void $ + swapTMVar (externalPrepared st) status {- Caches the cost in the git config to avoid needing to start up an - external special remote every time time just to ask it what its diff --git a/Remote/External/Types.hs b/Remote/External/Types.hs index 8098826b2..98d391df8 100644 --- a/Remote/External/Types.hs +++ b/Remote/External/Types.hs @@ -12,8 +12,6 @@ module Remote.External.Types ( External(..), newExternal, ExternalType, - ExternalLock, - withExternalLock, ExternalState(..), PrepareStatus(..), Proto.parseMessage, @@ -44,14 +42,12 @@ import qualified Utility.SimpleProtocol as Proto import Control.Concurrent.STM import Network.URI --- If the remote is not yet running, the ExternalState TMVar is empty. data External = External { externalType :: ExternalType , externalUUID :: UUID - -- Empty until the remote is running. - , externalState :: TMVar ExternalState - -- Empty when a remote is in use. - , externalLock :: TMVar ExternalLock + , externalState :: TMVar [ExternalState] + -- ^ TMVar is never left empty; list contains states for external + -- special remote processes that are not currently in use. , externalDefaultConfig :: RemoteConfig , externalGitConfig :: RemoteGitConfig } @@ -60,8 +56,7 @@ newExternal :: ExternalType -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex newExternal externaltype u c gc = liftIO $ External <$> pure externaltype <*> pure u - <*> atomically newEmptyTMVar - <*> atomically (newTMVar ExternalLock) + <*> atomically (newTMVar []) <*> pure c <*> pure gc @@ -71,23 +66,14 @@ data ExternalState = ExternalState { externalSend :: Handle , externalReceive :: Handle , externalShutdown :: IO () - , externalPrepared :: PrepareStatus - -- Never left empty. + , externalPrepared :: TMVar PrepareStatus + -- ^ Never left empty. , externalConfig :: TMVar RemoteConfig + -- ^ Never left empty. } data PrepareStatus = Unprepared | Prepared | FailedPrepare ErrorMsg --- Constructor is not exported, and only created by newExternal. -data ExternalLock = ExternalLock - -withExternalLock :: External -> (ExternalLock -> Annex a) -> Annex a -withExternalLock external = bracketIO setup cleanup - where - setup = atomically $ takeTMVar v - cleanup = atomically . putTMVar v - v = externalLock external - -- Messages that can be sent to the external remote to request it do something. data Request = PREPARE diff --git a/doc/todo/external_special_remote_not_used_concurrently_with_-J.mdwn b/doc/todo/external_special_remote_not_used_concurrently_with_-J.mdwn index a515d35ca..65575f8a1 100644 --- a/doc/todo/external_special_remote_not_used_concurrently_with_-J.mdwn +++ b/doc/todo/external_special_remote_not_used_concurrently_with_-J.mdwn @@ -7,3 +7,5 @@ This should not be hard to make to use a pool of Externals, starting up a new one if the pool is empty or all in use. --[[Joey]] [[users/yoh]] may want this for datalad? + +> [[done]] --[[Joey]] -- cgit v1.2.3