From 46f3063131f7711208c1134b31caf999e375306d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 9 Dec 2016 16:02:43 -0400 Subject: remotedaemon: git change detection over tor hidden service --- RemoteDaemon/Transport/Ssh.hs | 29 +++-------------- RemoteDaemon/Transport/Tor.hs | 73 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 68 insertions(+), 34 deletions(-) (limited to 'RemoteDaemon/Transport') diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 59502f8d3..6f8e8323e 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -16,7 +16,6 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol import qualified Git import Git.Command -import Utility.ThreadScheduler import Annex.ChangedRefs import Control.Concurrent.STM @@ -38,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR transportUsingCmd' :: FilePath -> [CommandParam] -> Transport transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = - robustly 1 $ do + robustConnection 1 $ do (Just toh, Just fromh, Just errh, pid) <- createProcess (proc cmd (toCommand params)) { std_in = CreatePipe @@ -79,13 +78,13 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = fetch handlestdout fromh -- avoid reconnect on protocol error - Nothing -> return Stopping + Nothing -> return ConnectionStopping handlecontrol = do msg <- atomically $ readTChan ichan case msg of - STOP -> return Stopping - LOSTNET -> return Stopping + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping _ -> handlecontrol -- Old versions of git-annex-shell that do not support @@ -103,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = , "needs its git-annex upgraded" , "to 5.20140405 or newer" ] - return Stopping + return ConnectionStopping else handlestderr errh - -data Status = Stopping | ConnectionClosed - -{- Make connection robustly, with exponential backoff on failure. -} -robustly :: Int -> IO Status -> IO () -robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a - where - caught Stopping = return () - caught ConnectionClosed = do - threadDelaySeconds (Seconds backoff) - robustly increasedbackoff a - - increasedbackoff - | b2 > maxbackoff = maxbackoff - | otherwise = b2 - where - b2 = backoff * 2 - maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs index 5ea06ac2c..20320cadd 100644 --- a/RemoteDaemon/Transport/Tor.hs +++ b/RemoteDaemon/Transport/Tor.hs @@ -1,11 +1,11 @@ -{- git-remote-daemon, tor hidden service transport +{- git-remote-daemon, tor hidden service server and transport - - Copyright 2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} -module RemoteDaemon.Transport.Tor (server) where +module RemoteDaemon.Transport.Tor (server, transport) where import Common import qualified Annex @@ -16,20 +16,23 @@ import RemoteDaemon.Common import Utility.Tor import Utility.FileMode import Utility.AuthToken -import P2P.Protocol +import P2P.Protocol as P2P import P2P.IO import P2P.Annex import P2P.Auth +import P2P.Address import Annex.UUID import Types.UUID import Messages import Git +import Git.Command import System.PosixCompat.User -import Network.Socket import Control.Concurrent import System.Log.Logger (debugM) import Control.Concurrent.STM +import Control.Concurrent.Async +import qualified Network.Socket as S -- Run tor hidden service. server :: TransportHandle -> IO () @@ -44,17 +47,17 @@ server th@(TransportHandle (LocalRepo r) _) = do let ident = fromUUID u let sock = hiddenServiceSocketFile uid ident nukeFile sock - soc <- socket AF_UNIX Stream defaultProtocol - bind soc (SockAddrUnix sock) + soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bind soc (S.SockAddrUnix sock) -- Allow everyone to read and write to the socket; tor is probably -- running as a different user. Connections have to authenticate -- to do anything, so it's fine that other local users can connect. modifyFileMode sock $ addModes [groupReadMode, groupWriteMode, otherReadMode, otherWriteMode] - listen soc 2 + S.listen soc 2 debugM "remotedaemon" "Tor hidden service running" forever $ do - (conn, _) <- accept soc + (conn, _) <- S.accept soc h <- setupHandle conn ok <- atomically $ ifM (isFullTBQueue q) ( return False @@ -97,7 +100,7 @@ serveClient th u r q = bracket setup cleanup start , connIhdl = h , connOhdl = h } - v <- liftIO $ runNetProto conn $ serveAuth u + v <- liftIO $ runNetProto conn $ P2P.serveAuth u case v of Right (Just theiruuid) -> authed conn theiruuid Right Nothing -> liftIO $ @@ -110,7 +113,57 @@ serveClient th u r q = bracket setup cleanup start authed conn theiruuid = bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do v' <- runFullProto (Serving theiruuid crh) conn $ - serveAuthed u + P2P.serveAuthed u case v' of Right () -> return () Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e) + +-- Connect to peer's tor hidden service. +transport :: Transport +transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan = + case unformatP2PAddress (show uri) of + Nothing -> return () + Just addr -> robustConnection 1 $ do + g <- liftAnnex th Annex.gitRepo + bracket (connectPeer g addr) closeConnection (go addr) + where + go addr conn = do + myuuid <- liftAnnex th getUUID + authtoken <- fromMaybe nullAuthToken + <$> liftAnnex th (loadP2PRemoteAuthToken addr) + res <- runNetProto conn $ + P2P.auth myuuid authtoken + case res of + Right (Just theiruuid) + | getUncachedUUID r == theiruuid -> do + send (CONNECTED url) + status <- handlecontrol + `race` handlepeer conn + send (DISCONNECTED url) + return $ either id id status + | otherwise -> return ConnectionStopping + _ -> return ConnectionClosed + + send msg = atomically $ writeTChan ochan msg + + handlecontrol = do + msg <- atomically $ readTChan ichan + case msg of + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping + _ -> handlecontrol + + handlepeer conn = do + v <- runNetProto conn P2P.notifyChange + case v of + Right (Just (ChangedRefs shas)) -> do + whenM (checkNewShas th shas) $ + fetch + handlepeer conn + _ -> return ConnectionClosed + + fetch = do + send (SYNCING url) + ok <- inLocalRepo th $ + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) -- cgit v1.2.3