diff options
-rw-r--r-- | Remote/Helper/P2P.hs | 374 | ||||
-rw-r--r-- | Remote/Helper/P2P/IO.hs | 159 | ||||
-rw-r--r-- | git-annex.cabal | 1 |
3 files changed, 356 insertions, 178 deletions
diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs index d3d3dfa08..fbd6c2463 100644 --- a/Remote/Helper/P2P.hs +++ b/Remote/Helper/P2P.hs @@ -7,20 +7,7 @@ {-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts, RankNTypes #-} -module Remote.Helper.P2P ( - AuthToken(..), - ProtoF(..), - runPure, - protoDump, - auth, - checkPresent, - lockContentWhile, - remove, - get, - put, - connect, - serve, -) where +module Remote.Helper.P2P where import qualified Utility.SimpleProtocol as Proto import Types.Key @@ -33,7 +20,7 @@ import Control.Monad.Free import Control.Monad.Free.TH import Control.Monad.Catch import System.Exit (ExitCode(..)) -import System.IO (Handle) +import System.IO import qualified Data.ByteString.Lazy as L newtype AuthToken = AuthToken String @@ -49,10 +36,6 @@ newtype Len = Len Integer data Service = UploadPack | ReceivePack deriving (Show) -data RelayData - = RelayData L.ByteString - | RelayMessage Message - -- | Messages in the protocol. The peer that makes the connection -- always initiates requests, and the other peer makes responses to them. data Message @@ -75,72 +58,164 @@ data Message | ERROR String deriving (Show) --- | Free monad for implementing actions that use the protocol. -data ProtoF next - = SendMessage Message next - | ReceiveMessage (Message -> next) - | SendBytes Len L.ByteString next - | ReceiveBytes Len (L.ByteString -> next) +instance Proto.Sendable Message where + formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken] + formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid] + formatMessage AUTH_FAILURE = ["AUTH-FAILURE"] + formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service] + formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode] + formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key] + formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key] + formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"] + formatMessage (REMOVE key) = ["REMOVE", Proto.serialize key] + formatMessage (GET offset key) = ["GET", Proto.serialize offset, Proto.serialize key] + formatMessage (PUT key) = ["PUT", Proto.serialize key] + formatMessage (PUT_FROM offset) = ["PUT-FROM", Proto.serialize offset] + formatMessage ALREADY_HAVE = ["ALREADY-HAVE"] + formatMessage SUCCESS = ["SUCCESS"] + formatMessage FAILURE = ["FAILURE"] + formatMessage (DATA len) = ["DATA", Proto.serialize len] + formatMessage (ERROR err) = ["ERROR", Proto.serialize err] + +instance Proto.Receivable Message where + parseCommand "AUTH" = Proto.parse2 AUTH + parseCommand "AUTH-SUCCESS" = Proto.parse1 AUTH_SUCCESS + parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE + parseCommand "CONNECT" = Proto.parse1 CONNECT + parseCommand "CONNECTDONE" = Proto.parse1 CONNECT + parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT + parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT + parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT + parseCommand "REMOVE" = Proto.parse1 REMOVE + parseCommand "GET" = Proto.parse2 GET + parseCommand "PUT" = Proto.parse1 PUT + parseCommand "PUT-FROM" = Proto.parse1 PUT_FROM + parseCommand "ALREADY-HAVE" = Proto.parse0 ALREADY_HAVE + parseCommand "SUCCESS" = Proto.parse0 SUCCESS + parseCommand "FAILURE" = Proto.parse0 FAILURE + parseCommand "DATA" = Proto.parse1 DATA + parseCommand "ERROR" = Proto.parse1 ERROR + parseCommand _ = Proto.parseFail + +instance Proto.Serializable Offset where + serialize (Offset n) = show n + deserialize = Offset <$$> readish + +instance Proto.Serializable Len where + serialize (Len n) = show n + deserialize = Len <$$> readish + +instance Proto.Serializable AuthToken where + serialize (AuthToken s) = s + deserialize = Just . AuthToken + +instance Proto.Serializable Service where + serialize UploadPack = "git-upload-pack" + serialize ReceivePack = "git-receive-pack" + deserialize "git-upload-pack" = Just UploadPack + deserialize "git-receive-pack" = Just ReceivePack + deserialize _ = Nothing + +-- | Free monad for the protocol, combining net communication, +-- and local actions. +data ProtoF c = Net (NetF c) | Local (LocalF c) + deriving (Functor) + +type Proto = Free ProtoF + +net :: Net a -> Proto a +net = hoistFree Net + +local :: Local a -> Proto a +local = hoistFree Local + +data NetF c + = SendMessage Message c + | ReceiveMessage (Message -> c) + | SendBytes Len L.ByteString c + | ReceiveBytes Len (L.ByteString -> c) + | Relay RelayHandle + (RelayData -> Net (Maybe ExitCode)) + (ExitCode -> c) + -- ^ Waits for data to be written to the RelayHandle, and for messages + -- to be received from the peer, and passes the data to the + -- callback, continuing until it returns an ExitCode. + | RelayService Service + (RelayHandle -> RelayData -> Net (Maybe ExitCode)) + (ExitCode -> c) + -- ^ Runs a service, and waits for it to output to stdout, + -- and for messages to be received from the peer, and passes + -- the data to the callback (which is also passed the service's + -- stdin RelayHandle), continuing uniil the service exits. + | WriteRelay RelayHandle L.ByteString c + -- ^ Write data to a relay's handle, flushing it immediately. + deriving (Functor) + +type Net = Free NetF + +data RelayData + = RelayData L.ByteString + | RelayMessage Message + +newtype RelayHandle = RelayHandle Handle + +data LocalF c -- ^ Lazily reads bytes from peer. Stops once Len are read, -- or if connection is lost, and in either case returns the bytes -- that were read. This allows resuming interrupted transfers. - | KeyFileSize Key (Len -> next) + = KeyFileSize Key (Len -> c) -- ^ Checks size of key file (dne = 0) - | ReadKeyFile Key Offset (L.ByteString -> next) - | WriteKeyFile Key Offset Len L.ByteString (Bool -> next) + | ReadKeyFile Key Offset (L.ByteString -> c) + | WriteKeyFile Key Offset Len L.ByteString (Bool -> c) -- ^ Writes to key file starting at an offset. Returns True -- once the whole content of the key is stored in the key file. -- -- Note: The ByteString may not contain the entire remaining content -- of the key. Only once the key file size == Len has the whole -- content been transferred. - | CheckAuthToken UUID AuthToken (Bool -> next) - | SetPresent Key UUID next - | CheckContentPresent Key (Bool -> next) + | CheckAuthToken UUID AuthToken (Bool -> c) + | SetPresent Key UUID c + | CheckContentPresent Key (Bool -> c) -- ^ Checks if the whole content of the key is locally present. - | RemoveKeyFile Key (Bool -> next) + | RemoveKeyFile Key (Bool -> c) -- ^ If the key file is not present, still succeeds. -- May fail if not enough copies to safely drop, etc. - | TryLockContent Key (Bool -> Proto ()) next - | WriteHandle Handle L.ByteString next + | TryLockContent Key (Bool -> Proto ()) c -- ^ Try to lock the content of a key, preventing it -- from being deleted, and run the provided protocol action. - | Relay Handle (RelayData -> Proto (Maybe ExitCode)) (ExitCode -> next) - -- ^ Waits for data to be written to the Handle, and for messages - -- to be received from the peer, and passes the data to the - -- callback, continuing until it returns an ExitCode. - | RelayService Service - (Handle -> RelayData -> Proto (Maybe ExitCode)) - (ExitCode -> next) - -- ^ Runs a service, and waits for it to output to stdout, - -- and for messages to be received from the peer, and passes - -- the data to the callback (which is also passed the service's - -- stdin Handle), continuing uniil the service exits. deriving (Functor) -type Proto = Free ProtoF +type Local = Free LocalF -$(makeFree ''ProtoF) +-- Generate sendMessage etc functions for all free monad constructors. +$(makeFree ''NetF) +$(makeFree ''LocalF) -- | Running Proto actions purely, to see what they do. runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)] runPure (Pure r) _ = [("result: " ++ show r, Nothing)] -runPure (Free (SendMessage m next)) ms = (">", Just m):runPure next ms -runPure (Free (ReceiveMessage _)) [] = [("not enough Messages provided", Nothing)] -runPure (Free (ReceiveMessage next)) (m:ms) = ("<", Just m):runPure (next m) ms -runPure (Free (SendBytes _ _ next)) ms = ("> bytes", Nothing):runPure next ms -runPure (Free (ReceiveBytes _ next)) ms = ("< bytes", Nothing):runPure (next L.empty) ms -runPure (Free (KeyFileSize _ next)) ms = runPure (next (Len 100)) ms -runPure (Free (ReadKeyFile _ _ next)) ms = runPure (next L.empty) ms -runPure (Free (WriteKeyFile _ _ _ _ next)) ms = runPure (next True) ms -runPure (Free (CheckAuthToken _ _ next)) ms = runPure (next True) ms -runPure (Free (SetPresent _ _ next)) ms = runPure next ms -runPure (Free (CheckContentPresent _ next)) ms = runPure (next False) ms -runPure (Free (RemoveKeyFile _ next)) ms = runPure (next True) ms -runPure (Free (TryLockContent _ p next)) ms = runPure (p True >> next) ms -runPure (Free (WriteHandle _ _ next)) ms = runPure next ms -runPure (Free (Relay _ _ next)) ms = runPure (next ExitSuccess) ms -runPure (Free (RelayService _ _ next)) ms = runPure (next ExitSuccess) ms +runPure (Free (Net n)) ms = runNet n ms +runPure (Free (Local n)) ms = runLocal n ms + +runNet :: Show r => NetF (Proto r) -> [Message] -> [(String, Maybe Message)] +runNet (SendMessage m next) ms = (">", Just m):runPure next ms +runNet (ReceiveMessage _) [] = [("not enough Messages provided", Nothing)] +runNet (ReceiveMessage next) (m:ms) = ("<", Just m):runPure (next m) ms +runNet (SendBytes _ _ next) ms = ("> bytes", Nothing):runPure next ms +runNet (ReceiveBytes _ next) ms = ("< bytes", Nothing):runPure (next L.empty) ms +runNet (Relay _ _ next) ms = runPure (next ExitSuccess) ms +runNet (RelayService _ _ next) ms = runPure (next ExitSuccess) ms +runNet (WriteRelay _ _ next) ms = runPure next ms + +runLocal :: Show r => LocalF (Proto r) -> [Message] -> [(String, Maybe Message)] +runLocal (KeyFileSize _ next) ms = runPure (next (Len 100)) ms +runLocal (ReadKeyFile _ _ next) ms = runPure (next L.empty) ms +runLocal (WriteKeyFile _ _ _ _ next) ms = runPure (next True) ms +runLocal (CheckAuthToken _ _ next) ms = runPure (next True) ms +runLocal (SetPresent _ _ next) ms = runPure next ms +runLocal (CheckContentPresent _ next) ms = runPure (next False) ms +runLocal (RemoveKeyFile _ next) ms = runPure (next True) ms +runLocal (TryLockContent _ p next) ms = runPure (p True >> next) ms protoDump :: [(String, Maybe Message)] -> String protoDump = unlines . map protoDump' @@ -151,18 +226,18 @@ protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m) auth :: UUID -> AuthToken -> Proto (Maybe UUID) auth myuuid t = do - sendMessage (AUTH myuuid t) - r <- receiveMessage + net $ sendMessage (AUTH myuuid t) + r <- net receiveMessage case r of AUTH_SUCCESS theiruuid -> return $ Just theiruuid AUTH_FAILURE -> return Nothing _ -> do - sendMessage (ERROR "auth failed") + net $ sendMessage (ERROR "auth failed") return Nothing checkPresent :: Key -> Proto Bool checkPresent key = do - sendMessage (CHECKPRESENT key) + net $ sendMessage (CHECKPRESENT key) checkSuccess {- Locks content to prevent it from being dropped, while running an action. @@ -180,14 +255,14 @@ lockContentWhile lockContentWhile runproto key a = bracket setup cleanup a where setup = runproto $ do - sendMessage (LOCKCONTENT key) + net $ sendMessage (LOCKCONTENT key) checkSuccess - cleanup True = runproto $ sendMessage UNLOCKCONTENT + cleanup True = runproto $ net $ sendMessage UNLOCKCONTENT cleanup False = return () remove :: Key -> Proto Bool remove key = do - sendMessage (REMOVE key) + net $ sendMessage (REMOVE key) checkSuccess get :: Key -> Proto Bool @@ -195,35 +270,15 @@ get key = receiveContent key (`GET` key) put :: Key -> Proto Bool put key = do - sendMessage (PUT key) - r <- receiveMessage + net $ sendMessage (PUT key) + r <- net receiveMessage case r of PUT_FROM offset -> sendContent key offset ALREADY_HAVE -> return True _ -> do - sendMessage (ERROR "expected PUT_FROM") + net $ sendMessage (ERROR "expected PUT_FROM") return False -connect :: Service -> Handle -> Handle -> Proto ExitCode -connect service hin hout = do - sendMessage (CONNECT service) - relay hin (relayCallback hout) - -relayCallback :: Handle -> RelayData -> Proto (Maybe ExitCode) -relayCallback hout (RelayMessage (DATA len)) = do - writeHandle hout =<< receiveBytes len - return Nothing -relayCallback _ (RelayMessage (CONNECTDONE exitcode)) = - return (Just exitcode) -relayCallback _ (RelayMessage _) = do - sendMessage (ERROR "expected DATA or CONNECTDONE") - return (Just (ExitFailure 1)) -relayCallback _ (RelayData b) = do - let len = Len $ fromIntegral $ L.length b - sendMessage (DATA len) - sendBytes len b - return Nothing - -- | Serve the protocol. -- -- Note that if the client sends an unexpected message, the server will @@ -240,153 +295,116 @@ serve :: UUID -> Proto () serve myuuid = go Nothing where go autheduuid = do - r <- receiveMessage + r <- net receiveMessage case r of AUTH theiruuid authtoken -> do - ok <- checkAuthToken theiruuid authtoken + ok <- local $ checkAuthToken theiruuid authtoken if ok then do - sendMessage (AUTH_SUCCESS myuuid) + net $ sendMessage (AUTH_SUCCESS myuuid) go (Just theiruuid) else do - sendMessage AUTH_FAILURE + net $ sendMessage AUTH_FAILURE go autheduuid ERROR _ -> return () _ -> do case autheduuid of Just theiruuid -> authed theiruuid r - Nothing -> sendMessage (ERROR "must AUTH first") + Nothing -> net $ sendMessage (ERROR "must AUTH first") go autheduuid authed _theiruuid r = case r of - LOCKCONTENT key -> tryLockContent key $ \locked -> do + LOCKCONTENT key -> local $ tryLockContent key $ \locked -> do sendSuccess locked when locked $ do - r' <- receiveMessage + r' <- net receiveMessage case r' of UNLOCKCONTENT -> return () - _ -> sendMessage (ERROR "expected UNLOCKCONTENT") - CHECKPRESENT key -> sendSuccess =<< checkContentPresent key - REMOVE key -> sendSuccess =<< removeKeyFile key + _ -> net $ sendMessage (ERROR "expected UNLOCKCONTENT") + CHECKPRESENT key -> sendSuccess =<< local (checkContentPresent key) + REMOVE key -> sendSuccess =<< local (removeKeyFile key) PUT key -> do - have <- checkContentPresent key + have <- local $ checkContentPresent key if have - then sendMessage ALREADY_HAVE + then net $ sendMessage ALREADY_HAVE else do ok <- receiveContent key PUT_FROM when ok $ - setPresent key myuuid + local $ setPresent key myuuid -- setPresent not called because the peer may have -- requested the data but not permanatly stored it. GET offset key -> void $ sendContent key offset CONNECT service -> do - exitcode <- relayService service relayCallback - sendMessage (CONNECTDONE exitcode) - _ -> sendMessage (ERROR "unexpected command") + exitcode <- net $ relayService service relayCallback + net $ sendMessage (CONNECTDONE exitcode) + _ -> net $ sendMessage (ERROR "unexpected command") sendContent :: Key -> Offset -> Proto Bool sendContent key offset = do (len, content) <- readKeyFileLen key offset - sendMessage (DATA len) - sendBytes len content + net $ sendMessage (DATA len) + net $ sendBytes len content checkSuccess receiveContent :: Key -> (Offset -> Message) -> Proto Bool receiveContent key mkmsg = do - Len n <- keyFileSize key + Len n <- local $ keyFileSize key let offset = Offset n - sendMessage (mkmsg offset) - r <- receiveMessage + net $ sendMessage (mkmsg offset) + r <- net receiveMessage case r of DATA len -> do - ok <- writeKeyFile key offset len =<< receiveBytes len + ok <- local . writeKeyFile key offset len + =<< net (receiveBytes len) sendSuccess ok return ok _ -> do - sendMessage (ERROR "expected DATA") + net $ sendMessage (ERROR "expected DATA") return False checkSuccess :: Proto Bool checkSuccess = do - ack <- receiveMessage + ack <- net receiveMessage case ack of SUCCESS -> return True FAILURE -> return False _ -> do - sendMessage (ERROR "expected SUCCESS or FAILURE") + net $ sendMessage (ERROR "expected SUCCESS or FAILURE") return False sendSuccess :: Bool -> Proto () -sendSuccess True = sendMessage SUCCESS -sendSuccess False = sendMessage FAILURE +sendSuccess True = net $ sendMessage SUCCESS +sendSuccess False = net $ sendMessage FAILURE -- Reads key file from an offset. The Len should correspond to -- the length of the ByteString, but to avoid buffering the content -- in memory, is gotten using keyFileSize. readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString) readKeyFileLen key (Offset offset) = do - (Len totallen) <- keyFileSize key + (Len totallen) <- local $ keyFileSize key let len = totallen - offset if len <= 0 then return (Len 0, L.empty) else do - content <- readKeyFile key (Offset offset) + content <- local $ readKeyFile key (Offset offset) return (Len len, content) -instance Proto.Sendable Message where - formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken] - formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid] - formatMessage AUTH_FAILURE = ["AUTH-FAILURE"] - formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service] - formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode] - formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key] - formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key] - formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"] - formatMessage (REMOVE key) = ["REMOVE", Proto.serialize key] - formatMessage (GET offset key) = ["GET", Proto.serialize offset, Proto.serialize key] - formatMessage (PUT key) = ["PUT", Proto.serialize key] - formatMessage (PUT_FROM offset) = ["PUT-FROM", Proto.serialize offset] - formatMessage ALREADY_HAVE = ["ALREADY-HAVE"] - formatMessage SUCCESS = ["SUCCESS"] - formatMessage FAILURE = ["FAILURE"] - formatMessage (DATA len) = ["DATA", Proto.serialize len] - formatMessage (ERROR err) = ["ERROR", Proto.serialize err] - -instance Proto.Receivable Message where - parseCommand "AUTH" = Proto.parse2 AUTH - parseCommand "AUTH-SUCCESS" = Proto.parse1 AUTH_SUCCESS - parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE - parseCommand "CONNECT" = Proto.parse1 CONNECT - parseCommand "CONNECTDONE" = Proto.parse1 CONNECT - parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT - parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT - parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT - parseCommand "REMOVE" = Proto.parse1 REMOVE - parseCommand "GET" = Proto.parse2 GET - parseCommand "PUT" = Proto.parse1 PUT - parseCommand "PUT-FROM" = Proto.parse1 PUT_FROM - parseCommand "ALREADY-HAVE" = Proto.parse0 ALREADY_HAVE - parseCommand "SUCCESS" = Proto.parse0 SUCCESS - parseCommand "FAILURE" = Proto.parse0 FAILURE - parseCommand "DATA" = Proto.parse1 DATA - parseCommand "ERROR" = Proto.parse1 ERROR - parseCommand _ = Proto.parseFail - -instance Proto.Serializable Offset where - serialize (Offset n) = show n - deserialize = Offset <$$> readish - -instance Proto.Serializable Len where - serialize (Len n) = show n - deserialize = Len <$$> readish - -instance Proto.Serializable AuthToken where - serialize (AuthToken s) = s - deserialize = Just . AuthToken +connect :: Service -> Handle -> Handle -> Proto ExitCode +connect service hin hout = do + net $ sendMessage (CONNECT service) + net $ relay (RelayHandle hin) (relayCallback (RelayHandle hout)) -instance Proto.Serializable Service where - serialize UploadPack = "git-upload-pack" - serialize ReceivePack = "git-receive-pack" - deserialize "git-upload-pack" = Just UploadPack - deserialize "git-receive-pack" = Just ReceivePack - deserialize _ = Nothing +relayCallback :: RelayHandle -> RelayData -> Net (Maybe ExitCode) +relayCallback hout (RelayMessage (DATA len)) = do + writeRelay hout =<< receiveBytes len + return Nothing +relayCallback _ (RelayMessage (CONNECTDONE exitcode)) = + return (Just exitcode) +relayCallback _ (RelayMessage _) = do + sendMessage (ERROR "expected DATA or CONNECTDONE") + return (Just (ExitFailure 1)) +relayCallback _ (RelayData b) = do + let len = Len $ fromIntegral $ L.length b + sendMessage (DATA len) + sendBytes len b + return Nothing diff --git a/Remote/Helper/P2P/IO.hs b/Remote/Helper/P2P/IO.hs new file mode 100644 index 000000000..7179adc2b --- /dev/null +++ b/Remote/Helper/P2P/IO.hs @@ -0,0 +1,159 @@ +{- P2P protocol, partial IO implementation + - + - Copyright 2016 Joey Hess <id@joeyh.name> + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE RankNTypes #-} + +module Remote.Helper.P2P.IO + ( RunProto + , runProtoHandle + ) where + +import Remote.Helper.P2P +import Utility.Process +import Git +import Git.Command +import Utility.SafeCommand +import Utility.SimpleProtocol + +import Control.Monad +import Control.Monad.Free +import Control.Monad.IO.Class +import Data.Maybe +import System.Exit (ExitCode(..)) +import System.IO +import Control.Concurrent +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as L + +type RunProto = forall a m. MonadIO m => Proto a -> m a + +data S = S + { repo :: Repo + , hdl :: Handle + } + +-- Implementation of the protocol, communicating with a peer +-- over a Handle. No Local actions will be run. +runProtoHandle :: MonadIO m => Handle -> Repo -> Proto a -> m a +runProtoHandle h r = go + where + go :: RunProto + go (Pure a) = pure a + go (Free (Net n)) = runNetHandle (S r h) go n + go (Free (Local _)) = error "local actions not allowed" + +runNetHandle :: MonadIO m => S -> RunProto -> NetF (Proto a) -> m a +runNetHandle s runner f = case f of + SendMessage m next -> do + liftIO $ do + hPutStrLn (hdl s) (unwords (formatMessage m)) + hFlush (hdl s) + runner next + ReceiveMessage next -> do + l <- liftIO $ hGetLine (hdl s) + let m = fromMaybe (ERROR "protocol parse error") + (parseMessage l) + runner (next m) + SendBytes _len b next -> do + liftIO $ do + L.hPut (hdl s) b + hFlush (hdl s) + runner next + ReceiveBytes (Len n) next -> do + b <- liftIO $ L.hGet (hdl s) (fromIntegral n) + runner (next b) + Relay hout callback next -> + runRelay runner hout callback >>= runner . next + RelayService service callback next -> + runRelayService s runner service callback >>= runner . next + WriteRelay (RelayHandle h) b next -> do + liftIO $ do + L.hPut h b + hFlush h + runner next + +runRelay + :: MonadIO m + => RunProto + -> RelayHandle + -> (RelayData -> Net (Maybe ExitCode)) + -> m ExitCode +runRelay runner (RelayHandle hout) callback = do + v <- liftIO newEmptyMVar + _ <- liftIO $ forkIO $ readout v + feeder <- liftIO $ forkIO $ feedin v + exitcode <- liftIO $ drain v + liftIO $ killThread feeder + return exitcode + where + feedin v = forever $ do + m <- runner $ net receiveMessage + putMVar v $ RelayMessage m + + readout v = do + b <- B.hGetSome hout 65536 + if B.null b + then hClose hout + else do + putMVar v $ RelayData (L.fromChunks [b]) + readout v + + drain v = do + d <- takeMVar v + r <- runner $ net $ callback d + case r of + Nothing -> drain v + Just exitcode -> return exitcode + +runRelayService + :: MonadIO m + => S + -> RunProto + -> Service + -> (RelayHandle -> RelayData -> Net (Maybe ExitCode)) + -> m ExitCode +runRelayService s runner service callback = do + v <- liftIO newEmptyMVar + (Just hin, Just hout, _, pid) <- liftIO $ createProcess serviceproc + { std_out = CreatePipe + , std_in = CreatePipe + } + _ <- liftIO $ forkIO $ readout v hout + feeder <- liftIO $ forkIO $ feedin v + _ <- liftIO $ forkIO $ putMVar v . Left =<< waitForProcess pid + exitcode <- liftIO $ drain v hin + liftIO $ killThread feeder + return exitcode + where + cmd = case service of + UploadPack -> "upload-pack" + ReceivePack -> "receive-pack" + serviceproc = gitCreateProcess [Param cmd, File (repoPath (repo s))] (repo s) + + drain v hin = do + d <- takeMVar v + case d of + Left exitcode -> do + hClose hin + return exitcode + Right relaydata -> do + _ <- runner $ net $ + callback (RelayHandle hin) relaydata + drain v hin + + readout v hout = do + b <- B.hGetSome hout 65536 + if B.null b + then hClose hout + else do + putMVar v $ Right $ + RelayData (L.fromChunks [b]) + readout v hout + + feedin v = forever $ do + m <- runner $ net receiveMessage + putMVar v $ Right $ RelayMessage m diff --git a/git-annex.cabal b/git-annex.cabal index 4fb4e1c3c..77c50b66e 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -920,6 +920,7 @@ Executable git-annex Remote.Helper.Http Remote.Helper.Messages Remote.Helper.P2P + Remote.Helper.P2P.IO Remote.Helper.ReadOnly Remote.Helper.Special Remote.Helper.Ssh |