summaryrefslogtreecommitdiff
path: root/RemoteDaemon
diff options
context:
space:
mode:
Diffstat (limited to 'RemoteDaemon')
-rw-r--r--RemoteDaemon/Common.hs24
-rw-r--r--RemoteDaemon/Core.hs36
-rw-r--r--RemoteDaemon/Transport.hs6
-rw-r--r--RemoteDaemon/Transport/Ssh.hs36
-rw-r--r--RemoteDaemon/Transport/Ssh/Types.hs4
-rw-r--r--RemoteDaemon/Transport/Tor.hs162
-rw-r--r--RemoteDaemon/Types.hs14
7 files changed, 233 insertions, 49 deletions
diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs
index 982a84b43..711771f97 100644
--- a/RemoteDaemon/Common.hs
+++ b/RemoteDaemon/Common.hs
@@ -1,6 +1,6 @@
{- git-remote-daemon utilities
-
- - Copyright 2014 Joey Hess <id@joeyh.name>
+ - Copyright 2014-2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -9,6 +9,8 @@ module RemoteDaemon.Common
( liftAnnex
, inLocalRepo
, checkNewShas
+ , ConnectionStatus(..)
+ , robustConnection
) where
import qualified Annex
@@ -16,6 +18,7 @@ import Annex.Common
import RemoteDaemon.Types
import qualified Git
import Annex.CatFile
+import Utility.ThreadScheduler
import Control.Concurrent
@@ -40,3 +43,22 @@ checkNewShas transporthandle = check
check [] = return True
check (r:rs) = maybe (check rs) (const $ return False)
=<< liftAnnex transporthandle (catObjectDetails r)
+
+data ConnectionStatus = ConnectionStopping | ConnectionClosed
+
+{- Make connection robust, retrying on error, with exponential backoff. -}
+robustConnection :: Int -> IO ConnectionStatus -> IO ()
+robustConnection backoff a =
+ caught =<< a `catchNonAsync` (const $ return ConnectionClosed)
+ where
+ caught ConnectionStopping = return ()
+ caught ConnectionClosed = do
+ threadDelaySeconds (Seconds backoff)
+ robustConnection increasedbackoff a
+
+ increasedbackoff
+ | b2 > maxbackoff = maxbackoff
+ | otherwise = b2
+ where
+ b2 = backoff * 2
+ maxbackoff = 3600 -- one hour
diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs
index 5fa413155..2166c2b7a 100644
--- a/RemoteDaemon/Core.hs
+++ b/RemoteDaemon/Core.hs
@@ -1,11 +1,11 @@
{- git-remote-daemon core
-
- - Copyright 2014 Joey Hess <id@joeyh.name>
+ - Copyright 2014-2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
-module RemoteDaemon.Core (runForeground) where
+module RemoteDaemon.Core (runInteractive, runNonInteractive) where
import qualified Annex
import Common
@@ -17,8 +17,10 @@ import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import Utility.SimpleProtocol
+import Utility.ThreadScheduler
import Config
import Annex.Ssh
+import Types.Messages
import Control.Concurrent
import Control.Concurrent.Async
@@ -26,8 +28,8 @@ import Control.Concurrent.STM
import Network.URI
import qualified Data.Map as M
-runForeground :: IO ()
-runForeground = do
+runInteractive :: IO ()
+runInteractive = do
(readh, writeh) <- dupIoHandles
ichan <- newTChanIO :: IO (TChan Consumed)
ochan <- newTChanIO :: IO (TChan Emitted)
@@ -44,8 +46,25 @@ runForeground = do
let controller = runController ichan ochan
-- If any thread fails, the rest will be killed.
- void $ tryIO $
- reader `concurrently` writer `concurrently` controller
+ void $ tryIO $ reader
+ `concurrently` writer
+ `concurrently` controller
+
+runNonInteractive :: IO ()
+runNonInteractive = do
+ ichan <- newTChanIO :: IO (TChan Consumed)
+ ochan <- newTChanIO :: IO (TChan Emitted)
+
+ let reader = forever $ do
+ threadDelaySeconds (Seconds (60*60))
+ atomically $ writeTChan ichan RELOAD
+ let writer = forever $
+ void $ atomically $ readTChan ochan
+ let controller = runController ichan ochan
+
+ void $ tryIO $ reader
+ `concurrently` writer
+ `concurrently` controller
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
@@ -56,6 +75,7 @@ runController ichan ochan = do
h <- genTransportHandle
m <- genRemoteMap h ochan
startrunning m
+ mapM_ (\s -> async (s h)) remoteServers
go h False m
where
go h paused m = do
@@ -132,7 +152,9 @@ genTransportHandle :: IO TransportHandle
genTransportHandle = do
annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get
g <- Annex.repo <$> readMVar annexstate
- return $ TransportHandle (LocalRepo g) annexstate
+ let h = TransportHandle (LocalRepo g) annexstate
+ liftAnnex h $ Annex.setOutput QuietOutput
+ return h
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _g annexstate) = do
diff --git a/RemoteDaemon/Transport.hs b/RemoteDaemon/Transport.hs
index 0e2040d1f..053973424 100644
--- a/RemoteDaemon/Transport.hs
+++ b/RemoteDaemon/Transport.hs
@@ -10,7 +10,9 @@ module RemoteDaemon.Transport where
import RemoteDaemon.Types
import qualified RemoteDaemon.Transport.Ssh
import qualified RemoteDaemon.Transport.GCrypt
+import qualified RemoteDaemon.Transport.Tor
import qualified Git.GCrypt
+import P2P.Address (torAnnexScheme)
import qualified Data.Map as M
@@ -21,4 +23,8 @@ remoteTransports :: M.Map TransportScheme Transport
remoteTransports = M.fromList
[ ("ssh:", RemoteDaemon.Transport.Ssh.transport)
, (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport)
+ , (torAnnexScheme, RemoteDaemon.Transport.Tor.transport)
]
+
+remoteServers :: [TransportHandle -> IO ()]
+remoteServers = [RemoteDaemon.Transport.Tor.server]
diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs
index 73c88054c..6f8e8323e 100644
--- a/RemoteDaemon/Transport/Ssh.hs
+++ b/RemoteDaemon/Transport/Ssh.hs
@@ -16,7 +16,7 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
import Utility.SimpleProtocol
import qualified Git
import Git.Command
-import Utility.ThreadScheduler
+import Annex.ChangedRefs
import Control.Concurrent.STM
import Control.Concurrent.Async
@@ -37,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR
transportUsingCmd' :: FilePath -> [CommandParam] -> Transport
transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
- robustly 1 $ do
+ robustConnection 1 $ do
(Just toh, Just fromh, Just errh, pid) <-
createProcess (proc cmd (toCommand params))
{ std_in = CreatePipe
@@ -68,23 +68,23 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
send (DONESYNCING url ok)
handlestdout fromh = do
- l <- hGetLine fromh
- case parseMessage l of
+ ml <- getProtocolLine fromh
+ case parseMessage =<< ml of
Just SshRemote.READY -> do
send (CONNECTED url)
handlestdout fromh
- Just (SshRemote.CHANGED shas) -> do
+ Just (SshRemote.CHANGED (ChangedRefs shas)) -> do
whenM (checkNewShas transporthandle shas) $
fetch
handlestdout fromh
-- avoid reconnect on protocol error
- Nothing -> return Stopping
+ Nothing -> return ConnectionStopping
handlecontrol = do
msg <- atomically $ readTChan ichan
case msg of
- STOP -> return Stopping
- LOSTNET -> return Stopping
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
_ -> handlecontrol
-- Old versions of git-annex-shell that do not support
@@ -102,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
, "needs its git-annex upgraded"
, "to 5.20140405 or newer"
]
- return Stopping
+ return ConnectionStopping
else handlestderr errh
-
-data Status = Stopping | ConnectionClosed
-
-{- Make connection robustly, with exponential backoff on failure. -}
-robustly :: Int -> IO Status -> IO ()
-robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a
- where
- caught Stopping = return ()
- caught ConnectionClosed = do
- threadDelaySeconds (Seconds backoff)
- robustly increasedbackoff a
-
- increasedbackoff
- | b2 > maxbackoff = maxbackoff
- | otherwise = b2
- where
- b2 = backoff * 2
- maxbackoff = 3600 -- one hour
diff --git a/RemoteDaemon/Transport/Ssh/Types.hs b/RemoteDaemon/Transport/Ssh/Types.hs
index fa6a55d3d..606e1a563 100644
--- a/RemoteDaemon/Transport/Ssh/Types.hs
+++ b/RemoteDaemon/Transport/Ssh/Types.hs
@@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types (
) where
import qualified Utility.SimpleProtocol as Proto
-import RemoteDaemon.Types (RefList)
+import Annex.ChangedRefs (ChangedRefs)
data Notification
= READY
- | CHANGED RefList
+ | CHANGED ChangedRefs
instance Proto.Sendable Notification where
formatMessage READY = ["READY"]
diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs
new file mode 100644
index 000000000..e7d3794d6
--- /dev/null
+++ b/RemoteDaemon/Transport/Tor.hs
@@ -0,0 +1,162 @@
+{- git-remote-daemon, tor hidden service server and transport
+ -
+ - Copyright 2016 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+module RemoteDaemon.Transport.Tor (server, transport) where
+
+import Common
+import qualified Annex
+import Annex.Concurrent
+import Annex.ChangedRefs
+import RemoteDaemon.Types
+import RemoteDaemon.Common
+import Utility.Tor
+import Utility.AuthToken
+import P2P.Protocol as P2P
+import P2P.IO
+import P2P.Annex
+import P2P.Auth
+import P2P.Address
+import Annex.UUID
+import Types.UUID
+import Messages
+import Git
+import Git.Command
+
+import System.PosixCompat.User
+import Control.Concurrent
+import System.Log.Logger (debugM)
+import Control.Concurrent.STM
+import Control.Concurrent.STM.TBMQueue
+import Control.Concurrent.Async
+
+-- Run tor hidden service.
+server :: TransportHandle -> IO ()
+server th@(TransportHandle (LocalRepo r) _) = do
+ u <- liftAnnex th getUUID
+ uid <- getRealUserID
+ let ident = fromUUID u
+ go u =<< getHiddenServiceSocketFile torAppName uid ident
+ where
+ go u (Just sock) = do
+ q <- newTBMQueueIO maxConnections
+ replicateM_ maxConnections $
+ forkIO $ forever $ serveClient th u r q
+
+ debugM "remotedaemon" "Tor hidden service running"
+ serveUnixSocket sock $ \conn -> do
+ ok <- atomically $ ifM (isFullTBMQueue q)
+ ( return False
+ , do
+ writeTBMQueue q conn
+ return True
+ )
+ unless ok $ do
+ hClose conn
+ warningIO "dropped Tor connection, too busy"
+ go _ Nothing = debugM "remotedaemon" "Tor hidden service not enabled"
+
+-- How many clients to serve at a time, maximum. This is to avoid DOS attacks.
+maxConnections :: Int
+maxConnections = 100
+
+serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO ()
+serveClient th u r q = bracket setup cleanup start
+ where
+ setup = do
+ h <- atomically $ readTBMQueue q
+ debugM "remotedaemon" "serving a Tor connection"
+ return h
+
+ cleanup Nothing = return ()
+ cleanup (Just h) = do
+ debugM "remotedaemon" "done with Tor connection"
+ hClose h
+
+ start Nothing = return ()
+ start (Just h) = do
+ -- Avoid doing any work in the liftAnnex, since only one
+ -- can run at a time.
+ st <- liftAnnex th dupState
+ ((), st') <- Annex.run st $ do
+ -- Load auth tokens for every connection, to notice
+ -- when the allowed set is changed.
+ allowed <- loadP2PAuthTokens
+ let conn = P2PConnection
+ { connRepo = r
+ , connCheckAuth = (`isAllowedAuthToken` allowed)
+ , connIhdl = h
+ , connOhdl = h
+ }
+ v <- liftIO $ runNetProto conn $ P2P.serveAuth u
+ case v of
+ Right (Just theiruuid) -> authed conn theiruuid
+ Right Nothing -> liftIO $
+ debugM "remotedaemon" "Tor connection failed to authenticate"
+ Left e -> liftIO $
+ debugM "remotedaemon" ("Tor connection error before authentication: " ++ e)
+ -- Merge the duplicated state back in.
+ liftAnnex th $ mergeState st'
+
+ authed conn theiruuid =
+ bracket watchChangedRefs (liftIO . maybe noop stopWatchingChangedRefs) $ \crh -> do
+ v' <- runFullProto (Serving theiruuid crh) conn $
+ P2P.serveAuthed u
+ case v' of
+ Right () -> return ()
+ Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
+
+-- Connect to peer's tor hidden service.
+transport :: Transport
+transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan =
+ case unformatP2PAddress (show uri) of
+ Nothing -> return ()
+ Just addr -> robustConnection 1 $ do
+ g <- liftAnnex th Annex.gitRepo
+ bracket (connectPeer g addr) closeConnection (go addr)
+ where
+ go addr conn = do
+ myuuid <- liftAnnex th getUUID
+ authtoken <- fromMaybe nullAuthToken
+ <$> liftAnnex th (loadP2PRemoteAuthToken addr)
+ res <- runNetProto conn $
+ P2P.auth myuuid authtoken
+ case res of
+ Right (Just theiruuid) -> do
+ expecteduuid <- liftAnnex th $ getRepoUUID r
+ if expecteduuid == theiruuid
+ then do
+ send (CONNECTED url)
+ status <- handlecontrol
+ `race` handlepeer conn
+ send (DISCONNECTED url)
+ return $ either id id status
+ else return ConnectionStopping
+ _ -> return ConnectionClosed
+
+ send msg = atomically $ writeTChan ochan msg
+
+ handlecontrol = do
+ msg <- atomically $ readTChan ichan
+ case msg of
+ STOP -> return ConnectionStopping
+ LOSTNET -> return ConnectionStopping
+ _ -> handlecontrol
+
+ handlepeer conn = do
+ v <- runNetProto conn P2P.notifyChange
+ case v of
+ Right (Just (ChangedRefs shas)) -> do
+ whenM (checkNewShas th shas) $
+ fetch
+ handlepeer conn
+ _ -> return ConnectionClosed
+
+ fetch = do
+ send (SYNCING url)
+ ok <- inLocalRepo th $
+ runBool [Param "fetch", Param $ Git.repoDescribe r]
+ send (DONESYNCING url ok)
diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs
index f85219ea5..c0d74e038 100644
--- a/RemoteDaemon/Types.hs
+++ b/RemoteDaemon/Types.hs
@@ -5,7 +5,6 @@
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module RemoteDaemon.Types where
@@ -15,6 +14,7 @@ import qualified Annex
import qualified Git.Types as Git
import qualified Utility.SimpleProtocol as Proto
import Types.GitConfig
+import Annex.ChangedRefs (ChangedRefs)
import Network.URI
import Control.Concurrent
@@ -52,13 +52,11 @@ data Consumed
= PAUSE
| LOSTNET
| RESUME
- | CHANGED RefList
+ | CHANGED ChangedRefs
| RELOAD
| STOP
deriving (Show)
-type RefList = [Git.Ref]
-
instance Proto.Sendable Emitted where
formatMessage (CONNECTED remote) =
["CONNECTED", Proto.serialize remote]
@@ -100,14 +98,6 @@ instance Proto.Serializable RemoteURI where
serialize (RemoteURI u) = show u
deserialize = RemoteURI <$$> parseURI
-instance Proto.Serializable [Char] where
- serialize = id
- deserialize = Just
-
-instance Proto.Serializable RefList where
- serialize = unwords . map Git.fromRef
- deserialize = Just . map Git.Ref . words
-
instance Proto.Serializable Bool where
serialize False = "0"
serialize True = "1"