aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2018-03-08 14:02:18 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2018-03-08 15:11:31 -0400
commit9c1fb9d7822efadae1027c9763e2bf573399d0b3 (patch)
tree873811f61c835336e6774cff252ab00f2da18f1f
parent6637d79273a84c83fa13bb5d797d4dcee07c363e (diff)
p2p ssh connection pools
Much like Remote.P2P, there's a pool of connections to a peer, in order to support concurrent operations. Deals with old git-annex-ssh on the remote that does not support p2pstdio, by only trying once to use it, and remembering if it's not supported. Made p2pstdio send an AUTH_SUCCESS with its uuid, which serves the dual purposes of something to detect to see that the connection is working, and a way to verify that it's connected to the right uuid. (There's a redundant uuid check since the uuid field is sent by git_annex_shell, but I anticipate that being removed later when the legacy git-annex-shell stuff gets removed.) Not entirely happy with Remote.Git.runSsh's behavior when the proto action fails. Running the fallback will work ok, but what will we do when the fallbacks later get removed? It might be better to try to reconnect, in case the connection got closed. This commit was sponsored by Boyd Stephen Smith Jr. on Patreon.
-rw-r--r--Command/P2PStdIO.hs7
-rw-r--r--P2P/IO.hs5
-rw-r--r--P2P/Protocol.hs4
-rw-r--r--Remote/Git.hs41
-rw-r--r--Remote/Helper/Ssh.hs97
-rw-r--r--Remote/P2P.hs6
-rw-r--r--Utility/Process.hs6
7 files changed, 150 insertions, 16 deletions
diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs
index f6e4ae0f0..73c38b906 100644
--- a/Command/P2PStdIO.hs
+++ b/Command/P2PStdIO.hs
@@ -11,13 +11,10 @@ import Command
import P2P.IO
import P2P.Annex
import qualified P2P.Protocol as P2P
-import Git.Types
import qualified Annex
import Annex.UUID
import qualified CmdLine.GitAnnexShell.Checks as Checks
import qualified CmdLine.GitAnnexShell.Fields as Fields
-import Utility.AuthToken
-import Utility.Tmp.Dir
cmd :: Command
cmd = noMessages $ command "p2pstdio" SectionPlumbing
@@ -38,7 +35,9 @@ start = do
Just u -> return (toUUID u)
myuuid <- getUUID
conn <- stdioP2PConnection <$> Annex.gitRepo
- let server = P2P.serveAuthed servermode myuuid
+ let server = do
+ P2P.net $ P2P.sendMessage (P2P.AUTH_SUCCESS myuuid)
+ P2P.serveAuthed servermode myuuid
runFullProto (Serving theiruuid Nothing) conn server >>= \case
Right () -> next $ next $ return True
Left e -> giveup e
diff --git a/P2P/IO.hs b/P2P/IO.hs
index 6cdc5b7d5..8b532c7f4 100644
--- a/P2P/IO.hs
+++ b/P2P/IO.hs
@@ -10,6 +10,7 @@
module P2P.IO
( RunProto
, P2PConnection(..)
+ , ClosableConnection(..)
, stdioP2PConnection
, connectPeer
, closeConnection
@@ -51,6 +52,10 @@ data P2PConnection = P2PConnection
, connOhdl :: Handle
}
+data ClosableConnection conn
+ = OpenConnection conn
+ | ClosedConnection
+
-- P2PConnection using stdio.
stdioP2PConnection :: Git.Repo -> P2PConnection
stdioP2PConnection g = P2PConnection
diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs
index c750ae6ff..4acbaadef 100644
--- a/P2P/Protocol.hs
+++ b/P2P/Protocol.hs
@@ -250,6 +250,10 @@ $(makeFree ''LocalF)
auth :: UUID -> AuthToken -> Proto (Maybe UUID)
auth myuuid t = do
net $ sendMessage (AUTH myuuid t)
+ postAuth
+
+postAuth :: Proto (Maybe UUID)
+postAuth = do
r <- net receiveMessage
case r of
AUTH_SUCCESS theiruuid -> return $ Just theiruuid
diff --git a/Remote/Git.hs b/Remote/Git.hs
index caa677464..63cfdeae9 100644
--- a/Remote/Git.hs
+++ b/Remote/Git.hs
@@ -55,6 +55,9 @@ import qualified Remote.Helper.Ssh as Ssh
import qualified Remote.GCrypt
import qualified Remote.P2P
import P2P.Address
+import qualified P2P.Protocol as P2P
+import qualified P2P.Annex as P2P
+import qualified P2P.IO as P2P
import Annex.Path
import Creds
import Messages.Progress
@@ -729,10 +732,11 @@ mkCopier remotewanthardlink rsyncparams = do
, return copier
)
-{- Normally the UUID is checked at startup, but annex-checkuuid config
- - can prevent that. To avoid getting confused, a deferred
- - check is done just before the repository is used. This returns False
- - when the repository UUID is not as expected. -}
+{- Normally the UUID of a local repository is checked at startup,
+ - but annex-checkuuid config can prevent that. To avoid getting
+ - confused, a deferred check is done just before the repository
+ - is used.
+ - This returns False when the repository UUID is not as expected. -}
type DeferredUUIDCheck = Annex Bool
mkDeferredUUIDCheck :: Git.Repo -> UUID -> RemoteGitConfig -> Annex DeferredUUIDCheck
@@ -751,3 +755,32 @@ mkDeferredUUIDCheck r u gc
return ok
, liftIO $ readMVar v
)
+
+-- Runs a P2P Proto action on a remote when it supports that,
+-- otherwise the fallback action.
+runSsh :: Remote -> Ssh.P2PSshConnectionPool -> P2P.Proto a -> Annex a -> Annex a
+runSsh r connpool proto fallback =
+ Ssh.getP2PSshConnection r connpool >>= maybe fallback go
+ where
+ go c = do
+ (c', v) <- runSsh' proto c
+ case v of
+ Just res -> do
+ liftIO $ Ssh.storeP2PSshConnection connpool c'
+ return res
+ -- Running the proto failed, either due to a protocol
+ -- error or a network error, so discard the
+ -- connection, and run the fallback.
+ Nothing -> fallback
+
+runSsh' :: P2P.Proto a -> Ssh.P2PSshConnection -> Annex (Ssh.P2PSshConnection, Maybe a)
+runSsh' _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing)
+runSsh' a conn@(P2P.OpenConnection (c, _pid)) =
+ P2P.runFullProto P2P.Client c a >>= \case
+ Right r -> return (conn, Just r)
+ -- When runFullProto fails, the connection is no longer
+ -- usable, so close it.
+ Left e -> do
+ warning $ "Lost connection (" ++ e ++ ")"
+ conn' <- liftIO $ Ssh.closeP2PSshConnection conn
+ return (conn', Nothing)
diff --git a/Remote/Helper/Ssh.hs b/Remote/Helper/Ssh.hs
index a4d91ab92..96c419dd4 100644
--- a/Remote/Helper/Ssh.hs
+++ b/Remote/Helper/Ssh.hs
@@ -1,6 +1,6 @@
{- git-annex remote access with ssh and git-annex-shell
-
- - Copyright 2011-2013 Joey Hess <id@joeyh.name>
+ - Copyright 2011-2018 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -23,6 +23,11 @@ import Utility.SshHost
import Types.Remote
import Types.Transfer
import Config
+import qualified P2P.Protocol as P2P
+import qualified P2P.IO as P2P
+
+import Control.Concurrent.Async
+import Control.Concurrent.STM
toRepo :: ConsumeStdin -> Git.Repo -> RemoteGitConfig -> SshCommand -> Annex (FilePath, [CommandParam])
toRepo cs r gc remotecmd = do
@@ -91,9 +96,9 @@ onRemote cs r (with, errorval) command params fields = do
inAnnex :: Git.Repo -> Key -> Annex Bool
inAnnex r k = do
showChecking r
- onRemote NoConsumeStdin r (check, cantCheck r) "inannex" [Param $ key2file k] []
+ onRemote NoConsumeStdin r (runcheck, cantCheck r) "inannex" [Param $ key2file k] []
where
- check c p = dispatch =<< safeSystem c p
+ runcheck c p = dispatch =<< safeSystem c p
dispatch ExitSuccess = return True
dispatch (ExitFailure 1) = return False
dispatch _ = cantCheck r
@@ -179,3 +184,89 @@ rsyncParams r direction = do
-- successfully locked.
contentLockedMarker :: String
contentLockedMarker = "OK"
+
+-- A connection over ssh to git-annex shell speaking the P2P protocol.
+type P2PSshConnection = P2P.ClosableConnection (P2P.P2PConnection, ProcessHandle)
+
+closeP2PSshConnection :: P2PSshConnection -> IO P2PSshConnection
+closeP2PSshConnection P2P.ClosedConnection = return P2P.ClosedConnection
+closeP2PSshConnection (P2P.OpenConnection (conn, pid)) = do
+ P2P.closeConnection conn
+ void $ async $ waitForProcess pid
+ return P2P.ClosedConnection
+
+-- Pool of connections over ssh to git-annex-shell p2pstdio.
+type P2PSshConnectionPool = TVar (Maybe P2PSshConnectionPoolState)
+
+data P2PSshConnectionPoolState
+ = P2PSshConnections [P2PSshConnection]
+ -- Remotes using an old version of git-annex-shell don't support P2P
+ | P2PSshUnsupported
+
+mkP2PSshConnectionPool :: Annex P2PSshConnectionPool
+mkP2PSshConnectionPool = liftIO $ newTVarIO Nothing
+
+-- Takes a connection from the pool, if any are available, otherwise
+-- tries to open a new one.
+getP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection)
+getP2PSshConnection r connpool = getexistingconn >>= \case
+ Nothing -> return Nothing
+ Just Nothing -> openP2PSshConnection r connpool
+ Just (Just c) -> return (Just c)
+ where
+ getexistingconn = liftIO $ atomically $ readTVar connpool >>= \case
+ Just P2PSshUnsupported -> return Nothing
+ Just (P2PSshConnections (c:cs)) -> do
+ writeTVar connpool (Just (P2PSshConnections cs))
+ return (Just (Just c))
+ Just (P2PSshConnections []) -> return (Just Nothing)
+ Nothing -> return (Just Nothing)
+
+-- Add a connection to the pool, unless it's closed.
+storeP2PSshConnection :: P2PSshConnectionPool -> P2PSshConnection -> IO ()
+storeP2PSshConnection _ P2P.ClosedConnection = return ()
+storeP2PSshConnection connpool conn = atomically $ modifyTVar' connpool $ \case
+ Just (P2PSshConnections cs) -> Just (P2PSshConnections (conn:cs))
+ _ -> Just (P2PSshConnections [conn])
+
+-- Try to open a P2PSshConnection.
+-- The new connection is not added to the pool, so it's available
+-- for the caller to use.
+-- If the remote does not support the P2P protocol, that's remembered in
+-- the connection pool.
+openP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection)
+openP2PSshConnection r connpool =
+ git_annex_shell ConsumeStdin (repo r) "p2pstdio" [] [] >>= \case
+ Nothing -> do
+ liftIO $ rememberunsupported
+ return Nothing
+ Just (cmd, params) -> start cmd params
+ where
+ start cmd params = liftIO $ withNullHandle $ \nullh -> do
+ -- stderr is discarded because old versions of git-annex
+ -- shell always error
+ (Just from, Just to, Nothing, pid) <- createProcess $
+ (proc cmd (toCommand params))
+ { std_in = CreatePipe
+ , std_out = CreatePipe
+ , std_err = UseHandle nullh
+ }
+ let conn = P2P.P2PConnection
+ { P2P.connRepo = repo r
+ , P2P.connCheckAuth = const False
+ , P2P.connIhdl = from
+ , P2P.connOhdl = to
+ }
+ let c = P2P.OpenConnection (conn, pid)
+ -- When the connection is successful, the peer
+ -- will send an AUTH_SUCCESS with its uuid.
+ tryNonAsync (P2P.runNetProto conn $ P2P.postAuth) >>= \case
+ Right (Right (Just theiruuid)) | theiruuid == uuid r ->
+ return $ Just c
+ _ -> do
+ void $ closeP2PSshConnection c
+ rememberunsupported
+ return Nothing
+ rememberunsupported = atomically $
+ modifyTVar' connpool $
+ maybe (Just P2PSshUnsupported) Just
diff --git a/Remote/P2P.hs b/Remote/P2P.hs
index 83ce258de..cfed5c604 100644
--- a/Remote/P2P.hs
+++ b/Remote/P2P.hs
@@ -116,10 +116,8 @@ lock u addr connpool k callback =
go False = giveup "can't lock content"
go True = withVerifiedCopy LockedCopy u (return True) callback
--- | A connection to the peer.
-data Connection
- = OpenConnection P2PConnection
- | ClosedConnection
+-- | A connection to the peer, which can be closed.
+type Connection = ClosableConnection P2PConnection
type ConnectionPool = TVar [Connection]
diff --git a/Utility/Process.hs b/Utility/Process.hs
index ff454f799..1807a1335 100644
--- a/Utility/Process.hs
+++ b/Utility/Process.hs
@@ -27,6 +27,7 @@ module Utility.Process (
withHandle,
withIOHandles,
withOEHandles,
+ withNullHandle,
withQuietOutput,
feedWithQuietOutput,
createProcess,
@@ -213,13 +214,16 @@ withOEHandles creator p a = creator p' $ a . oeHandles
, std_err = CreatePipe
}
+withNullHandle :: (Handle -> IO a) -> IO a
+withNullHandle = withFile devNull WriteMode
+
-- | Forces the CreateProcessRunner to run quietly;
-- both stdout and stderr are discarded.
withQuietOutput
:: CreateProcessRunner
-> CreateProcess
-> IO ()
-withQuietOutput creator p = withFile devNull WriteMode $ \nullh -> do
+withQuietOutput creator p = withNullHandle $ \nullh -> do
let p' = p
{ std_out = UseHandle nullh
, std_err = UseHandle nullh