summaryrefslogtreecommitdiff
path: root/Remote
diff options
context:
space:
mode:
Diffstat (limited to 'Remote')
-rw-r--r--Remote/Helper/Http.hs35
-rw-r--r--Remote/S3.hs9
2 files changed, 34 insertions, 10 deletions
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs
index cb3af335a..6ce5bacb8 100644
--- a/Remote/Helper/Http.hs
+++ b/Remote/Helper/Http.hs
@@ -5,6 +5,8 @@
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Remote.Helper.Http where
import Common.Annex
@@ -31,17 +33,38 @@ httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
httpBodyStorer src m = do
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
- let streamer sink = withMeteredFile src m $ \b -> mkPopper b sink
+ let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
return $ RequestBodyStream (fromInteger size) streamer
-mkPopper :: L.ByteString -> NeedsPopper () -> IO ()
-mkPopper b sink = do
+byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
+byteStringPopper b sink = do
mvar <- newMVar $ L.toChunks b
- let getnextchunk = modifyMVar mvar $ pure . pop
+ let getnextchunk = modifyMVar mvar $ \v ->
+ case v of
+ [] -> return ([], S.empty)
+ (c:cs) -> return (cs, c)
+ sink getnextchunk
+
+{- Makes a Popper that streams a given number of chunks of a given
+ - size from the handle, updating the meter as the chunks are read. -}
+handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
+handlePopper numchunks chunksize meterupdate h sink = do
+ mvar <- newMVar zeroBytesProcessed
+ let getnextchunk = do
+ sent <- takeMVar mvar
+ if sent >= target
+ then do
+ putMVar mvar sent
+ return S.empty
+ else do
+ b <- S.hGet h chunksize
+ let !sent' = addBytesProcessed sent chunksize
+ putMVar mvar sent'
+ meterupdate sent'
+ return b
sink getnextchunk
where
- pop [] = ([], S.empty)
- pop (c:cs) = (cs, c)
+ target = toBytesProcessed (numchunks * fromIntegral chunksize)
-- Reads the http body and stores it to the specified file, updating the
-- meter as it goes.
diff --git a/Remote/S3.hs b/Remote/S3.hs
index f19c42842..a1f5bf75d 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -198,10 +198,11 @@ store r h = fileStorer $ \k f p -> do
let sz = if fsz - pos < partsz'
then fsz - pos
else partsz'
- b <- liftIO $ hGetUntilMetered fh (< partsz') meter
- let body = RequestBodyStream (fromIntegral sz) (mkPopper b)
- S3.UploadPartResponse _ etag <- sendS3Handle h $
- S3.uploadPart (bucket info) object partnum uploadid body
+ let numchunks = ceiling (fromIntegral sz / defaultChunkSize)
+ let popper = handlePopper numchunks defaultChunkSize p fh
+ let req = S3.uploadPart (bucket info) object partnum uploadid $
+ RequestBodyStream (fromIntegral sz) popper
+ S3.UploadPartResponse _ etag <- sendS3Handle h req
sendparts (offsetMeterUpdate meter (toBytesProcessed sz)) (etag:etags) (partnum + 1)
sendparts p [] 1