From 76c97f4ea90293b04a416368c107d1cc19cc381b Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 3 Nov 2014 19:50:33 -0400 Subject: WIP 2 --- Remote/Helper/Http.hs | 13 ++++++++----- Remote/S3.hs | 37 +++++++++++++++---------------------- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs index 4088854ff..cb3af335a 100644 --- a/Remote/Helper/Http.hs +++ b/Remote/Helper/Http.hs @@ -11,7 +11,7 @@ import Common.Annex import Types.StoreRetrieve import Utility.Metered import Remote.Helper.Special -import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader) +import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper) import Network.HTTP.Types import qualified Data.ByteString.Lazy as L @@ -31,11 +31,14 @@ httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m) httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody httpBodyStorer src m = do size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer - let streamer sink = withMeteredFile src m $ \b -> do - mvar <- newMVar $ L.toChunks b - let getnextchunk = modifyMVar mvar $ pure . pop - sink getnextchunk + let streamer sink = withMeteredFile src m $ \b -> mkPopper b sink return $ RequestBodyStream (fromInteger size) streamer + +mkPopper :: L.ByteString -> NeedsPopper () -> IO () +mkPopper b sink = do + mvar <- newMVar $ L.toChunks b + let getnextchunk = modifyMVar mvar $ pure . pop + sink getnextchunk where pop [] = ([], S.empty) pop (c:cs) = (cs, c) diff --git a/Remote/S3.hs b/Remote/S3.hs index 1de1abad6..685f95bbb 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -29,7 +29,6 @@ import Data.Conduit #if MIN_VERSION_aws(0,10,6) import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB -import Network.HTTP.Conduit (withManager) #endif import Common.Annex @@ -162,9 +161,9 @@ store r h = fileStorer $ \k f p -> do Just partsz | partsz > 0 -> do fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f) if fsz > partsz - then multipartupload fsz partsz k f p + then multipartupload partsz k f p else singlepartupload k f p - Nothing -> singlepartupload k f p + _ -> singlepartupload k f p -- Store public URL to item in Internet Archive. when (isIA (hinfo h) && not (isChunkKey k)) $ setUrlPresent k (iaKeyUrl r k) @@ -173,7 +172,7 @@ store r h = fileStorer $ \k f p -> do singlepartupload k f p = do rbody <- liftIO $ httpBodyStorer f p void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody - multipartupload fsz partsz k f p = do + multipartupload partsz k f p = do #if MIN_VERSION_aws(0,10,6) let info = hinfo h let object = bucketObject info k @@ -188,28 +187,22 @@ store r h = fileStorer $ \k f p -> do -- Send parts of the file, taking care to stream each part -- w/o buffering in memory, since the parts can be large. - etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \h -> do - let sendparts etags partnum = do - b <- liftIO $ hGetUntilMetered h (< partsz) p - if L.null b - then return (reverse etags) - else do - mvar <- newMVar $ L.toChunks b - let streamer sink = do - let getnextchunk = modifyMVar mvar $ pure . pop - sink getnextchunk - let body = RequestBodyStreamChunked streamer - S3.UploadPartResponse _ etag <- sendS3Handle h $ - S3.uploadPart (bucket info) object partnum uploadid body - sendparts (etag:etags) (partnum + 1) - sendparts [] 0 1 + etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \fh -> do + let sendparts etags partnum = ifM (hIsEOF fh) + ( return (reverse etags) + , do + b <- liftIO $ hGetUntilMetered fh (< partsz) p + let body = RequestBodyStream (L.length b) (mkPopper b) + S3.UploadPartResponse _ etag <- sendS3Handle h $ + S3.uploadPart (bucket info) object partnum uploadid body + sendparts (etag:etags) (partnum + 1) + ) + sendparts [] 1 void $ sendS3Handle h $ S3.postCompleteMultipartUpload (bucket info) object uploadid (zip [1..] etags) - pop [] = ([], S.empty) - pop (c:cs) = (cs, c) #else - warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library." + warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ "); built with too old a version of the aws library." singlepartupload k f p #endif -- cgit v1.2.3