aboutsummaryrefslogtreecommitdiff
path: root/Remote/Helper/Http.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Remote/Helper/Http.hs')
-rw-r--r--Remote/Helper/Http.hs52
1 files changed, 41 insertions, 11 deletions
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs
index f1d576d1c..6ce5bacb8 100644
--- a/Remote/Helper/Http.hs
+++ b/Remote/Helper/Http.hs
@@ -5,13 +5,15 @@
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Remote.Helper.Http where
import Common.Annex
import Types.StoreRetrieve
import Utility.Metered
import Remote.Helper.Special
-import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader)
+import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
import Network.HTTP.Types
import qualified Data.ByteString.Lazy as L
@@ -24,17 +26,45 @@ import Control.Concurrent
-- Implemented as a fileStorer, so that the content can be streamed
-- from the file in constant space.
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
-httpStorer a = fileStorer $ \k f m -> do
- size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer)
- let streamer sink = withMeteredFile f m $ \b -> do
- mvar <- newMVar $ L.toChunks b
- let getnextchunk = modifyMVar mvar $ pure . pop
- sink getnextchunk
- let body = RequestBodyStream (fromInteger size) streamer
- a k body
+httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
+
+-- Reads the file and generates a streaming request body, that will update
+-- the meter as it's sent.
+httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
+httpBodyStorer src m = do
+ size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
+ let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
+ return $ RequestBodyStream (fromInteger size) streamer
+
+byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
+byteStringPopper b sink = do
+ mvar <- newMVar $ L.toChunks b
+ let getnextchunk = modifyMVar mvar $ \v ->
+ case v of
+ [] -> return ([], S.empty)
+ (c:cs) -> return (cs, c)
+ sink getnextchunk
+
+{- Makes a Popper that streams a given number of chunks of a given
+ - size from the handle, updating the meter as the chunks are read. -}
+handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
+handlePopper numchunks chunksize meterupdate h sink = do
+ mvar <- newMVar zeroBytesProcessed
+ let getnextchunk = do
+ sent <- takeMVar mvar
+ if sent >= target
+ then do
+ putMVar mvar sent
+ return S.empty
+ else do
+ b <- S.hGet h chunksize
+ let !sent' = addBytesProcessed sent chunksize
+ putMVar mvar sent'
+ meterupdate sent'
+ return b
+ sink getnextchunk
where
- pop [] = ([], S.empty)
- pop (c:cs) = (cs, c)
+ target = toBytesProcessed (numchunks * fromIntegral chunksize)
-- Reads the http body and stores it to the specified file, updating the
-- meter as it goes.