diff options
author | 2016-12-24 14:48:51 -0400 | |
---|---|---|
committer | 2016-12-24 15:01:55 -0400 | |
commit | 42e08cd4575d3dc558dfe172c1f28c752d69e8c6 (patch) | |
tree | 78a8eddc31c390aaf8f66435bb13db9366f9a7c4 /RemoteDaemon/Transport | |
parent | 34f375526f44ff255d45bbabcd1425b3d5d0bb4a (diff) | |
parent | 3b9d9a267b7c9247d36d9b622e1b836724ca5fb0 (diff) |
Merge branch 'master' into no-xmpp
Diffstat (limited to 'RemoteDaemon/Transport')
-rw-r--r-- | RemoteDaemon/Transport/Ssh.hs | 36 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh/Types.hs | 4 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Tor.hs | 162 |
3 files changed, 173 insertions, 29 deletions
diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 73c88054c..6f8e8323e 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -16,7 +16,7 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol import qualified Git import Git.Command -import Utility.ThreadScheduler +import Annex.ChangedRefs import Control.Concurrent.STM import Control.Concurrent.Async @@ -37,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR transportUsingCmd' :: FilePath -> [CommandParam] -> Transport transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = - robustly 1 $ do + robustConnection 1 $ do (Just toh, Just fromh, Just errh, pid) <- createProcess (proc cmd (toCommand params)) { std_in = CreatePipe @@ -68,23 +68,23 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = send (DONESYNCING url ok) handlestdout fromh = do - l <- hGetLine fromh - case parseMessage l of + ml <- getProtocolLine fromh + case parseMessage =<< ml of Just SshRemote.READY -> do send (CONNECTED url) handlestdout fromh - Just (SshRemote.CHANGED shas) -> do + Just (SshRemote.CHANGED (ChangedRefs shas)) -> do whenM (checkNewShas transporthandle shas) $ fetch handlestdout fromh -- avoid reconnect on protocol error - Nothing -> return Stopping + Nothing -> return ConnectionStopping handlecontrol = do msg <- atomically $ readTChan ichan case msg of - STOP -> return Stopping - LOSTNET -> return Stopping + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping _ -> handlecontrol -- Old versions of git-annex-shell that do not support @@ -102,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = , "needs its git-annex upgraded" , "to 5.20140405 or newer" ] - return Stopping + return ConnectionStopping else handlestderr errh - -data Status = Stopping | ConnectionClosed - -{- Make connection robustly, with exponential backoff on failure. -} -robustly :: Int -> IO Status -> IO () -robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a - where - caught Stopping = return () - caught ConnectionClosed = do - threadDelaySeconds (Seconds backoff) - robustly increasedbackoff a - - increasedbackoff - | b2 > maxbackoff = maxbackoff - | otherwise = b2 - where - b2 = backoff * 2 - maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs index fa6a55d3d..606e1a563 100644 --- a/RemoteDaemon/Transport/Ssh/Types.hs +++ b/RemoteDaemon/Transport/Ssh/Types.hs @@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types ( ) where import qualified Utility.SimpleProtocol as Proto -import RemoteDaemon.Types (RefList) +import Annex.ChangedRefs (ChangedRefs) data Notification = READY - | CHANGED RefList + | CHANGED ChangedRefs instance Proto.Sendable Notification where formatMessage READY = ["READY"] diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs new file mode 100644 index 000000000..e7d3794d6 --- /dev/null +++ b/RemoteDaemon/Transport/Tor.hs @@ -0,0 +1,162 @@ +{- git-remote-daemon, tor hidden service server and transport + - + - Copyright 2016 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module RemoteDaemon.Transport.Tor (server, transport) where + +import Common +import qualified Annex +import Annex.Concurrent +import Annex.ChangedRefs +import RemoteDaemon.Types +import RemoteDaemon.Common +import Utility.Tor +import Utility.AuthToken +import P2P.Protocol as P2P +import P2P.IO +import P2P.Annex +import P2P.Auth +import P2P.Address +import Annex.UUID +import Types.UUID +import Messages +import Git +import Git.Command + +import System.PosixCompat.User +import Control.Concurrent +import System.Log.Logger (debugM) +import Control.Concurrent.STM +import Control.Concurrent.STM.TBMQueue +import Control.Concurrent.Async + +-- Run tor hidden service. +server :: TransportHandle -> IO () +server th@(TransportHandle (LocalRepo r) _) = do + u <- liftAnnex th getUUID + uid <- getRealUserID + let ident = fromUUID u + go u =<< getHiddenServiceSocketFile torAppName uid ident + where + go u (Just sock) = do + q <- newTBMQueueIO maxConnections + replicateM_ maxConnections $ + forkIO $ forever $ serveClient th u r q + + debugM "remotedaemon" "Tor hidden service running" + serveUnixSocket sock $ \conn -> do + ok <- atomically $ ifM (isFullTBMQueue q) + ( return False + , do + writeTBMQueue q conn + return True + ) + unless ok $ do + hClose conn + warningIO "dropped Tor connection, too busy" + go _ Nothing = debugM "remotedaemon" "Tor hidden service not enabled" + +-- How many clients to serve at a time, maximum. This is to avoid DOS attacks. +maxConnections :: Int +maxConnections = 100 + +serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO () +serveClient th u r q = bracket setup cleanup start + where + setup = do + h <- atomically $ readTBMQueue q + debugM "remotedaemon" "serving a Tor connection" + return h + + cleanup Nothing = return () + cleanup (Just h) = do + debugM "remotedaemon" "done with Tor connection" + hClose h + + start Nothing = return () + start (Just h) = do + -- Avoid doing any work in the liftAnnex, since only one + -- can run at a time. + st <- liftAnnex th dupState + ((), st') <- Annex.run st $ do + -- Load auth tokens for every connection, to notice + -- when the allowed set is changed. + allowed <- loadP2PAuthTokens + let conn = P2PConnection + { connRepo = r + , connCheckAuth = (`isAllowedAuthToken` allowed) + , connIhdl = h + , connOhdl = h + } + v <- liftIO $ runNetProto conn $ P2P.serveAuth u + case v of + Right (Just theiruuid) -> authed conn theiruuid + Right Nothing -> liftIO $ + debugM "remotedaemon" "Tor connection failed to authenticate" + Left e -> liftIO $ + debugM "remotedaemon" ("Tor connection error before authentication: " ++ e) + -- Merge the duplicated state back in. + liftAnnex th $ mergeState st' + + authed conn theiruuid = + bracket watchChangedRefs (liftIO . maybe noop stopWatchingChangedRefs) $ \crh -> do + v' <- runFullProto (Serving theiruuid crh) conn $ + P2P.serveAuthed u + case v' of + Right () -> return () + Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e) + +-- Connect to peer's tor hidden service. +transport :: Transport +transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan = + case unformatP2PAddress (show uri) of + Nothing -> return () + Just addr -> robustConnection 1 $ do + g <- liftAnnex th Annex.gitRepo + bracket (connectPeer g addr) closeConnection (go addr) + where + go addr conn = do + myuuid <- liftAnnex th getUUID + authtoken <- fromMaybe nullAuthToken + <$> liftAnnex th (loadP2PRemoteAuthToken addr) + res <- runNetProto conn $ + P2P.auth myuuid authtoken + case res of + Right (Just theiruuid) -> do + expecteduuid <- liftAnnex th $ getRepoUUID r + if expecteduuid == theiruuid + then do + send (CONNECTED url) + status <- handlecontrol + `race` handlepeer conn + send (DISCONNECTED url) + return $ either id id status + else return ConnectionStopping + _ -> return ConnectionClosed + + send msg = atomically $ writeTChan ochan msg + + handlecontrol = do + msg <- atomically $ readTChan ichan + case msg of + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping + _ -> handlecontrol + + handlepeer conn = do + v <- runNetProto conn P2P.notifyChange + case v of + Right (Just (ChangedRefs shas)) -> do + whenM (checkNewShas th shas) $ + fetch + handlepeer conn + _ -> return ConnectionClosed + + fetch = do + send (SYNCING url) + ok <- inLocalRepo th $ + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) |