From 46f3063131f7711208c1134b31caf999e375306d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 9 Dec 2016 16:02:43 -0400 Subject: remotedaemon: git change detection over tor hidden service --- RemoteDaemon/Common.hs | 24 +++++++++++++- RemoteDaemon/Transport.hs | 2 ++ RemoteDaemon/Transport/Ssh.hs | 29 +++-------------- RemoteDaemon/Transport/Tor.hs | 73 +++++++++++++++++++++++++++++++++++++------ 4 files changed, 93 insertions(+), 35 deletions(-) (limited to 'RemoteDaemon') diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs index 982a84b43..711771f97 100644 --- a/RemoteDaemon/Common.hs +++ b/RemoteDaemon/Common.hs @@ -1,6 +1,6 @@ {- git-remote-daemon utilities - - - Copyright 2014 Joey Hess + - Copyright 2014-2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} @@ -9,6 +9,8 @@ module RemoteDaemon.Common ( liftAnnex , inLocalRepo , checkNewShas + , ConnectionStatus(..) + , robustConnection ) where import qualified Annex @@ -16,6 +18,7 @@ import Annex.Common import RemoteDaemon.Types import qualified Git import Annex.CatFile +import Utility.ThreadScheduler import Control.Concurrent @@ -40,3 +43,22 @@ checkNewShas transporthandle = check check [] = return True check (r:rs) = maybe (check rs) (const $ return False) =<< liftAnnex transporthandle (catObjectDetails r) + +data ConnectionStatus = ConnectionStopping | ConnectionClosed + +{- Make connection robust, retrying on error, with exponential backoff. -} +robustConnection :: Int -> IO ConnectionStatus -> IO () +robustConnection backoff a = + caught =<< a `catchNonAsync` (const $ return ConnectionClosed) + where + caught ConnectionStopping = return () + caught ConnectionClosed = do + threadDelaySeconds (Seconds backoff) + robustConnection increasedbackoff a + + increasedbackoff + | b2 > maxbackoff = maxbackoff + | otherwise = b2 + where + b2 = backoff * 2 + maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport.hs b/RemoteDaemon/Transport.hs index 6605012de..053973424 100644 --- a/RemoteDaemon/Transport.hs +++ b/RemoteDaemon/Transport.hs @@ -12,6 +12,7 @@ import qualified RemoteDaemon.Transport.Ssh import qualified RemoteDaemon.Transport.GCrypt import qualified RemoteDaemon.Transport.Tor import qualified Git.GCrypt +import P2P.Address (torAnnexScheme) import qualified Data.Map as M @@ -22,6 +23,7 @@ remoteTransports :: M.Map TransportScheme Transport remoteTransports = M.fromList [ ("ssh:", RemoteDaemon.Transport.Ssh.transport) , (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport) + , (torAnnexScheme, RemoteDaemon.Transport.Tor.transport) ] remoteServers :: [TransportHandle -> IO ()] diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 59502f8d3..6f8e8323e 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -16,7 +16,6 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol import qualified Git import Git.Command -import Utility.ThreadScheduler import Annex.ChangedRefs import Control.Concurrent.STM @@ -38,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR transportUsingCmd' :: FilePath -> [CommandParam] -> Transport transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = - robustly 1 $ do + robustConnection 1 $ do (Just toh, Just fromh, Just errh, pid) <- createProcess (proc cmd (toCommand params)) { std_in = CreatePipe @@ -79,13 +78,13 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = fetch handlestdout fromh -- avoid reconnect on protocol error - Nothing -> return Stopping + Nothing -> return ConnectionStopping handlecontrol = do msg <- atomically $ readTChan ichan case msg of - STOP -> return Stopping - LOSTNET -> return Stopping + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping _ -> handlecontrol -- Old versions of git-annex-shell that do not support @@ -103,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = , "needs its git-annex upgraded" , "to 5.20140405 or newer" ] - return Stopping + return ConnectionStopping else handlestderr errh - -data Status = Stopping | ConnectionClosed - -{- Make connection robustly, with exponential backoff on failure. -} -robustly :: Int -> IO Status -> IO () -robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a - where - caught Stopping = return () - caught ConnectionClosed = do - threadDelaySeconds (Seconds backoff) - robustly increasedbackoff a - - increasedbackoff - | b2 > maxbackoff = maxbackoff - | otherwise = b2 - where - b2 = backoff * 2 - maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs index 5ea06ac2c..20320cadd 100644 --- a/RemoteDaemon/Transport/Tor.hs +++ b/RemoteDaemon/Transport/Tor.hs @@ -1,11 +1,11 @@ -{- git-remote-daemon, tor hidden service transport +{- git-remote-daemon, tor hidden service server and transport - - Copyright 2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} -module RemoteDaemon.Transport.Tor (server) where +module RemoteDaemon.Transport.Tor (server, transport) where import Common import qualified Annex @@ -16,20 +16,23 @@ import RemoteDaemon.Common import Utility.Tor import Utility.FileMode import Utility.AuthToken -import P2P.Protocol +import P2P.Protocol as P2P import P2P.IO import P2P.Annex import P2P.Auth +import P2P.Address import Annex.UUID import Types.UUID import Messages import Git +import Git.Command import System.PosixCompat.User -import Network.Socket import Control.Concurrent import System.Log.Logger (debugM) import Control.Concurrent.STM +import Control.Concurrent.Async +import qualified Network.Socket as S -- Run tor hidden service. server :: TransportHandle -> IO () @@ -44,17 +47,17 @@ server th@(TransportHandle (LocalRepo r) _) = do let ident = fromUUID u let sock = hiddenServiceSocketFile uid ident nukeFile sock - soc <- socket AF_UNIX Stream defaultProtocol - bind soc (SockAddrUnix sock) + soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bind soc (S.SockAddrUnix sock) -- Allow everyone to read and write to the socket; tor is probably -- running as a different user. Connections have to authenticate -- to do anything, so it's fine that other local users can connect. modifyFileMode sock $ addModes [groupReadMode, groupWriteMode, otherReadMode, otherWriteMode] - listen soc 2 + S.listen soc 2 debugM "remotedaemon" "Tor hidden service running" forever $ do - (conn, _) <- accept soc + (conn, _) <- S.accept soc h <- setupHandle conn ok <- atomically $ ifM (isFullTBQueue q) ( return False @@ -97,7 +100,7 @@ serveClient th u r q = bracket setup cleanup start , connIhdl = h , connOhdl = h } - v <- liftIO $ runNetProto conn $ serveAuth u + v <- liftIO $ runNetProto conn $ P2P.serveAuth u case v of Right (Just theiruuid) -> authed conn theiruuid Right Nothing -> liftIO $ @@ -110,7 +113,57 @@ serveClient th u r q = bracket setup cleanup start authed conn theiruuid = bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do v' <- runFullProto (Serving theiruuid crh) conn $ - serveAuthed u + P2P.serveAuthed u case v' of Right () -> return () Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e) + +-- Connect to peer's tor hidden service. +transport :: Transport +transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan = + case unformatP2PAddress (show uri) of + Nothing -> return () + Just addr -> robustConnection 1 $ do + g <- liftAnnex th Annex.gitRepo + bracket (connectPeer g addr) closeConnection (go addr) + where + go addr conn = do + myuuid <- liftAnnex th getUUID + authtoken <- fromMaybe nullAuthToken + <$> liftAnnex th (loadP2PRemoteAuthToken addr) + res <- runNetProto conn $ + P2P.auth myuuid authtoken + case res of + Right (Just theiruuid) + | getUncachedUUID r == theiruuid -> do + send (CONNECTED url) + status <- handlecontrol + `race` handlepeer conn + send (DISCONNECTED url) + return $ either id id status + | otherwise -> return ConnectionStopping + _ -> return ConnectionClosed + + send msg = atomically $ writeTChan ochan msg + + handlecontrol = do + msg <- atomically $ readTChan ichan + case msg of + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping + _ -> handlecontrol + + handlepeer conn = do + v <- runNetProto conn P2P.notifyChange + case v of + Right (Just (ChangedRefs shas)) -> do + whenM (checkNewShas th shas) $ + fetch + handlepeer conn + _ -> return ConnectionClosed + + fetch = do + send (SYNCING url) + ok <- inLocalRepo th $ + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) -- cgit v1.2.3