aboutsummaryrefslogtreecommitdiff
path: root/RemoteDaemon
diff options
context:
space:
mode:
authorGravatar Joey Hess <joeyh@joeyh.name>2016-12-09 16:02:43 -0400
committerGravatar Joey Hess <joeyh@joeyh.name>2016-12-09 16:02:43 -0400
commit46f3063131f7711208c1134b31caf999e375306d (patch)
tree47ab66581b0d2de95865b0f7948db041f6589935 /RemoteDaemon
parent1eeae43d88c924dfac7c004ff7aeb67e17c52a13 (diff)
remotedaemon: git change detection over tor hidden service
Diffstat (limited to 'RemoteDaemon')
-rw-r--r--RemoteDaemon/Common.hs24
-rw-r--r--RemoteDaemon/Transport.hs2
-rw-r--r--RemoteDaemon/Transport/Ssh.hs29
-rw-r--r--RemoteDaemon/Transport/Tor.hs73
4 files changed, 93 insertions, 35 deletions
diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs
index 982a84b43..711771f97 100644
--- a/RemoteDaemon/Common.hs
+++ b/RemoteDaemon/Common.hs
@@ -1,6 +1,6 @@
{- git-remote-daemon utilities
-
- - Copyright 2014 Joey Hess <id@joeyh.name>
+ - Copyright 2014-2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -9,6 +9,8 @@ module RemoteDaemon.Common
( liftAnnex
, inLocalRepo
, checkNewShas
+ , ConnectionStatus(..)
+ , robustConnection
) where
import qualified Annex
@@ -16,6 +18,7 @@ import Annex.Common
import RemoteDaemon.Types
import qualified Git
import Annex.CatFile
+import Utility.ThreadScheduler
import Control.Concurrent
@@ -40,3 +43,22 @@ checkNewShas transporthandle = check
check [] = return True
check (r:rs) = maybe (check rs) (const $ return False)
=<< liftAnnex transporthandle (catObjectDetails r)
+
+data ConnectionStatus = ConnectionStopping | ConnectionClosed
+
+{- Make connection robust, retrying on error, with exponential backoff. -}
+robustConnection :: Int -> IO ConnectionStatus -> IO ()
+robustConnection backoff a =
+ caught =<< a `catchNonAsync` (const $ return ConnectionClosed)
+ where
+ caught ConnectionStopping = return ()
+ caught ConnectionClosed = do
+ threadDelaySeconds (Seconds backoff)
+ robustConnection increasedbackoff a
+
+ increasedbackoff
+ | b2 > maxbackoff = maxbackoff
+ | otherwise = b2
+ where
+ b2 = backoff * 2
+ maxbackoff = 3600 -- one hour
diff --git a/RemoteDaemon/Transport.hs b/RemoteDaemon/Transport.hs
index 6605012de..053973424 100644
--- a/RemoteDaemon/Transport.hs
+++ b/RemoteDaemon/Transport.hs
@@ -12,6 +12,7 @@ import qualified RemoteDaemon.Transport.Ssh
import qualified RemoteDaemon.Transport.GCrypt
import qualified RemoteDaemon.Transport.Tor
import qualified Git.GCrypt
+import P2P.Address (torAnnexScheme)
import qualified Data.Map as M
@@ -22,6 +23,7 @@ remoteTransports :: M.Map TransportScheme Transport
remoteTransports = M.fromList
[ ("ssh:", RemoteDaemon.Transport.Ssh.transport)
, (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport)
+ , (torAnnexScheme, RemoteDaemon.Transport.Tor.transport)
]
remoteServers :: [TransportHandle -> IO ()]
diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs
index 59502f8d3..6f8e8323e 100644
--- a/RemoteDaemon/Transport/Ssh.hs
+++ b/RemoteDaemon/Transport/Ssh.hs
@@ -16,7 +16,6 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
import Utility.SimpleProtocol
import qualified Git
import Git.Command
-import Utility.ThreadScheduler
import Annex.ChangedRefs
import Control.Concurrent.STM
@@ -38,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR
transportUsingCmd' :: FilePath -> [CommandParam] -> Transport
transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
- robustly 1 $ do
+ robustConnection 1 $ do
(Just toh, Just fromh, Just errh, pid) <-
createProcess (proc cmd (toCommand params))
{ std_in = CreatePipe
@@ -79,13 +78,13 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
fetch
handlestdout fromh
-- avoid reconnect on protocol error
- Nothing -> return Stopping
+ Nothing -> return ConnectionStopping
handlecontrol = do
msg <- atomically $ readTChan ichan
case msg of
- STOP -> return Stopping
- LOSTNET -> return Stopping
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
_ -> handlecontrol
-- Old versions of git-annex-shell that do not support
@@ -103,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
, "needs its git-annex upgraded"
, "to 5.20140405 or newer"
]
- return Stopping
+ return ConnectionStopping
else handlestderr errh
-
-data Status = Stopping | ConnectionClosed
-
-{- Make connection robustly, with exponential backoff on failure. -}
-robustly :: Int -> IO Status -> IO ()
-robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a
- where
- caught Stopping = return ()
- caught ConnectionClosed = do
- threadDelaySeconds (Seconds backoff)
- robustly increasedbackoff a
-
- increasedbackoff
- | b2 > maxbackoff = maxbackoff
- | otherwise = b2
- where
- b2 = backoff * 2
- maxbackoff = 3600 -- one hour
diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs
index 5ea06ac2c..20320cadd 100644
--- a/RemoteDaemon/Transport/Tor.hs
+++ b/RemoteDaemon/Transport/Tor.hs
@@ -1,11 +1,11 @@
-{- git-remote-daemon, tor hidden service transport
+{- git-remote-daemon, tor hidden service server and transport
-
- Copyright 2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
-module RemoteDaemon.Transport.Tor (server) where
+module RemoteDaemon.Transport.Tor (server, transport) where
import Common
import qualified Annex
@@ -16,20 +16,23 @@ import RemoteDaemon.Common
import Utility.Tor
import Utility.FileMode
import Utility.AuthToken
-import P2P.Protocol
+import P2P.Protocol as P2P
import P2P.IO
import P2P.Annex
import P2P.Auth
+import P2P.Address
import Annex.UUID
import Types.UUID
import Messages
import Git
+import Git.Command
import System.PosixCompat.User
-import Network.Socket
import Control.Concurrent
import System.Log.Logger (debugM)
import Control.Concurrent.STM
+import Control.Concurrent.Async
+import qualified Network.Socket as S
-- Run tor hidden service.
server :: TransportHandle -> IO ()
@@ -44,17 +47,17 @@ server th@(TransportHandle (LocalRepo r) _) = do
let ident = fromUUID u
let sock = hiddenServiceSocketFile uid ident
nukeFile sock
- soc <- socket AF_UNIX Stream defaultProtocol
- bind soc (SockAddrUnix sock)
+ soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
+ S.bind soc (S.SockAddrUnix sock)
-- Allow everyone to read and write to the socket; tor is probably
-- running as a different user. Connections have to authenticate
-- to do anything, so it's fine that other local users can connect.
modifyFileMode sock $ addModes
[groupReadMode, groupWriteMode, otherReadMode, otherWriteMode]
- listen soc 2
+ S.listen soc 2
debugM "remotedaemon" "Tor hidden service running"
forever $ do
- (conn, _) <- accept soc
+ (conn, _) <- S.accept soc
h <- setupHandle conn
ok <- atomically $ ifM (isFullTBQueue q)
( return False
@@ -97,7 +100,7 @@ serveClient th u r q = bracket setup cleanup start
, connIhdl = h
, connOhdl = h
}
- v <- liftIO $ runNetProto conn $ serveAuth u
+ v <- liftIO $ runNetProto conn $ P2P.serveAuth u
case v of
Right (Just theiruuid) -> authed conn theiruuid
Right Nothing -> liftIO $
@@ -110,7 +113,57 @@ serveClient th u r q = bracket setup cleanup start
authed conn theiruuid =
bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do
v' <- runFullProto (Serving theiruuid crh) conn $
- serveAuthed u
+ P2P.serveAuthed u
case v' of
Right () -> return ()
Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
+
+-- Connect to peer's tor hidden service.
+transport :: Transport
+transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan =
+ case unformatP2PAddress (show uri) of
+ Nothing -> return ()
+ Just addr -> robustConnection 1 $ do
+ g <- liftAnnex th Annex.gitRepo
+ bracket (connectPeer g addr) closeConnection (go addr)
+ where
+ go addr conn = do
+ myuuid <- liftAnnex th getUUID
+ authtoken <- fromMaybe nullAuthToken
+ <$> liftAnnex th (loadP2PRemoteAuthToken addr)
+ res <- runNetProto conn $
+ P2P.auth myuuid authtoken
+ case res of
+ Right (Just theiruuid)
+ | getUncachedUUID r == theiruuid -> do
+ send (CONNECTED url)
+ status <- handlecontrol
+ `race` handlepeer conn
+ send (DISCONNECTED url)
+ return $ either id id status
+ | otherwise -> return ConnectionStopping
+ _ -> return ConnectionClosed
+
+ send msg = atomically $ writeTChan ochan msg
+
+ handlecontrol = do
+ msg <- atomically $ readTChan ichan
+ case msg of
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
+ _ -> handlecontrol
+
+ handlepeer conn = do
+ v <- runNetProto conn P2P.notifyChange
+ case v of
+ Right (Just (ChangedRefs shas)) -> do
+ whenM (checkNewShas th shas) $
+ fetch
+ handlepeer conn
+ _ -> return ConnectionClosed
+
+ fetch = do
+ send (SYNCING url)
+ ok <- inLocalRepo th $
+ runBool [Param "fetch", Param $ Git.repoDescribe r]
+ send (DONESYNCING url ok)