diff options
Diffstat (limited to 'RemoteDaemon')
-rw-r--r-- | RemoteDaemon/Core.hs | 2 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh.hs | 117 | ||||
-rw-r--r-- | RemoteDaemon/Types.hs | 28 |
3 files changed, 101 insertions, 46 deletions
diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 7d07c35b1..0c2937103 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -106,7 +106,7 @@ genRemoteMap h@(TransportHandle g _) ochan = ichan <- newChan :: IO (Chan Consumed) return $ Just ( r - , (transport r (Git.repoDescribe r) h ichan ochan, ichan) + , (transport r (RemoteURI u) h ichan ochan, ichan) ) _ -> return Nothing _ -> return Nothing diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 557a3dce9..87fcf6f8c 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -13,60 +13,103 @@ import RemoteDaemon.Common import Remote.Helper.Ssh import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol +import qualified Git import Git.Command +import Utility.ThreadScheduler import Control.Concurrent.Chan import Control.Concurrent.Async -import System.Process (std_in, std_out) +import System.Process (std_in, std_out, std_err) transport :: Transport -transport r remotename transporthandle ichan ochan = do +transport r url transporthandle ichan ochan = do v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] [] case v of Nothing -> noop - Just (cmd, params) -> go cmd (toCommand params) + Just (cmd, params) -> robustly 1 $ + connect cmd (toCommand params) where - go cmd params = do - (Just toh, Just fromh, _, pid) <- createProcess (proc cmd params) + connect cmd params = do + (Just toh, Just fromh, Just errh, pid) <- + createProcess (proc cmd params) { std_in = CreatePipe , std_out = CreatePipe + , std_err = CreatePipe } - let shutdown = do - hClose toh - hClose fromh - void $ waitForProcess pid - send DISCONNECTED + -- Run all threads until one finishes and get the status + -- of the first to finish. Cancel the rest. + status <- catchDefaultIO (Right ConnectionClosed) $ + handlestderr errh + `race` handlestdout fromh + `race` handlecontrol - let fromshell = forever $ do - l <- hGetLine fromh - case parseMessage l of - Just SshRemote.READY -> send CONNECTED - Just (SshRemote.CHANGED shas) -> - whenM (checkNewShas transporthandle shas) $ - fetch - Nothing -> shutdown + send (DISCONNECTED url) + hClose toh + hClose fromh + void $ waitForProcess pid - -- The only control message that matters is STOP. - -- - -- Note that a CHANGED control message is not handled; - -- we don't push to the ssh remote. The assistant - -- and git-annex sync both handle pushes, so there's no - -- need to do it here. - let handlecontrol = forever $ do - msg <- readChan ichan - case msg of - STOP -> ioError (userError "done") - _ -> noop + return $ either (either id id) id status - -- Run both threads until one finishes. - void $ tryIO $ concurrently fromshell handlecontrol - shutdown - - send msg = writeChan ochan (msg remotename) + send msg = writeChan ochan msg fetch = do - send SYNCING + send (SYNCING url) ok <- inLocalRepo transporthandle $ - runBool [Param "fetch", Param remotename] - send (DONESYNCING ok) + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) + + handlestdout fromh = do + l <- hGetLine fromh + case parseMessage l of + Just SshRemote.READY -> do + send (CONNECTED url) + handlestdout fromh + Just (SshRemote.CHANGED shas) -> do + whenM (checkNewShas transporthandle shas) $ + fetch + handlestdout fromh + -- avoid reconnect on protocol error + Nothing -> return Stopping + + handlecontrol = do + msg <- readChan ichan + case msg of + STOP -> return Stopping + _ -> handlecontrol + + -- Old versions of git-annex-shell that do not support + -- the notifychanges command will exit with a not very useful + -- error message. Detect that error, and avoid reconnecting. + -- Propigate all stderr. + handlestderr errh = do + s <- hGetSomeString errh 1024 + hPutStr stderr s + hFlush stderr + if "git-annex-shell: git-shell failed" `isInfixOf` s + then do + send $ WARNING url $ unwords + [ "Remote", Git.repoDescribe r + , "needs its git-annex upgraded" + , "to 5.20140405 or newer" + ] + return Stopping + else handlestderr errh + +data Status = Stopping | ConnectionClosed + +{- Make connection robustly, with exponentioal backoff on failure. -} +robustly :: Int -> IO Status -> IO () +robustly backoff a = handle =<< catchDefaultIO ConnectionClosed a + where + handle Stopping = return () + handle ConnectionClosed = do + threadDelaySeconds (Seconds backoff) + robustly increasedbackoff a + + increasedbackoff + | b2 > maxbackoff = maxbackoff + | otherwise = b2 + where + b2 = backoff * 2 + maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index 025c602df..eef7389cc 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -10,15 +10,20 @@ module RemoteDaemon.Types where +import Common import qualified Annex import qualified Git.Types as Git import qualified Utility.SimpleProtocol as Proto +import Network.URI import Control.Concurrent +-- The URI of a remote is used to uniquely identify it (names change..) +newtype RemoteURI = RemoteURI URI + -- A Transport for a particular git remote consumes some messages -- from a Chan, and emits others to another Chan. -type Transport = RemoteRepo -> RemoteName -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO () +type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO () type RemoteRepo = Git.Repo type LocalRepo = Git.Repo @@ -28,10 +33,11 @@ data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) -- Messages that the daemon emits. data Emitted - = CONNECTED RemoteName - | DISCONNECTED RemoteName - | SYNCING RemoteName - | DONESYNCING Bool RemoteName + = CONNECTED RemoteURI + | DISCONNECTED RemoteURI + | SYNCING RemoteURI + | DONESYNCING RemoteURI Bool + | WARNING RemoteURI String -- Messages that the deamon consumes. data Consumed @@ -41,7 +47,6 @@ data Consumed | RELOAD | STOP -type RemoteName = String type RefList = [Git.Ref] instance Proto.Sendable Emitted where @@ -51,8 +56,10 @@ instance Proto.Sendable Emitted where ["DISCONNECTED", Proto.serialize remote] formatMessage (SYNCING remote) = ["SYNCING", Proto.serialize remote] - formatMessage (DONESYNCING status remote) = - ["DONESYNCING", Proto.serialize status, Proto.serialize remote] + formatMessage (DONESYNCING remote status) = + ["DONESYNCING", Proto.serialize remote, Proto.serialize status] + formatMessage (WARNING remote message) = + ["WARNING", Proto.serialize remote, Proto.serialize message] instance Proto.Sendable Consumed where formatMessage PAUSE = ["PAUSE"] @@ -66,6 +73,7 @@ instance Proto.Receivable Emitted where parseCommand "DISCONNECTED" = Proto.parse1 DISCONNECTED parseCommand "SYNCING" = Proto.parse1 SYNCING parseCommand "DONESYNCING" = Proto.parse2 DONESYNCING + parseCommand "WARNING" = Proto.parse2 WARNING parseCommand _ = Proto.parseFail instance Proto.Receivable Consumed where @@ -76,6 +84,10 @@ instance Proto.Receivable Consumed where parseCommand "STOP" = Proto.parse0 STOP parseCommand _ = Proto.parseFail +instance Proto.Serializable RemoteURI where + serialize (RemoteURI u) = show u + deserialize = RemoteURI <$$> parseURI + instance Proto.Serializable [Char] where serialize = id deserialize = Just |