summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--P2P/IO.hs23
-rw-r--r--P2P/Protocol.hs19
-rw-r--r--Remote/P2P.hs3
-rw-r--r--Utility/Metered.hs7
4 files changed, 24 insertions, 28 deletions
diff --git a/P2P/IO.hs b/P2P/IO.hs
index 9abefb8a0..8a580452c 100644
--- a/P2P/IO.hs
+++ b/P2P/IO.hs
@@ -5,7 +5,7 @@
- Licensed under the GNU GPL version 3 or higher.
-}
-{-# LANGUAGE RankNTypes, FlexibleContexts, BangPatterns, CPP #-}
+{-# LANGUAGE RankNTypes, FlexibleContexts, CPP #-}
module P2P.IO
( RunProto
@@ -26,6 +26,7 @@ import Utility.AuthToken
import Utility.SafeCommand
import Utility.SimpleProtocol
import Utility.Exception
+import Utility.Metered
import Utility.Tor
import Utility.FileSystemEncoding
@@ -110,9 +111,9 @@ runNet conn runner f = case f of
let e = ERROR $ "protocol parse error: " ++ show l
net $ sendMessage e
next e
- SendBytes len b next -> do
+ SendBytes len b p next -> do
v <- liftIO $ tryNonAsync $ do
- ok <- sendExactly len b (connOhdl conn)
+ ok <- sendExactly len b (connOhdl conn) p
hFlush (connOhdl conn)
return ok
case v of
@@ -153,18 +154,10 @@ runNet conn runner f = case f of
--
-- If too few bytes are sent, the only option is to give up on this
-- connection. False is returned to indicate this problem.
---
--- We can't check the length of the whole lazy bytestring without buffering
--- it in memory. Instead, process it one chunk at a time, and sum the length
--- of the chunks.
-sendExactly :: Len -> L.ByteString -> Handle -> IO Bool
-sendExactly (Len l) lb h = go 0 $ L.toChunks $ L.take (fromIntegral l) lb
- where
- go n [] = return (toInteger n == l)
- go n (b:bs) = do
- B.hPut h b
- let !n' = n + B.length b
- go n' bs
+sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
+sendExactly (Len l) b h p = do
+ sent <- meteredWrite' p h (L.take (fromIntegral l) b)
+ return (fromBytesProcessed sent == l)
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode)
runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go
diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs
index b2b734e48..6cefce38c 100644
--- a/P2P/Protocol.hs
+++ b/P2P/Protocol.hs
@@ -17,6 +17,7 @@ import Types.UUID
import Utility.AuthToken
import Utility.Applicative
import Utility.PartialPrelude
+import Utility.Metered
import Git.FilePath
import Control.Monad
@@ -163,7 +164,7 @@ local = hoistFree Local
data NetF c
= SendMessage Message c
| ReceiveMessage (Message -> c)
- | SendBytes Len L.ByteString c
+ | SendBytes Len L.ByteString MeterUpdate c
-- ^ Sends exactly Len bytes of data. (Any more or less will
-- confuse the receiver.)
| ReceiveBytes Len (L.ByteString -> c)
@@ -278,12 +279,12 @@ get dest key af = receiveContent sizer storer (\offset -> GET offset af key)
sizer = fileSize dest
storer = storeContentTo dest
-put :: Key -> AssociatedFile -> Proto Bool
-put key af = do
+put :: Key -> AssociatedFile -> MeterUpdate -> Proto Bool
+put key af p = do
net $ sendMessage (PUT af key)
r <- net receiveMessage
case r of
- PUT_FROM offset -> sendContent key af offset
+ PUT_FROM offset -> sendContent key af offset p
ALREADY_HAVE -> return True
_ -> do
net $ sendMessage (ERROR "expected PUT_FROM")
@@ -368,7 +369,7 @@ serveAuthed myuuid = void $ serverLoop handler
local $ setPresent key myuuid
return ServerContinue
handler (GET offset key af) = do
- void $ sendContent af key offset
+ void $ sendContent af key offset nullMeterUpdate
-- setPresent not called because the peer may have
-- requested the data but not permanently stored it.
return ServerContinue
@@ -377,11 +378,11 @@ serveAuthed myuuid = void $ serverLoop handler
return ServerContinue
handler _ = return ServerUnexpected
-sendContent :: Key -> AssociatedFile -> Offset -> Proto Bool
-sendContent key af offset = do
+sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
+sendContent key af offset p = do
(len, content) <- readContentLen key af offset
net $ sendMessage (DATA len)
- net $ sendBytes len content
+ net $ sendBytes len content p
checkSuccess
receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
@@ -456,5 +457,5 @@ relayToPeer (RelayDone exitcode) = sendMessage (CONNECTDONE exitcode)
relayToPeer (RelayToPeer b) = do
let len = Len $ fromIntegral $ L.length b
sendMessage (DATA len)
- sendBytes len b
+ sendBytes len b nullMeterUpdate
relayToPeer (RelayFromPeer _) = return ()
diff --git a/Remote/P2P.hs b/Remote/P2P.hs
index f4f1d5f38..8286a9a18 100644
--- a/Remote/P2P.hs
+++ b/Remote/P2P.hs
@@ -73,10 +73,9 @@ chainGen addr r u c gc = do
}
return (Just this)
--- TODO update progress
store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
store u addr connpool k af p = fromMaybe False
- <$> runProto u addr connpool (P2P.put k af)
+ <$> runProto u addr connpool (P2P.put k af p)
retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
index 440aa3f07..aa65efd4d 100644
--- a/Utility/Metered.hs
+++ b/Utility/Metered.hs
@@ -85,9 +85,12 @@ streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
{- Writes a ByteString to a Handle, updating a meter as it's written. -}
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
-meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks
+meteredWrite meterupdate h = void . meteredWrite' meterupdate h
+
+meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed
+meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks
where
- go _ [] = return ()
+ go sofar [] = return sofar
go sofar (c:cs) = do
S.hPut h c
let sofar' = addBytesProcessed sofar $ S.length c