aboutsummaryrefslogtreecommitdiff
path: root/RemoteDaemon/Transport
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-12-24 14:48:51 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-12-24 15:01:55 -0400
commit42e08cd4575d3dc558dfe172c1f28c752d69e8c6 (patch)
tree78a8eddc31c390aaf8f66435bb13db9366f9a7c4 /RemoteDaemon/Transport
parent34f375526f44ff255d45bbabcd1425b3d5d0bb4a (diff)
parent3b9d9a267b7c9247d36d9b622e1b836724ca5fb0 (diff)
Merge branch 'master' into no-xmpp
Diffstat (limited to 'RemoteDaemon/Transport')
-rw-r--r--RemoteDaemon/Transport/Ssh.hs36
-rw-r--r--RemoteDaemon/Transport/Ssh/Types.hs4
-rw-r--r--RemoteDaemon/Transport/Tor.hs162
3 files changed, 173 insertions, 29 deletions
diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs
index 73c88054c..6f8e8323e 100644
--- a/RemoteDaemon/Transport/Ssh.hs
+++ b/RemoteDaemon/Transport/Ssh.hs
@@ -16,7 +16,7 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
import Utility.SimpleProtocol
import qualified Git
import Git.Command
-import Utility.ThreadScheduler
+import Annex.ChangedRefs
import Control.Concurrent.STM
import Control.Concurrent.Async
@@ -37,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR
transportUsingCmd' :: FilePath -> [CommandParam] -> Transport
transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
- robustly 1 $ do
+ robustConnection 1 $ do
(Just toh, Just fromh, Just errh, pid) <-
createProcess (proc cmd (toCommand params))
{ std_in = CreatePipe
@@ -68,23 +68,23 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
send (DONESYNCING url ok)
handlestdout fromh = do
- l <- hGetLine fromh
- case parseMessage l of
+ ml <- getProtocolLine fromh
+ case parseMessage =<< ml of
Just SshRemote.READY -> do
send (CONNECTED url)
handlestdout fromh
- Just (SshRemote.CHANGED shas) -> do
+ Just (SshRemote.CHANGED (ChangedRefs shas)) -> do
whenM (checkNewShas transporthandle shas) $
fetch
handlestdout fromh
-- avoid reconnect on protocol error
- Nothing -> return Stopping
+ Nothing -> return ConnectionStopping
handlecontrol = do
msg <- atomically $ readTChan ichan
case msg of
- STOP -> return Stopping
- LOSTNET -> return Stopping
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
_ -> handlecontrol
-- Old versions of git-annex-shell that do not support
@@ -102,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
, "needs its git-annex upgraded"
, "to 5.20140405 or newer"
]
- return Stopping
+ return ConnectionStopping
else handlestderr errh
-
-data Status = Stopping | ConnectionClosed
-
-{- Make connection robustly, with exponential backoff on failure. -}
-robustly :: Int -> IO Status -> IO ()
-robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a
- where
- caught Stopping = return ()
- caught ConnectionClosed = do
- threadDelaySeconds (Seconds backoff)
- robustly increasedbackoff a
-
- increasedbackoff
- | b2 > maxbackoff = maxbackoff
- | otherwise = b2
- where
- b2 = backoff * 2
- maxbackoff = 3600 -- one hour
diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs
index fa6a55d3d..606e1a563 100644
--- a/RemoteDaemon/Transport/Ssh/Types.hs
+++ b/RemoteDaemon/Transport/Ssh/Types.hs
@@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types (
) where
import qualified Utility.SimpleProtocol as Proto
-import RemoteDaemon.Types (RefList)
+import Annex.ChangedRefs (ChangedRefs)
data Notification
= READY
- | CHANGED RefList
+ | CHANGED ChangedRefs
instance Proto.Sendable Notification where
formatMessage READY = ["READY"]
diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs
new file mode 100644
index 000000000..e7d3794d6
--- /dev/null
+++ b/RemoteDaemon/Transport/Tor.hs
@@ -0,0 +1,162 @@
+{- git-remote-daemon, tor hidden service server and transport
+ -
+ - Copyright 2016 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module RemoteDaemon.Transport.Tor (server, transport) where
+
+import Common
+import qualified Annex
+import Annex.Concurrent
+import Annex.ChangedRefs
+import RemoteDaemon.Types
+import RemoteDaemon.Common
+import Utility.Tor
+import Utility.AuthToken
+import P2P.Protocol as P2P
+import P2P.IO
+import P2P.Annex
+import P2P.Auth
+import P2P.Address
+import Annex.UUID
+import Types.UUID
+import Messages
+import Git
+import Git.Command
+
+import System.PosixCompat.User
+import Control.Concurrent
+import System.Log.Logger (debugM)
+import Control.Concurrent.STM
+import Control.Concurrent.STM.TBMQueue
+import Control.Concurrent.Async
+
+-- Run tor hidden service.
+server :: TransportHandle -> IO ()
+server th@(TransportHandle (LocalRepo r) _) = do
+ u <- liftAnnex th getUUID
+ uid <- getRealUserID
+ let ident = fromUUID u
+ go u =<< getHiddenServiceSocketFile torAppName uid ident
+ where
+ go u (Just sock) = do
+ q <- newTBMQueueIO maxConnections
+ replicateM_ maxConnections $
+ forkIO $ forever $ serveClient th u r q
+
+ debugM "remotedaemon" "Tor hidden service running"
+ serveUnixSocket sock $ \conn -> do
+ ok <- atomically $ ifM (isFullTBMQueue q)
+ ( return False
+ , do
+ writeTBMQueue q conn
+ return True
+ )
+ unless ok $ do
+ hClose conn
+ warningIO "dropped Tor connection, too busy"
+ go _ Nothing = debugM "remotedaemon" "Tor hidden service not enabled"
+
+-- How many clients to serve at a time, maximum. This is to avoid DOS attacks.
+maxConnections :: Int
+maxConnections = 100
+
+serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO ()
+serveClient th u r q = bracket setup cleanup start
+ where
+ setup = do
+ h <- atomically $ readTBMQueue q
+ debugM "remotedaemon" "serving a Tor connection"
+ return h
+
+ cleanup Nothing = return ()
+ cleanup (Just h) = do
+ debugM "remotedaemon" "done with Tor connection"
+ hClose h
+
+ start Nothing = return ()
+ start (Just h) = do
+ -- Avoid doing any work in the liftAnnex, since only one
+ -- can run at a time.
+ st <- liftAnnex th dupState
+ ((), st') <- Annex.run st $ do
+ -- Load auth tokens for every connection, to notice
+ -- when the allowed set is changed.
+ allowed <- loadP2PAuthTokens
+ let conn = P2PConnection
+ { connRepo = r
+ , connCheckAuth = (`isAllowedAuthToken` allowed)
+ , connIhdl = h
+ , connOhdl = h
+ }
+ v <- liftIO $ runNetProto conn $ P2P.serveAuth u
+ case v of
+ Right (Just theiruuid) -> authed conn theiruuid
+ Right Nothing -> liftIO $
+ debugM "remotedaemon" "Tor connection failed to authenticate"
+ Left e -> liftIO $
+ debugM "remotedaemon" ("Tor connection error before authentication: " ++ e)
+ -- Merge the duplicated state back in.
+ liftAnnex th $ mergeState st'
+
+ authed conn theiruuid =
+ bracket watchChangedRefs (liftIO . maybe noop stopWatchingChangedRefs) $ \crh -> do
+ v' <- runFullProto (Serving theiruuid crh) conn $
+ P2P.serveAuthed u
+ case v' of
+ Right () -> return ()
+ Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
+
+-- Connect to peer's tor hidden service.
+transport :: Transport
+transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan =
+ case unformatP2PAddress (show uri) of
+ Nothing -> return ()
+ Just addr -> robustConnection 1 $ do
+ g <- liftAnnex th Annex.gitRepo
+ bracket (connectPeer g addr) closeConnection (go addr)
+ where
+ go addr conn = do
+ myuuid <- liftAnnex th getUUID
+ authtoken <- fromMaybe nullAuthToken
+ <$> liftAnnex th (loadP2PRemoteAuthToken addr)
+ res <- runNetProto conn $
+ P2P.auth myuuid authtoken
+ case res of
+ Right (Just theiruuid) -> do
+ expecteduuid <- liftAnnex th $ getRepoUUID r
+ if expecteduuid == theiruuid
+ then do
+ send (CONNECTED url)
+ status <- handlecontrol
+ `race` handlepeer conn
+ send (DISCONNECTED url)
+ return $ either id id status
+ else return ConnectionStopping
+ _ -> return ConnectionClosed
+
+ send msg = atomically $ writeTChan ochan msg
+
+ handlecontrol = do
+ msg <- atomically $ readTChan ichan
+ case msg of
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
+ _ -> handlecontrol
+
+ handlepeer conn = do
+ v <- runNetProto conn P2P.notifyChange
+ case v of
+ Right (Just (ChangedRefs shas)) -> do
+ whenM (checkNewShas th shas) $
+ fetch
+ handlepeer conn
+ _ -> return ConnectionClosed
+
+ fetch = do
+ send (SYNCING url)
+ ok <- inLocalRepo th $
+ runBool [Param "fetch", Param $ Git.repoDescribe r]
+ send (DONESYNCING url ok)