summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2014-11-03 19:50:33 -0400
committerGravatar Joey Hess <joey@kitenet.net>2014-11-03 19:50:33 -0400
commit76c97f4ea90293b04a416368c107d1cc19cc381b (patch)
tree2bd234bfd9a8816e686af85f233114f9c7b8c9a0
parente6cac7f24606d57d2a70a4ff96c4557ade4a0b3a (diff)
WIP 2
-rw-r--r--Remote/Helper/Http.hs13
-rw-r--r--Remote/S3.hs37
2 files changed, 23 insertions, 27 deletions
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs
index 4088854ff..cb3af335a 100644
--- a/Remote/Helper/Http.hs
+++ b/Remote/Helper/Http.hs
@@ -11,7 +11,7 @@ import Common.Annex
import Types.StoreRetrieve
import Utility.Metered
import Remote.Helper.Special
-import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader)
+import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
import Network.HTTP.Types
import qualified Data.ByteString.Lazy as L
@@ -31,11 +31,14 @@ httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
httpBodyStorer src m = do
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
- let streamer sink = withMeteredFile src m $ \b -> do
- mvar <- newMVar $ L.toChunks b
- let getnextchunk = modifyMVar mvar $ pure . pop
- sink getnextchunk
+ let streamer sink = withMeteredFile src m $ \b -> mkPopper b sink
return $ RequestBodyStream (fromInteger size) streamer
+
+mkPopper :: L.ByteString -> NeedsPopper () -> IO ()
+mkPopper b sink = do
+ mvar <- newMVar $ L.toChunks b
+ let getnextchunk = modifyMVar mvar $ pure . pop
+ sink getnextchunk
where
pop [] = ([], S.empty)
pop (c:cs) = (cs, c)
diff --git a/Remote/S3.hs b/Remote/S3.hs
index 1de1abad6..685f95bbb 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -29,7 +29,6 @@ import Data.Conduit
#if MIN_VERSION_aws(0,10,6)
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
-import Network.HTTP.Conduit (withManager)
#endif
import Common.Annex
@@ -162,9 +161,9 @@ store r h = fileStorer $ \k f p -> do
Just partsz | partsz > 0 -> do
fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
if fsz > partsz
- then multipartupload fsz partsz k f p
+ then multipartupload partsz k f p
else singlepartupload k f p
- Nothing -> singlepartupload k f p
+ _ -> singlepartupload k f p
-- Store public URL to item in Internet Archive.
when (isIA (hinfo h) && not (isChunkKey k)) $
setUrlPresent k (iaKeyUrl r k)
@@ -173,7 +172,7 @@ store r h = fileStorer $ \k f p -> do
singlepartupload k f p = do
rbody <- liftIO $ httpBodyStorer f p
void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
- multipartupload fsz partsz k f p = do
+ multipartupload partsz k f p = do
#if MIN_VERSION_aws(0,10,6)
let info = hinfo h
let object = bucketObject info k
@@ -188,28 +187,22 @@ store r h = fileStorer $ \k f p -> do
-- Send parts of the file, taking care to stream each part
-- w/o buffering in memory, since the parts can be large.
- etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \h -> do
- let sendparts etags partnum = do
- b <- liftIO $ hGetUntilMetered h (< partsz) p
- if L.null b
- then return (reverse etags)
- else do
- mvar <- newMVar $ L.toChunks b
- let streamer sink = do
- let getnextchunk = modifyMVar mvar $ pure . pop
- sink getnextchunk
- let body = RequestBodyStreamChunked streamer
- S3.UploadPartResponse _ etag <- sendS3Handle h $
- S3.uploadPart (bucket info) object partnum uploadid body
- sendparts (etag:etags) (partnum + 1)
- sendparts [] 0 1
+ etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \fh -> do
+ let sendparts etags partnum = ifM (hIsEOF fh)
+ ( return (reverse etags)
+ , do
+ b <- liftIO $ hGetUntilMetered fh (< partsz) p
+ let body = RequestBodyStream (L.length b) (mkPopper b)
+ S3.UploadPartResponse _ etag <- sendS3Handle h $
+ S3.uploadPart (bucket info) object partnum uploadid body
+ sendparts (etag:etags) (partnum + 1)
+ )
+ sendparts [] 1
void $ sendS3Handle h $ S3.postCompleteMultipartUpload
(bucket info) object uploadid (zip [1..] etags)
- pop [] = ([], S.empty)
- pop (c:cs) = (cs, c)
#else
- warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library."
+ warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ "); built with too old a version of the aws library."
singlepartupload k f p
#endif