summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Remote/S3.hs38
1 files changed, 24 insertions, 14 deletions
diff --git a/Remote/S3.hs b/Remote/S3.hs
index 60501f2ce..1de1abad6 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -27,7 +27,6 @@ import Control.Monad.Trans.Resource
import Control.Monad.Catch
import Data.Conduit
#if MIN_VERSION_aws(0,10,6)
-import qualified Aws.S3.Commands.Multipart as Multipart
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Network.HTTP.Conduit (withManager)
@@ -160,10 +159,10 @@ prepareS3 r info = resourcePrepare $ const $
store :: Remote -> S3Handle -> Storer
store r h = fileStorer $ \k f p -> do
case partSize (hinfo h) of
- Just sz -> do
+ Just partsz | partsz > 0 -> do
fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
- if fsz > sz
- then multipartupload sz k f p
+ if fsz > partsz
+ then multipartupload fsz partsz k f p
else singlepartupload k f p
Nothing -> singlepartupload k f p
-- Store public URL to item in Internet Archive.
@@ -174,7 +173,7 @@ store r h = fileStorer $ \k f p -> do
singlepartupload k f p = do
rbody <- liftIO $ httpBodyStorer f p
void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
- multipartupload sz k f p = do
+ multipartupload fsz partsz k f p = do
#if MIN_VERSION_aws(0,10,6)
let info = hinfo h
let object = bucketObject info k
@@ -187,19 +186,30 @@ store r h = fileStorer $ \k f p -> do
}
uploadid <- S3.imurUploadId <$> sendS3Handle h req
- -- TODO: progress display
- -- TODO: avoid needing tons of memory
- -- https://github.com/aristidb/aws/issues/142
- etags <- liftIO $ withManager $ \mgr ->
- CB.sourceFile f
- $= Multipart.chunkedConduit sz
- $= Multipart.putConduit (hawscfg h) (hs3cfg h) mgr (bucket info) object uploadid
- $$ CL.consume
+ -- Send parts of the file, taking care to stream each part
+ -- w/o buffering in memory, since the parts can be large.
+ etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \h -> do
+ let sendparts etags partnum = do
+ b <- liftIO $ hGetUntilMetered h (< partsz) p
+ if L.null b
+ then return (reverse etags)
+ else do
+ mvar <- newMVar $ L.toChunks b
+ let streamer sink = do
+ let getnextchunk = modifyMVar mvar $ pure . pop
+ sink getnextchunk
+ let body = RequestBodyStreamChunked streamer
+ S3.UploadPartResponse _ etag <- sendS3Handle h $
+ S3.uploadPart (bucket info) object partnum uploadid body
+ sendparts (etag:etags) (partnum + 1)
+ sendparts [] 0 1
void $ sendS3Handle h $ S3.postCompleteMultipartUpload
(bucket info) object uploadid (zip [1..] etags)
+ pop [] = ([], S.empty)
+ pop (c:cs) = (cs, c)
#else
- warning $ "Cannot do multipart upload (partsize " ++ show sz ++ "); built with too old a version of the aws library."
+ warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library."
singlepartupload k f p
#endif