aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Remote/Helper/P2P.hs374
-rw-r--r--Remote/Helper/P2P/IO.hs159
-rw-r--r--git-annex.cabal1
3 files changed, 356 insertions, 178 deletions
diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs
index d3d3dfa08..fbd6c2463 100644
--- a/Remote/Helper/P2P.hs
+++ b/Remote/Helper/P2P.hs
@@ -7,20 +7,7 @@
{-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts, RankNTypes #-}
-module Remote.Helper.P2P (
- AuthToken(..),
- ProtoF(..),
- runPure,
- protoDump,
- auth,
- checkPresent,
- lockContentWhile,
- remove,
- get,
- put,
- connect,
- serve,
-) where
+module Remote.Helper.P2P where
import qualified Utility.SimpleProtocol as Proto
import Types.Key
@@ -33,7 +20,7 @@ import Control.Monad.Free
import Control.Monad.Free.TH
import Control.Monad.Catch
import System.Exit (ExitCode(..))
-import System.IO (Handle)
+import System.IO
import qualified Data.ByteString.Lazy as L
newtype AuthToken = AuthToken String
@@ -49,10 +36,6 @@ newtype Len = Len Integer
data Service = UploadPack | ReceivePack
deriving (Show)
-data RelayData
- = RelayData L.ByteString
- | RelayMessage Message
-
-- | Messages in the protocol. The peer that makes the connection
-- always initiates requests, and the other peer makes responses to them.
data Message
@@ -75,72 +58,164 @@ data Message
| ERROR String
deriving (Show)
--- | Free monad for implementing actions that use the protocol.
-data ProtoF next
- = SendMessage Message next
- | ReceiveMessage (Message -> next)
- | SendBytes Len L.ByteString next
- | ReceiveBytes Len (L.ByteString -> next)
+instance Proto.Sendable Message where
+ formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken]
+ formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid]
+ formatMessage AUTH_FAILURE = ["AUTH-FAILURE"]
+ formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service]
+ formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode]
+ formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key]
+ formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key]
+ formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"]
+ formatMessage (REMOVE key) = ["REMOVE", Proto.serialize key]
+ formatMessage (GET offset key) = ["GET", Proto.serialize offset, Proto.serialize key]
+ formatMessage (PUT key) = ["PUT", Proto.serialize key]
+ formatMessage (PUT_FROM offset) = ["PUT-FROM", Proto.serialize offset]
+ formatMessage ALREADY_HAVE = ["ALREADY-HAVE"]
+ formatMessage SUCCESS = ["SUCCESS"]
+ formatMessage FAILURE = ["FAILURE"]
+ formatMessage (DATA len) = ["DATA", Proto.serialize len]
+ formatMessage (ERROR err) = ["ERROR", Proto.serialize err]
+
+instance Proto.Receivable Message where
+ parseCommand "AUTH" = Proto.parse2 AUTH
+ parseCommand "AUTH-SUCCESS" = Proto.parse1 AUTH_SUCCESS
+ parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE
+ parseCommand "CONNECT" = Proto.parse1 CONNECT
+ parseCommand "CONNECTDONE" = Proto.parse1 CONNECT
+ parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT
+ parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT
+ parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT
+ parseCommand "REMOVE" = Proto.parse1 REMOVE
+ parseCommand "GET" = Proto.parse2 GET
+ parseCommand "PUT" = Proto.parse1 PUT
+ parseCommand "PUT-FROM" = Proto.parse1 PUT_FROM
+ parseCommand "ALREADY-HAVE" = Proto.parse0 ALREADY_HAVE
+ parseCommand "SUCCESS" = Proto.parse0 SUCCESS
+ parseCommand "FAILURE" = Proto.parse0 FAILURE
+ parseCommand "DATA" = Proto.parse1 DATA
+ parseCommand "ERROR" = Proto.parse1 ERROR
+ parseCommand _ = Proto.parseFail
+
+instance Proto.Serializable Offset where
+ serialize (Offset n) = show n
+ deserialize = Offset <$$> readish
+
+instance Proto.Serializable Len where
+ serialize (Len n) = show n
+ deserialize = Len <$$> readish
+
+instance Proto.Serializable AuthToken where
+ serialize (AuthToken s) = s
+ deserialize = Just . AuthToken
+
+instance Proto.Serializable Service where
+ serialize UploadPack = "git-upload-pack"
+ serialize ReceivePack = "git-receive-pack"
+ deserialize "git-upload-pack" = Just UploadPack
+ deserialize "git-receive-pack" = Just ReceivePack
+ deserialize _ = Nothing
+
+-- | Free monad for the protocol, combining net communication,
+-- and local actions.
+data ProtoF c = Net (NetF c) | Local (LocalF c)
+ deriving (Functor)
+
+type Proto = Free ProtoF
+
+net :: Net a -> Proto a
+net = hoistFree Net
+
+local :: Local a -> Proto a
+local = hoistFree Local
+
+data NetF c
+ = SendMessage Message c
+ | ReceiveMessage (Message -> c)
+ | SendBytes Len L.ByteString c
+ | ReceiveBytes Len (L.ByteString -> c)
+ | Relay RelayHandle
+ (RelayData -> Net (Maybe ExitCode))
+ (ExitCode -> c)
+ -- ^ Waits for data to be written to the RelayHandle, and for messages
+ -- to be received from the peer, and passes the data to the
+ -- callback, continuing until it returns an ExitCode.
+ | RelayService Service
+ (RelayHandle -> RelayData -> Net (Maybe ExitCode))
+ (ExitCode -> c)
+ -- ^ Runs a service, and waits for it to output to stdout,
+ -- and for messages to be received from the peer, and passes
+ -- the data to the callback (which is also passed the service's
+ -- stdin RelayHandle), continuing uniil the service exits.
+ | WriteRelay RelayHandle L.ByteString c
+ -- ^ Write data to a relay's handle, flushing it immediately.
+ deriving (Functor)
+
+type Net = Free NetF
+
+data RelayData
+ = RelayData L.ByteString
+ | RelayMessage Message
+
+newtype RelayHandle = RelayHandle Handle
+
+data LocalF c
-- ^ Lazily reads bytes from peer. Stops once Len are read,
-- or if connection is lost, and in either case returns the bytes
-- that were read. This allows resuming interrupted transfers.
- | KeyFileSize Key (Len -> next)
+ = KeyFileSize Key (Len -> c)
-- ^ Checks size of key file (dne = 0)
- | ReadKeyFile Key Offset (L.ByteString -> next)
- | WriteKeyFile Key Offset Len L.ByteString (Bool -> next)
+ | ReadKeyFile Key Offset (L.ByteString -> c)
+ | WriteKeyFile Key Offset Len L.ByteString (Bool -> c)
-- ^ Writes to key file starting at an offset. Returns True
-- once the whole content of the key is stored in the key file.
--
-- Note: The ByteString may not contain the entire remaining content
-- of the key. Only once the key file size == Len has the whole
-- content been transferred.
- | CheckAuthToken UUID AuthToken (Bool -> next)
- | SetPresent Key UUID next
- | CheckContentPresent Key (Bool -> next)
+ | CheckAuthToken UUID AuthToken (Bool -> c)
+ | SetPresent Key UUID c
+ | CheckContentPresent Key (Bool -> c)
-- ^ Checks if the whole content of the key is locally present.
- | RemoveKeyFile Key (Bool -> next)
+ | RemoveKeyFile Key (Bool -> c)
-- ^ If the key file is not present, still succeeds.
-- May fail if not enough copies to safely drop, etc.
- | TryLockContent Key (Bool -> Proto ()) next
- | WriteHandle Handle L.ByteString next
+ | TryLockContent Key (Bool -> Proto ()) c
-- ^ Try to lock the content of a key, preventing it
-- from being deleted, and run the provided protocol action.
- | Relay Handle (RelayData -> Proto (Maybe ExitCode)) (ExitCode -> next)
- -- ^ Waits for data to be written to the Handle, and for messages
- -- to be received from the peer, and passes the data to the
- -- callback, continuing until it returns an ExitCode.
- | RelayService Service
- (Handle -> RelayData -> Proto (Maybe ExitCode))
- (ExitCode -> next)
- -- ^ Runs a service, and waits for it to output to stdout,
- -- and for messages to be received from the peer, and passes
- -- the data to the callback (which is also passed the service's
- -- stdin Handle), continuing uniil the service exits.
deriving (Functor)
-type Proto = Free ProtoF
+type Local = Free LocalF
-$(makeFree ''ProtoF)
+-- Generate sendMessage etc functions for all free monad constructors.
+$(makeFree ''NetF)
+$(makeFree ''LocalF)
-- | Running Proto actions purely, to see what they do.
runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)]
runPure (Pure r) _ = [("result: " ++ show r, Nothing)]
-runPure (Free (SendMessage m next)) ms = (">", Just m):runPure next ms
-runPure (Free (ReceiveMessage _)) [] = [("not enough Messages provided", Nothing)]
-runPure (Free (ReceiveMessage next)) (m:ms) = ("<", Just m):runPure (next m) ms
-runPure (Free (SendBytes _ _ next)) ms = ("> bytes", Nothing):runPure next ms
-runPure (Free (ReceiveBytes _ next)) ms = ("< bytes", Nothing):runPure (next L.empty) ms
-runPure (Free (KeyFileSize _ next)) ms = runPure (next (Len 100)) ms
-runPure (Free (ReadKeyFile _ _ next)) ms = runPure (next L.empty) ms
-runPure (Free (WriteKeyFile _ _ _ _ next)) ms = runPure (next True) ms
-runPure (Free (CheckAuthToken _ _ next)) ms = runPure (next True) ms
-runPure (Free (SetPresent _ _ next)) ms = runPure next ms
-runPure (Free (CheckContentPresent _ next)) ms = runPure (next False) ms
-runPure (Free (RemoveKeyFile _ next)) ms = runPure (next True) ms
-runPure (Free (TryLockContent _ p next)) ms = runPure (p True >> next) ms
-runPure (Free (WriteHandle _ _ next)) ms = runPure next ms
-runPure (Free (Relay _ _ next)) ms = runPure (next ExitSuccess) ms
-runPure (Free (RelayService _ _ next)) ms = runPure (next ExitSuccess) ms
+runPure (Free (Net n)) ms = runNet n ms
+runPure (Free (Local n)) ms = runLocal n ms
+
+runNet :: Show r => NetF (Proto r) -> [Message] -> [(String, Maybe Message)]
+runNet (SendMessage m next) ms = (">", Just m):runPure next ms
+runNet (ReceiveMessage _) [] = [("not enough Messages provided", Nothing)]
+runNet (ReceiveMessage next) (m:ms) = ("<", Just m):runPure (next m) ms
+runNet (SendBytes _ _ next) ms = ("> bytes", Nothing):runPure next ms
+runNet (ReceiveBytes _ next) ms = ("< bytes", Nothing):runPure (next L.empty) ms
+runNet (Relay _ _ next) ms = runPure (next ExitSuccess) ms
+runNet (RelayService _ _ next) ms = runPure (next ExitSuccess) ms
+runNet (WriteRelay _ _ next) ms = runPure next ms
+
+runLocal :: Show r => LocalF (Proto r) -> [Message] -> [(String, Maybe Message)]
+runLocal (KeyFileSize _ next) ms = runPure (next (Len 100)) ms
+runLocal (ReadKeyFile _ _ next) ms = runPure (next L.empty) ms
+runLocal (WriteKeyFile _ _ _ _ next) ms = runPure (next True) ms
+runLocal (CheckAuthToken _ _ next) ms = runPure (next True) ms
+runLocal (SetPresent _ _ next) ms = runPure next ms
+runLocal (CheckContentPresent _ next) ms = runPure (next False) ms
+runLocal (RemoveKeyFile _ next) ms = runPure (next True) ms
+runLocal (TryLockContent _ p next) ms = runPure (p True >> next) ms
protoDump :: [(String, Maybe Message)] -> String
protoDump = unlines . map protoDump'
@@ -151,18 +226,18 @@ protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m)
auth :: UUID -> AuthToken -> Proto (Maybe UUID)
auth myuuid t = do
- sendMessage (AUTH myuuid t)
- r <- receiveMessage
+ net $ sendMessage (AUTH myuuid t)
+ r <- net receiveMessage
case r of
AUTH_SUCCESS theiruuid -> return $ Just theiruuid
AUTH_FAILURE -> return Nothing
_ -> do
- sendMessage (ERROR "auth failed")
+ net $ sendMessage (ERROR "auth failed")
return Nothing
checkPresent :: Key -> Proto Bool
checkPresent key = do
- sendMessage (CHECKPRESENT key)
+ net $ sendMessage (CHECKPRESENT key)
checkSuccess
{- Locks content to prevent it from being dropped, while running an action.
@@ -180,14 +255,14 @@ lockContentWhile
lockContentWhile runproto key a = bracket setup cleanup a
where
setup = runproto $ do
- sendMessage (LOCKCONTENT key)
+ net $ sendMessage (LOCKCONTENT key)
checkSuccess
- cleanup True = runproto $ sendMessage UNLOCKCONTENT
+ cleanup True = runproto $ net $ sendMessage UNLOCKCONTENT
cleanup False = return ()
remove :: Key -> Proto Bool
remove key = do
- sendMessage (REMOVE key)
+ net $ sendMessage (REMOVE key)
checkSuccess
get :: Key -> Proto Bool
@@ -195,35 +270,15 @@ get key = receiveContent key (`GET` key)
put :: Key -> Proto Bool
put key = do
- sendMessage (PUT key)
- r <- receiveMessage
+ net $ sendMessage (PUT key)
+ r <- net receiveMessage
case r of
PUT_FROM offset -> sendContent key offset
ALREADY_HAVE -> return True
_ -> do
- sendMessage (ERROR "expected PUT_FROM")
+ net $ sendMessage (ERROR "expected PUT_FROM")
return False
-connect :: Service -> Handle -> Handle -> Proto ExitCode
-connect service hin hout = do
- sendMessage (CONNECT service)
- relay hin (relayCallback hout)
-
-relayCallback :: Handle -> RelayData -> Proto (Maybe ExitCode)
-relayCallback hout (RelayMessage (DATA len)) = do
- writeHandle hout =<< receiveBytes len
- return Nothing
-relayCallback _ (RelayMessage (CONNECTDONE exitcode)) =
- return (Just exitcode)
-relayCallback _ (RelayMessage _) = do
- sendMessage (ERROR "expected DATA or CONNECTDONE")
- return (Just (ExitFailure 1))
-relayCallback _ (RelayData b) = do
- let len = Len $ fromIntegral $ L.length b
- sendMessage (DATA len)
- sendBytes len b
- return Nothing
-
-- | Serve the protocol.
--
-- Note that if the client sends an unexpected message, the server will
@@ -240,153 +295,116 @@ serve :: UUID -> Proto ()
serve myuuid = go Nothing
where
go autheduuid = do
- r <- receiveMessage
+ r <- net receiveMessage
case r of
AUTH theiruuid authtoken -> do
- ok <- checkAuthToken theiruuid authtoken
+ ok <- local $ checkAuthToken theiruuid authtoken
if ok
then do
- sendMessage (AUTH_SUCCESS myuuid)
+ net $ sendMessage (AUTH_SUCCESS myuuid)
go (Just theiruuid)
else do
- sendMessage AUTH_FAILURE
+ net $ sendMessage AUTH_FAILURE
go autheduuid
ERROR _ -> return ()
_ -> do
case autheduuid of
Just theiruuid -> authed theiruuid r
- Nothing -> sendMessage (ERROR "must AUTH first")
+ Nothing -> net $ sendMessage (ERROR "must AUTH first")
go autheduuid
authed _theiruuid r = case r of
- LOCKCONTENT key -> tryLockContent key $ \locked -> do
+ LOCKCONTENT key -> local $ tryLockContent key $ \locked -> do
sendSuccess locked
when locked $ do
- r' <- receiveMessage
+ r' <- net receiveMessage
case r' of
UNLOCKCONTENT -> return ()
- _ -> sendMessage (ERROR "expected UNLOCKCONTENT")
- CHECKPRESENT key -> sendSuccess =<< checkContentPresent key
- REMOVE key -> sendSuccess =<< removeKeyFile key
+ _ -> net $ sendMessage (ERROR "expected UNLOCKCONTENT")
+ CHECKPRESENT key -> sendSuccess =<< local (checkContentPresent key)
+ REMOVE key -> sendSuccess =<< local (removeKeyFile key)
PUT key -> do
- have <- checkContentPresent key
+ have <- local $ checkContentPresent key
if have
- then sendMessage ALREADY_HAVE
+ then net $ sendMessage ALREADY_HAVE
else do
ok <- receiveContent key PUT_FROM
when ok $
- setPresent key myuuid
+ local $ setPresent key myuuid
-- setPresent not called because the peer may have
-- requested the data but not permanatly stored it.
GET offset key -> void $ sendContent key offset
CONNECT service -> do
- exitcode <- relayService service relayCallback
- sendMessage (CONNECTDONE exitcode)
- _ -> sendMessage (ERROR "unexpected command")
+ exitcode <- net $ relayService service relayCallback
+ net $ sendMessage (CONNECTDONE exitcode)
+ _ -> net $ sendMessage (ERROR "unexpected command")
sendContent :: Key -> Offset -> Proto Bool
sendContent key offset = do
(len, content) <- readKeyFileLen key offset
- sendMessage (DATA len)
- sendBytes len content
+ net $ sendMessage (DATA len)
+ net $ sendBytes len content
checkSuccess
receiveContent :: Key -> (Offset -> Message) -> Proto Bool
receiveContent key mkmsg = do
- Len n <- keyFileSize key
+ Len n <- local $ keyFileSize key
let offset = Offset n
- sendMessage (mkmsg offset)
- r <- receiveMessage
+ net $ sendMessage (mkmsg offset)
+ r <- net receiveMessage
case r of
DATA len -> do
- ok <- writeKeyFile key offset len =<< receiveBytes len
+ ok <- local . writeKeyFile key offset len
+ =<< net (receiveBytes len)
sendSuccess ok
return ok
_ -> do
- sendMessage (ERROR "expected DATA")
+ net $ sendMessage (ERROR "expected DATA")
return False
checkSuccess :: Proto Bool
checkSuccess = do
- ack <- receiveMessage
+ ack <- net receiveMessage
case ack of
SUCCESS -> return True
FAILURE -> return False
_ -> do
- sendMessage (ERROR "expected SUCCESS or FAILURE")
+ net $ sendMessage (ERROR "expected SUCCESS or FAILURE")
return False
sendSuccess :: Bool -> Proto ()
-sendSuccess True = sendMessage SUCCESS
-sendSuccess False = sendMessage FAILURE
+sendSuccess True = net $ sendMessage SUCCESS
+sendSuccess False = net $ sendMessage FAILURE
-- Reads key file from an offset. The Len should correspond to
-- the length of the ByteString, but to avoid buffering the content
-- in memory, is gotten using keyFileSize.
readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString)
readKeyFileLen key (Offset offset) = do
- (Len totallen) <- keyFileSize key
+ (Len totallen) <- local $ keyFileSize key
let len = totallen - offset
if len <= 0
then return (Len 0, L.empty)
else do
- content <- readKeyFile key (Offset offset)
+ content <- local $ readKeyFile key (Offset offset)
return (Len len, content)
-instance Proto.Sendable Message where
- formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken]
- formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid]
- formatMessage AUTH_FAILURE = ["AUTH-FAILURE"]
- formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service]
- formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode]
- formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key]
- formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key]
- formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"]
- formatMessage (REMOVE key) = ["REMOVE", Proto.serialize key]
- formatMessage (GET offset key) = ["GET", Proto.serialize offset, Proto.serialize key]
- formatMessage (PUT key) = ["PUT", Proto.serialize key]
- formatMessage (PUT_FROM offset) = ["PUT-FROM", Proto.serialize offset]
- formatMessage ALREADY_HAVE = ["ALREADY-HAVE"]
- formatMessage SUCCESS = ["SUCCESS"]
- formatMessage FAILURE = ["FAILURE"]
- formatMessage (DATA len) = ["DATA", Proto.serialize len]
- formatMessage (ERROR err) = ["ERROR", Proto.serialize err]
-
-instance Proto.Receivable Message where
- parseCommand "AUTH" = Proto.parse2 AUTH
- parseCommand "AUTH-SUCCESS" = Proto.parse1 AUTH_SUCCESS
- parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE
- parseCommand "CONNECT" = Proto.parse1 CONNECT
- parseCommand "CONNECTDONE" = Proto.parse1 CONNECT
- parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT
- parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT
- parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT
- parseCommand "REMOVE" = Proto.parse1 REMOVE
- parseCommand "GET" = Proto.parse2 GET
- parseCommand "PUT" = Proto.parse1 PUT
- parseCommand "PUT-FROM" = Proto.parse1 PUT_FROM
- parseCommand "ALREADY-HAVE" = Proto.parse0 ALREADY_HAVE
- parseCommand "SUCCESS" = Proto.parse0 SUCCESS
- parseCommand "FAILURE" = Proto.parse0 FAILURE
- parseCommand "DATA" = Proto.parse1 DATA
- parseCommand "ERROR" = Proto.parse1 ERROR
- parseCommand _ = Proto.parseFail
-
-instance Proto.Serializable Offset where
- serialize (Offset n) = show n
- deserialize = Offset <$$> readish
-
-instance Proto.Serializable Len where
- serialize (Len n) = show n
- deserialize = Len <$$> readish
-
-instance Proto.Serializable AuthToken where
- serialize (AuthToken s) = s
- deserialize = Just . AuthToken
+connect :: Service -> Handle -> Handle -> Proto ExitCode
+connect service hin hout = do
+ net $ sendMessage (CONNECT service)
+ net $ relay (RelayHandle hin) (relayCallback (RelayHandle hout))
-instance Proto.Serializable Service where
- serialize UploadPack = "git-upload-pack"
- serialize ReceivePack = "git-receive-pack"
- deserialize "git-upload-pack" = Just UploadPack
- deserialize "git-receive-pack" = Just ReceivePack
- deserialize _ = Nothing
+relayCallback :: RelayHandle -> RelayData -> Net (Maybe ExitCode)
+relayCallback hout (RelayMessage (DATA len)) = do
+ writeRelay hout =<< receiveBytes len
+ return Nothing
+relayCallback _ (RelayMessage (CONNECTDONE exitcode)) =
+ return (Just exitcode)
+relayCallback _ (RelayMessage _) = do
+ sendMessage (ERROR "expected DATA or CONNECTDONE")
+ return (Just (ExitFailure 1))
+relayCallback _ (RelayData b) = do
+ let len = Len $ fromIntegral $ L.length b
+ sendMessage (DATA len)
+ sendBytes len b
+ return Nothing
diff --git a/Remote/Helper/P2P/IO.hs b/Remote/Helper/P2P/IO.hs
new file mode 100644
index 000000000..7179adc2b
--- /dev/null
+++ b/Remote/Helper/P2P/IO.hs
@@ -0,0 +1,159 @@
+{- P2P protocol, partial IO implementation
+ -
+ - Copyright 2016 Joey Hess <id@joeyh.name>
+ -
+ - Licensed under the GNU GPL version 3 or higher.
+ -}
+
+{-# LANGUAGE RankNTypes #-}
+
+module Remote.Helper.P2P.IO
+ ( RunProto
+ , runProtoHandle
+ ) where
+
+import Remote.Helper.P2P
+import Utility.Process
+import Git
+import Git.Command
+import Utility.SafeCommand
+import Utility.SimpleProtocol
+
+import Control.Monad
+import Control.Monad.Free
+import Control.Monad.IO.Class
+import Data.Maybe
+import System.Exit (ExitCode(..))
+import System.IO
+import Control.Concurrent
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Lazy as L
+
+type RunProto = forall a m. MonadIO m => Proto a -> m a
+
+data S = S
+ { repo :: Repo
+ , hdl :: Handle
+ }
+
+-- Implementation of the protocol, communicating with a peer
+-- over a Handle. No Local actions will be run.
+runProtoHandle :: MonadIO m => Handle -> Repo -> Proto a -> m a
+runProtoHandle h r = go
+ where
+ go :: RunProto
+ go (Pure a) = pure a
+ go (Free (Net n)) = runNetHandle (S r h) go n
+ go (Free (Local _)) = error "local actions not allowed"
+
+runNetHandle :: MonadIO m => S -> RunProto -> NetF (Proto a) -> m a
+runNetHandle s runner f = case f of
+ SendMessage m next -> do
+ liftIO $ do
+ hPutStrLn (hdl s) (unwords (formatMessage m))
+ hFlush (hdl s)
+ runner next
+ ReceiveMessage next -> do
+ l <- liftIO $ hGetLine (hdl s)
+ let m = fromMaybe (ERROR "protocol parse error")
+ (parseMessage l)
+ runner (next m)
+ SendBytes _len b next -> do
+ liftIO $ do
+ L.hPut (hdl s) b
+ hFlush (hdl s)
+ runner next
+ ReceiveBytes (Len n) next -> do
+ b <- liftIO $ L.hGet (hdl s) (fromIntegral n)
+ runner (next b)
+ Relay hout callback next ->
+ runRelay runner hout callback >>= runner . next
+ RelayService service callback next ->
+ runRelayService s runner service callback >>= runner . next
+ WriteRelay (RelayHandle h) b next -> do
+ liftIO $ do
+ L.hPut h b
+ hFlush h
+ runner next
+
+runRelay
+ :: MonadIO m
+ => RunProto
+ -> RelayHandle
+ -> (RelayData -> Net (Maybe ExitCode))
+ -> m ExitCode
+runRelay runner (RelayHandle hout) callback = do
+ v <- liftIO newEmptyMVar
+ _ <- liftIO $ forkIO $ readout v
+ feeder <- liftIO $ forkIO $ feedin v
+ exitcode <- liftIO $ drain v
+ liftIO $ killThread feeder
+ return exitcode
+ where
+ feedin v = forever $ do
+ m <- runner $ net receiveMessage
+ putMVar v $ RelayMessage m
+
+ readout v = do
+ b <- B.hGetSome hout 65536
+ if B.null b
+ then hClose hout
+ else do
+ putMVar v $ RelayData (L.fromChunks [b])
+ readout v
+
+ drain v = do
+ d <- takeMVar v
+ r <- runner $ net $ callback d
+ case r of
+ Nothing -> drain v
+ Just exitcode -> return exitcode
+
+runRelayService
+ :: MonadIO m
+ => S
+ -> RunProto
+ -> Service
+ -> (RelayHandle -> RelayData -> Net (Maybe ExitCode))
+ -> m ExitCode
+runRelayService s runner service callback = do
+ v <- liftIO newEmptyMVar
+ (Just hin, Just hout, _, pid) <- liftIO $ createProcess serviceproc
+ { std_out = CreatePipe
+ , std_in = CreatePipe
+ }
+ _ <- liftIO $ forkIO $ readout v hout
+ feeder <- liftIO $ forkIO $ feedin v
+ _ <- liftIO $ forkIO $ putMVar v . Left =<< waitForProcess pid
+ exitcode <- liftIO $ drain v hin
+ liftIO $ killThread feeder
+ return exitcode
+ where
+ cmd = case service of
+ UploadPack -> "upload-pack"
+ ReceivePack -> "receive-pack"
+ serviceproc = gitCreateProcess [Param cmd, File (repoPath (repo s))] (repo s)
+
+ drain v hin = do
+ d <- takeMVar v
+ case d of
+ Left exitcode -> do
+ hClose hin
+ return exitcode
+ Right relaydata -> do
+ _ <- runner $ net $
+ callback (RelayHandle hin) relaydata
+ drain v hin
+
+ readout v hout = do
+ b <- B.hGetSome hout 65536
+ if B.null b
+ then hClose hout
+ else do
+ putMVar v $ Right $
+ RelayData (L.fromChunks [b])
+ readout v hout
+
+ feedin v = forever $ do
+ m <- runner $ net receiveMessage
+ putMVar v $ Right $ RelayMessage m
diff --git a/git-annex.cabal b/git-annex.cabal
index 4fb4e1c3c..77c50b66e 100644
--- a/git-annex.cabal
+++ b/git-annex.cabal
@@ -920,6 +920,7 @@ Executable git-annex
Remote.Helper.Http
Remote.Helper.Messages
Remote.Helper.P2P
+ Remote.Helper.P2P.IO
Remote.Helper.ReadOnly
Remote.Helper.Special
Remote.Helper.Ssh