diff options
Diffstat (limited to 'RemoteDaemon')
-rw-r--r-- | RemoteDaemon/Common.hs | 24 | ||||
-rw-r--r-- | RemoteDaemon/Core.hs | 36 | ||||
-rw-r--r-- | RemoteDaemon/Transport.hs | 6 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh.hs | 36 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Ssh/Types.hs | 4 | ||||
-rw-r--r-- | RemoteDaemon/Transport/Tor.hs | 162 | ||||
-rw-r--r-- | RemoteDaemon/Types.hs | 14 |
7 files changed, 233 insertions, 49 deletions
diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs index 982a84b43..711771f97 100644 --- a/RemoteDaemon/Common.hs +++ b/RemoteDaemon/Common.hs @@ -1,6 +1,6 @@ {- git-remote-daemon utilities - - - Copyright 2014 Joey Hess <id@joeyh.name> + - Copyright 2014-2016 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} @@ -9,6 +9,8 @@ module RemoteDaemon.Common ( liftAnnex , inLocalRepo , checkNewShas + , ConnectionStatus(..) + , robustConnection ) where import qualified Annex @@ -16,6 +18,7 @@ import Annex.Common import RemoteDaemon.Types import qualified Git import Annex.CatFile +import Utility.ThreadScheduler import Control.Concurrent @@ -40,3 +43,22 @@ checkNewShas transporthandle = check check [] = return True check (r:rs) = maybe (check rs) (const $ return False) =<< liftAnnex transporthandle (catObjectDetails r) + +data ConnectionStatus = ConnectionStopping | ConnectionClosed + +{- Make connection robust, retrying on error, with exponential backoff. -} +robustConnection :: Int -> IO ConnectionStatus -> IO () +robustConnection backoff a = + caught =<< a `catchNonAsync` (const $ return ConnectionClosed) + where + caught ConnectionStopping = return () + caught ConnectionClosed = do + threadDelaySeconds (Seconds backoff) + robustConnection increasedbackoff a + + increasedbackoff + | b2 > maxbackoff = maxbackoff + | otherwise = b2 + where + b2 = backoff * 2 + maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 5fa413155..2166c2b7a 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -1,11 +1,11 @@ {- git-remote-daemon core - - - Copyright 2014 Joey Hess <id@joeyh.name> + - Copyright 2014-2016 Joey Hess <id@joeyh.name> - - Licensed under the GNU GPL version 3 or higher. -} -module RemoteDaemon.Core (runForeground) where +module RemoteDaemon.Core (runInteractive, runNonInteractive) where import qualified Annex import Common @@ -17,8 +17,10 @@ import qualified Git import qualified Git.Types as Git import qualified Git.CurrentRepo import Utility.SimpleProtocol +import Utility.ThreadScheduler import Config import Annex.Ssh +import Types.Messages import Control.Concurrent import Control.Concurrent.Async @@ -26,8 +28,8 @@ import Control.Concurrent.STM import Network.URI import qualified Data.Map as M -runForeground :: IO () -runForeground = do +runInteractive :: IO () +runInteractive = do (readh, writeh) <- dupIoHandles ichan <- newTChanIO :: IO (TChan Consumed) ochan <- newTChanIO :: IO (TChan Emitted) @@ -44,8 +46,25 @@ runForeground = do let controller = runController ichan ochan -- If any thread fails, the rest will be killed. - void $ tryIO $ - reader `concurrently` writer `concurrently` controller + void $ tryIO $ reader + `concurrently` writer + `concurrently` controller + +runNonInteractive :: IO () +runNonInteractive = do + ichan <- newTChanIO :: IO (TChan Consumed) + ochan <- newTChanIO :: IO (TChan Emitted) + + let reader = forever $ do + threadDelaySeconds (Seconds (60*60)) + atomically $ writeTChan ichan RELOAD + let writer = forever $ + void $ atomically $ readTChan ochan + let controller = runController ichan ochan + + void $ tryIO $ reader + `concurrently` writer + `concurrently` controller type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed) @@ -56,6 +75,7 @@ runController ichan ochan = do h <- genTransportHandle m <- genRemoteMap h ochan startrunning m + mapM_ (\s -> async (s h)) remoteServers go h False m where go h paused m = do @@ -132,7 +152,9 @@ genTransportHandle :: IO TransportHandle genTransportHandle = do annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get g <- Annex.repo <$> readMVar annexstate - return $ TransportHandle (LocalRepo g) annexstate + let h = TransportHandle (LocalRepo g) annexstate + liftAnnex h $ Annex.setOutput QuietOutput + return h updateTransportHandle :: TransportHandle -> IO TransportHandle updateTransportHandle h@(TransportHandle _g annexstate) = do diff --git a/RemoteDaemon/Transport.hs b/RemoteDaemon/Transport.hs index 0e2040d1f..053973424 100644 --- a/RemoteDaemon/Transport.hs +++ b/RemoteDaemon/Transport.hs @@ -10,7 +10,9 @@ module RemoteDaemon.Transport where import RemoteDaemon.Types import qualified RemoteDaemon.Transport.Ssh import qualified RemoteDaemon.Transport.GCrypt +import qualified RemoteDaemon.Transport.Tor import qualified Git.GCrypt +import P2P.Address (torAnnexScheme) import qualified Data.Map as M @@ -21,4 +23,8 @@ remoteTransports :: M.Map TransportScheme Transport remoteTransports = M.fromList [ ("ssh:", RemoteDaemon.Transport.Ssh.transport) , (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport) + , (torAnnexScheme, RemoteDaemon.Transport.Tor.transport) ] + +remoteServers :: [TransportHandle -> IO ()] +remoteServers = [RemoteDaemon.Transport.Tor.server] diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 73c88054c..6f8e8323e 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -16,7 +16,7 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol import qualified Git import Git.Command -import Utility.ThreadScheduler +import Annex.ChangedRefs import Control.Concurrent.STM import Control.Concurrent.Async @@ -37,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR transportUsingCmd' :: FilePath -> [CommandParam] -> Transport transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = - robustly 1 $ do + robustConnection 1 $ do (Just toh, Just fromh, Just errh, pid) <- createProcess (proc cmd (toCommand params)) { std_in = CreatePipe @@ -68,23 +68,23 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = send (DONESYNCING url ok) handlestdout fromh = do - l <- hGetLine fromh - case parseMessage l of + ml <- getProtocolLine fromh + case parseMessage =<< ml of Just SshRemote.READY -> do send (CONNECTED url) handlestdout fromh - Just (SshRemote.CHANGED shas) -> do + Just (SshRemote.CHANGED (ChangedRefs shas)) -> do whenM (checkNewShas transporthandle shas) $ fetch handlestdout fromh -- avoid reconnect on protocol error - Nothing -> return Stopping + Nothing -> return ConnectionStopping handlecontrol = do msg <- atomically $ readTChan ichan case msg of - STOP -> return Stopping - LOSTNET -> return Stopping + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping _ -> handlecontrol -- Old versions of git-annex-shell that do not support @@ -102,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = , "needs its git-annex upgraded" , "to 5.20140405 or newer" ] - return Stopping + return ConnectionStopping else handlestderr errh - -data Status = Stopping | ConnectionClosed - -{- Make connection robustly, with exponential backoff on failure. -} -robustly :: Int -> IO Status -> IO () -robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a - where - caught Stopping = return () - caught ConnectionClosed = do - threadDelaySeconds (Seconds backoff) - robustly increasedbackoff a - - increasedbackoff - | b2 > maxbackoff = maxbackoff - | otherwise = b2 - where - b2 = backoff * 2 - maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs index fa6a55d3d..606e1a563 100644 --- a/RemoteDaemon/Transport/Ssh/Types.hs +++ b/RemoteDaemon/Transport/Ssh/Types.hs @@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types ( ) where import qualified Utility.SimpleProtocol as Proto -import RemoteDaemon.Types (RefList) +import Annex.ChangedRefs (ChangedRefs) data Notification = READY - | CHANGED RefList + | CHANGED ChangedRefs instance Proto.Sendable Notification where formatMessage READY = ["READY"] diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs new file mode 100644 index 000000000..e7d3794d6 --- /dev/null +++ b/RemoteDaemon/Transport/Tor.hs @@ -0,0 +1,162 @@ +{- git-remote-daemon, tor hidden service server and transport + - + - Copyright 2016 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module RemoteDaemon.Transport.Tor (server, transport) where + +import Common +import qualified Annex +import Annex.Concurrent +import Annex.ChangedRefs +import RemoteDaemon.Types +import RemoteDaemon.Common +import Utility.Tor +import Utility.AuthToken +import P2P.Protocol as P2P +import P2P.IO +import P2P.Annex +import P2P.Auth +import P2P.Address +import Annex.UUID +import Types.UUID +import Messages +import Git +import Git.Command + +import System.PosixCompat.User +import Control.Concurrent +import System.Log.Logger (debugM) +import Control.Concurrent.STM +import Control.Concurrent.STM.TBMQueue +import Control.Concurrent.Async + +-- Run tor hidden service. +server :: TransportHandle -> IO () +server th@(TransportHandle (LocalRepo r) _) = do + u <- liftAnnex th getUUID + uid <- getRealUserID + let ident = fromUUID u + go u =<< getHiddenServiceSocketFile torAppName uid ident + where + go u (Just sock) = do + q <- newTBMQueueIO maxConnections + replicateM_ maxConnections $ + forkIO $ forever $ serveClient th u r q + + debugM "remotedaemon" "Tor hidden service running" + serveUnixSocket sock $ \conn -> do + ok <- atomically $ ifM (isFullTBMQueue q) + ( return False + , do + writeTBMQueue q conn + return True + ) + unless ok $ do + hClose conn + warningIO "dropped Tor connection, too busy" + go _ Nothing = debugM "remotedaemon" "Tor hidden service not enabled" + +-- How many clients to serve at a time, maximum. This is to avoid DOS attacks. +maxConnections :: Int +maxConnections = 100 + +serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO () +serveClient th u r q = bracket setup cleanup start + where + setup = do + h <- atomically $ readTBMQueue q + debugM "remotedaemon" "serving a Tor connection" + return h + + cleanup Nothing = return () + cleanup (Just h) = do + debugM "remotedaemon" "done with Tor connection" + hClose h + + start Nothing = return () + start (Just h) = do + -- Avoid doing any work in the liftAnnex, since only one + -- can run at a time. + st <- liftAnnex th dupState + ((), st') <- Annex.run st $ do + -- Load auth tokens for every connection, to notice + -- when the allowed set is changed. + allowed <- loadP2PAuthTokens + let conn = P2PConnection + { connRepo = r + , connCheckAuth = (`isAllowedAuthToken` allowed) + , connIhdl = h + , connOhdl = h + } + v <- liftIO $ runNetProto conn $ P2P.serveAuth u + case v of + Right (Just theiruuid) -> authed conn theiruuid + Right Nothing -> liftIO $ + debugM "remotedaemon" "Tor connection failed to authenticate" + Left e -> liftIO $ + debugM "remotedaemon" ("Tor connection error before authentication: " ++ e) + -- Merge the duplicated state back in. + liftAnnex th $ mergeState st' + + authed conn theiruuid = + bracket watchChangedRefs (liftIO . maybe noop stopWatchingChangedRefs) $ \crh -> do + v' <- runFullProto (Serving theiruuid crh) conn $ + P2P.serveAuthed u + case v' of + Right () -> return () + Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e) + +-- Connect to peer's tor hidden service. +transport :: Transport +transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan = + case unformatP2PAddress (show uri) of + Nothing -> return () + Just addr -> robustConnection 1 $ do + g <- liftAnnex th Annex.gitRepo + bracket (connectPeer g addr) closeConnection (go addr) + where + go addr conn = do + myuuid <- liftAnnex th getUUID + authtoken <- fromMaybe nullAuthToken + <$> liftAnnex th (loadP2PRemoteAuthToken addr) + res <- runNetProto conn $ + P2P.auth myuuid authtoken + case res of + Right (Just theiruuid) -> do + expecteduuid <- liftAnnex th $ getRepoUUID r + if expecteduuid == theiruuid + then do + send (CONNECTED url) + status <- handlecontrol + `race` handlepeer conn + send (DISCONNECTED url) + return $ either id id status + else return ConnectionStopping + _ -> return ConnectionClosed + + send msg = atomically $ writeTChan ochan msg + + handlecontrol = do + msg <- atomically $ readTChan ichan + case msg of + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping + _ -> handlecontrol + + handlepeer conn = do + v <- runNetProto conn P2P.notifyChange + case v of + Right (Just (ChangedRefs shas)) -> do + whenM (checkNewShas th shas) $ + fetch + handlepeer conn + _ -> return ConnectionClosed + + fetch = do + send (SYNCING url) + ok <- inLocalRepo th $ + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index f85219ea5..c0d74e038 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -5,7 +5,6 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module RemoteDaemon.Types where @@ -15,6 +14,7 @@ import qualified Annex import qualified Git.Types as Git import qualified Utility.SimpleProtocol as Proto import Types.GitConfig +import Annex.ChangedRefs (ChangedRefs) import Network.URI import Control.Concurrent @@ -52,13 +52,11 @@ data Consumed = PAUSE | LOSTNET | RESUME - | CHANGED RefList + | CHANGED ChangedRefs | RELOAD | STOP deriving (Show) -type RefList = [Git.Ref] - instance Proto.Sendable Emitted where formatMessage (CONNECTED remote) = ["CONNECTED", Proto.serialize remote] @@ -100,14 +98,6 @@ instance Proto.Serializable RemoteURI where serialize (RemoteURI u) = show u deserialize = RemoteURI <$$> parseURI -instance Proto.Serializable [Char] where - serialize = id - deserialize = Just - -instance Proto.Serializable RefList where - serialize = unwords . map Git.fromRef - deserialize = Just . map Git.Ref . words - instance Proto.Serializable Bool where serialize False = "0" serialize True = "1" |