diff options
-rw-r--r-- | Remote/S3.hs | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/Remote/S3.hs b/Remote/S3.hs index 60501f2ce..1de1abad6 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -27,7 +27,6 @@ import Control.Monad.Trans.Resource import Control.Monad.Catch import Data.Conduit #if MIN_VERSION_aws(0,10,6) -import qualified Aws.S3.Commands.Multipart as Multipart import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB import Network.HTTP.Conduit (withManager) @@ -160,10 +159,10 @@ prepareS3 r info = resourcePrepare $ const $ store :: Remote -> S3Handle -> Storer store r h = fileStorer $ \k f p -> do case partSize (hinfo h) of - Just sz -> do + Just partsz | partsz > 0 -> do fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f) - if fsz > sz - then multipartupload sz k f p + if fsz > partsz + then multipartupload fsz partsz k f p else singlepartupload k f p Nothing -> singlepartupload k f p -- Store public URL to item in Internet Archive. @@ -174,7 +173,7 @@ store r h = fileStorer $ \k f p -> do singlepartupload k f p = do rbody <- liftIO $ httpBodyStorer f p void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody - multipartupload sz k f p = do + multipartupload fsz partsz k f p = do #if MIN_VERSION_aws(0,10,6) let info = hinfo h let object = bucketObject info k @@ -187,19 +186,30 @@ store r h = fileStorer $ \k f p -> do } uploadid <- S3.imurUploadId <$> sendS3Handle h req - -- TODO: progress display - -- TODO: avoid needing tons of memory - -- https://github.com/aristidb/aws/issues/142 - etags <- liftIO $ withManager $ \mgr -> - CB.sourceFile f - $= Multipart.chunkedConduit sz - $= Multipart.putConduit (hawscfg h) (hs3cfg h) mgr (bucket info) object uploadid - $$ CL.consume + -- Send parts of the file, taking care to stream each part + -- w/o buffering in memory, since the parts can be large. + etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \h -> do + let sendparts etags partnum = do + b <- liftIO $ hGetUntilMetered h (< partsz) p + if L.null b + then return (reverse etags) + else do + mvar <- newMVar $ L.toChunks b + let streamer sink = do + let getnextchunk = modifyMVar mvar $ pure . pop + sink getnextchunk + let body = RequestBodyStreamChunked streamer + S3.UploadPartResponse _ etag <- sendS3Handle h $ + S3.uploadPart (bucket info) object partnum uploadid body + sendparts (etag:etags) (partnum + 1) + sendparts [] 0 1 void $ sendS3Handle h $ S3.postCompleteMultipartUpload (bucket info) object uploadid (zip [1..] etags) + pop [] = ([], S.empty) + pop (c:cs) = (cs, c) #else - warning $ "Cannot do multipart upload (partsize " ++ show sz ++ "); built with too old a version of the aws library." + warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library." singlepartupload k f p #endif |