aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--P2P/IO.hs13
-rw-r--r--P2P/Protocol.hs16
-rw-r--r--Remote/P2P.hs11
-rw-r--r--Remote/S3.hs2
-rw-r--r--Utility/Metered.hs30
5 files changed, 45 insertions, 27 deletions
diff --git a/P2P/IO.hs b/P2P/IO.hs
index 8a580452c..ea15ecfc3 100644
--- a/P2P/IO.hs
+++ b/P2P/IO.hs
@@ -119,8 +119,8 @@ runNet conn runner f = case f of
case v of
Right True -> runner next
_ -> return Nothing
- ReceiveBytes (Len n) next -> do
- v <- liftIO $ tryNonAsync $ L.hGet (connIhdl conn) (fromIntegral n)
+ ReceiveBytes len p next -> do
+ v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p
case v of
Left _e -> return Nothing
Right b -> runner (next b)
@@ -155,9 +155,12 @@ runNet conn runner f = case f of
-- If too few bytes are sent, the only option is to give up on this
-- connection. False is returned to indicate this problem.
sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
-sendExactly (Len l) b h p = do
- sent <- meteredWrite' p h (L.take (fromIntegral l) b)
- return (fromBytesProcessed sent == l)
+sendExactly (Len n) b h p = do
+ sent <- meteredWrite' p h (L.take (fromIntegral n) b)
+ return (fromBytesProcessed sent == n)
+
+receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
+receiveExactly (Len n) h p = hGetMetered h (Just n) p
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode)
runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go
diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs
index 6cefce38c..b1e2bf481 100644
--- a/P2P/Protocol.hs
+++ b/P2P/Protocol.hs
@@ -167,7 +167,7 @@ data NetF c
| SendBytes Len L.ByteString MeterUpdate c
-- ^ Sends exactly Len bytes of data. (Any more or less will
-- confuse the receiver.)
- | ReceiveBytes Len (L.ByteString -> c)
+ | ReceiveBytes Len MeterUpdate (L.ByteString -> c)
-- ^ Lazily reads bytes from peer. Stops once Len are read,
-- or if connection is lost, and in either case returns the bytes
-- that were read. This allows resuming interrupted transfers.
@@ -273,8 +273,8 @@ remove key = do
net $ sendMessage (REMOVE key)
checkSuccess
-get :: FilePath -> Key -> AssociatedFile -> Proto Bool
-get dest key af = receiveContent sizer storer (\offset -> GET offset af key)
+get :: FilePath -> Key -> AssociatedFile -> MeterUpdate -> Proto Bool
+get dest key af p = receiveContent p sizer storer (\offset -> GET offset af key)
where
sizer = fileSize dest
storer = storeContentTo dest
@@ -364,7 +364,7 @@ serveAuthed myuuid = void $ serverLoop handler
else do
let sizer = tmpContentSize key
let storer = storeContent key af
- ok <- receiveContent sizer storer PUT_FROM
+ ok <- receiveContent nullMeterUpdate sizer storer PUT_FROM
when ok $
local $ setPresent key myuuid
return ServerContinue
@@ -385,8 +385,8 @@ sendContent key af offset p = do
net $ sendBytes len content p
checkSuccess
-receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
-receiveContent sizer storer mkmsg = do
+receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
+receiveContent p sizer storer mkmsg = do
Len n <- local sizer
let offset = Offset n
net $ sendMessage (mkmsg offset)
@@ -394,7 +394,7 @@ receiveContent sizer storer mkmsg = do
case r of
DATA len -> do
ok <- local . storer offset len
- =<< net (receiveBytes len)
+ =<< net (receiveBytes len p)
sendSuccess ok
return ok
_ -> do
@@ -447,7 +447,7 @@ relayFromPeer = do
r <- receiveMessage
case r of
CONNECTDONE exitcode -> return $ RelayDone exitcode
- DATA len -> RelayFromPeer <$> receiveBytes len
+ DATA len -> RelayFromPeer <$> receiveBytes len nullMeterUpdate
_ -> do
sendMessage $ ERROR "expected DATA or CONNECTDONE"
return $ RelayDone $ ExitFailure 1
diff --git a/Remote/P2P.hs b/Remote/P2P.hs
index 8286a9a18..1d7ede30f 100644
--- a/Remote/P2P.hs
+++ b/Remote/P2P.hs
@@ -24,6 +24,7 @@ import Annex.UUID
import Config
import Config.Cost
import Remote.Helper.Git
+import Messages.Progress
import Utility.Metered
import Utility.AuthToken
import Types.NumCopies
@@ -74,12 +75,14 @@ chainGen addr r u c gc = do
return (Just this)
store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
-store u addr connpool k af p = fromMaybe False
- <$> runProto u addr connpool (P2P.put k af p)
+store u addr connpool k af p =
+ metered (Just p) k $ \p' -> fromMaybe False
+ <$> runProto u addr connpool (P2P.put k af p')
retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
-retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False
- <$> runProto u addr connpool (P2P.get dest k af)
+retrieve u addr connpool k af dest p = unVerified $
+ metered (Just p) k $ \p' -> fromMaybe False
+ <$> runProto u addr connpool (P2P.get dest k af p')
remove :: UUID -> P2PAddress -> ConnectionPool -> Key -> Annex Bool
remove u addr connpool k = fromMaybe False
diff --git a/Remote/S3.hs b/Remote/S3.hs
index c6f23333f..4c1bd5784 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -193,7 +193,7 @@ store _r info h = fileStorer $ \k f p -> do
uploadid <- S3.imurUploadId <$> sendS3Handle h startreq
-- The actual part size will be a even multiple of the
- -- 32k chunk size that hGetUntilMetered uses.
+ -- 32k chunk size that lazy ByteStrings use.
let partsz' = (partsz `div` toInteger defaultChunkSize) * toInteger defaultChunkSize
-- Send parts of the file, taking care to stream each part
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
index aa65efd4d..b80d3ae3f 100644
--- a/Utility/Metered.hs
+++ b/Utility/Metered.hs
@@ -1,6 +1,6 @@
{- Metered IO and actions
-
- - Copyright 2012-2106 Joey Hess <id@joeyh.name>
+ - Copyright 2012-2016 Joey Hess <id@joeyh.name>
-
- License: BSD-2-clause
-}
@@ -115,24 +115,24 @@ offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)
- meter updates, so use caution.
-}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
-hGetContentsMetered h = hGetUntilMetered h (const True)
+hGetContentsMetered h = hGetMetered h Nothing
-{- Reads from the Handle, updating the meter after each chunk.
+{- Reads from the Handle, updating the meter after each chunk is read.
+ -
+ - Stops at EOF, or when the requested number of bytes have been read.
+ - Closes the Handle at EOF, but otherwise leaves it open.
-
- Note that the meter update is run in unsafeInterleaveIO, which means that
- it can be run at any time. It's even possible for updates to run out
- of order, as different parts of the ByteString are consumed.
- -
- - Stops at EOF, or when keepgoing evaluates to False.
- - Closes the Handle at EOF, but otherwise leaves it open.
-}
-hGetUntilMetered :: Handle -> (Integer -> Bool) -> MeterUpdate -> IO L.ByteString
-hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed
+hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
+hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
where
lazyRead sofar = unsafeInterleaveIO $ loop sofar
loop sofar = do
- c <- S.hGet h defaultChunkSize
+ c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
if S.null c
then do
hClose h
@@ -148,6 +148,18 @@ hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed
cs <- lazyRead sofar'
return $ L.append (L.fromChunks [c]) cs
else return $ L.fromChunks [c]
+
+ keepgoing n = case wantsize of
+ Nothing -> True
+ Just sz -> n < sz
+
+ nextchunksize n = case wantsize of
+ Nothing -> defaultChunkSize
+ Just sz ->
+ let togo = sz - n
+ in if togo < toInteger defaultChunkSize
+ then fromIntegral togo
+ else defaultChunkSize
{- Same default chunk size Lazy ByteStrings use. -}
defaultChunkSize :: Int