summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <id@joeyh.name>2014-12-03 14:02:29 -0400
committerGravatar Joey Hess <id@joeyh.name>2014-12-03 14:10:52 -0400
commit69957946eaa066406a243edca8fd3e19e7febfee (patch)
tree7ce300577cd986f4f03b5f81446a188916e75097
parentab9bb79e8f0eaa8d951d46e82b321f8511ded942 (diff)
parent718932c895b38228ab8aed4477d7ce8bba205e5a (diff)
Merge branch 's3-aws'
-rw-r--r--Remote/Helper/AWS.hs38
-rw-r--r--Remote/Helper/Http.hs52
-rw-r--r--Remote/S3.hs526
-rw-r--r--debian/changelog6
-rw-r--r--debian/control4
-rwxr-xr-xdebian/rules4
-rw-r--r--doc/bugs/S3_memory_leaks.mdwn4
-rw-r--r--doc/bugs/S3_upload_not_using_multipart.mdwn8
-rw-r--r--doc/bugs/new_AWS_region___40__eu-central-1__41__.mdwn2
-rw-r--r--doc/special_remotes/S3.mdwn22
-rw-r--r--doc/todo/S3_multipart_interruption_cleanup.mdwn14
-rw-r--r--git-annex.cabal10
12 files changed, 479 insertions, 211 deletions
diff --git a/Remote/Helper/AWS.hs b/Remote/Helper/AWS.hs
index 9b3643bc2..d27f2aad1 100644
--- a/Remote/Helper/AWS.hs
+++ b/Remote/Helper/AWS.hs
@@ -1,6 +1,6 @@
{- Amazon Web Services common infrastructure.
-
- - Copyright 2011,2012 Joey Hess <joey@kitenet.net>
+ - Copyright 2011-2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@@ -12,8 +12,14 @@ module Remote.Helper.AWS where
import Common.Annex
import Creds
+import qualified Aws
+import qualified Aws.S3 as S3
import qualified Data.Map as M
+import qualified Data.ByteString as B
+import qualified Data.Text as T
+import Data.Text.Encoding (encodeUtf8)
import Data.Text (Text)
+import Data.IORef
creds :: UUID -> CredPairStorage
creds u = CredPairStorage
@@ -22,6 +28,13 @@ creds u = CredPairStorage
, credPairRemoteKey = Just "s3creds"
}
+genCredentials :: CredPair -> IO Aws.Credentials
+genCredentials (keyid, secret) = Aws.Credentials
+ <$> pure (encodeUtf8 (T.pack keyid))
+ <*> pure (encodeUtf8 (T.pack secret))
+ <*> newIORef []
+ <*> pure Nothing
+
data Service = S3 | Glacier
deriving (Eq)
@@ -33,9 +46,10 @@ regionMap = M.fromList . regionInfo
defaultRegion :: Service -> Region
defaultRegion = snd . Prelude.head . regionInfo
-{- S3 and Glacier use different names for some regions. Ie, "us-east-1"
- - cannot be used with S3, while "US" cannot be used with Glacier. Dunno why.
- - Also, Glacier is not yet available in all regions. -}
+data ServiceRegion = BothRegion Region | S3Region Region | GlacierRegion Region
+
+{- The "US" and "EU" names are used as location constraints when creating a
+ - S3 bucket. -}
regionInfo :: Service -> [(Text, Region)]
regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
filter (matchingService . snd) $
@@ -45,9 +59,7 @@ regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
[ ("US East (N. Virginia)", [S3Region "US", GlacierRegion "us-east-1"])
, ("US West (Oregon)", [BothRegion "us-west-2"])
, ("US West (N. California)", [BothRegion "us-west-1"])
- -- Requires AWS4-HMAC-SHA256 which S3 library does not
- -- currently support.
- -- , ("EU (Frankfurt)", [BothRegion "eu-central-1"])
+ , ("EU (Frankfurt)", [BothRegion "eu-central-1"])
, ("EU (Ireland)", [S3Region "EU", GlacierRegion "eu-west-1"])
, ("Asia Pacific (Singapore)", [S3Region "ap-southeast-1"])
, ("Asia Pacific (Tokyo)", [BothRegion "ap-northeast-1"])
@@ -63,4 +75,14 @@ regionInfo service = map (\(t, r) -> (t, fromServiceRegion r)) $
matchingService (S3Region _) = service == S3
matchingService (GlacierRegion _) = service == Glacier
-data ServiceRegion = BothRegion Region | S3Region Region | GlacierRegion Region
+s3HostName :: Region -> B.ByteString
+s3HostName "US" = "s3.amazonaws.com"
+s3HostName "EU" = "s3-eu-west-1.amazonaws.com"
+s3HostName r = encodeUtf8 $ T.concat ["s3-", r, ".amazonaws.com"]
+
+s3DefaultHost :: String
+s3DefaultHost = "s3.amazonaws.com"
+
+mkLocationConstraint :: Region -> S3.LocationConstraint
+mkLocationConstraint "US" = S3.locationUsClassic
+mkLocationConstraint r = r
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs
index f1d576d1c..6ce5bacb8 100644
--- a/Remote/Helper/Http.hs
+++ b/Remote/Helper/Http.hs
@@ -5,13 +5,15 @@
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE BangPatterns #-}
+
module Remote.Helper.Http where
import Common.Annex
import Types.StoreRetrieve
import Utility.Metered
import Remote.Helper.Special
-import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader)
+import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
import Network.HTTP.Types
import qualified Data.ByteString.Lazy as L
@@ -24,17 +26,45 @@ import Control.Concurrent
-- Implemented as a fileStorer, so that the content can be streamed
-- from the file in constant space.
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
-httpStorer a = fileStorer $ \k f m -> do
- size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer)
- let streamer sink = withMeteredFile f m $ \b -> do
- mvar <- newMVar $ L.toChunks b
- let getnextchunk = modifyMVar mvar $ pure . pop
- sink getnextchunk
- let body = RequestBodyStream (fromInteger size) streamer
- a k body
+httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
+
+-- Reads the file and generates a streaming request body, that will update
+-- the meter as it's sent.
+httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
+httpBodyStorer src m = do
+ size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
+ let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
+ return $ RequestBodyStream (fromInteger size) streamer
+
+byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
+byteStringPopper b sink = do
+ mvar <- newMVar $ L.toChunks b
+ let getnextchunk = modifyMVar mvar $ \v ->
+ case v of
+ [] -> return ([], S.empty)
+ (c:cs) -> return (cs, c)
+ sink getnextchunk
+
+{- Makes a Popper that streams a given number of chunks of a given
+ - size from the handle, updating the meter as the chunks are read. -}
+handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
+handlePopper numchunks chunksize meterupdate h sink = do
+ mvar <- newMVar zeroBytesProcessed
+ let getnextchunk = do
+ sent <- takeMVar mvar
+ if sent >= target
+ then do
+ putMVar mvar sent
+ return S.empty
+ else do
+ b <- S.hGet h chunksize
+ let !sent' = addBytesProcessed sent chunksize
+ putMVar mvar sent'
+ meterupdate sent'
+ return b
+ sink getnextchunk
where
- pop [] = ([], S.empty)
- pop (c:cs) = (cs, c)
+ target = toBytesProcessed (numchunks * fromIntegral chunksize)
-- Reads the http body and stores it to the specified file, updating the
-- meter as it goes.
diff --git a/Remote/S3.hs b/Remote/S3.hs
index 281aaf6f3..844d87902 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -1,22 +1,31 @@
{- S3 remotes
-
- - Copyright 2011-2013 Joey Hess <joey@kitenet.net>
+ - Copyright 2011-2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
+{-# LANGUAGE TypeFamilies #-}
+{-# LANGUAGE CPP #-}
+
module Remote.S3 (remote, iaHost, configIA, iaItemUrl) where
-import Network.AWS.AWSConnection
-import Network.AWS.S3Object hiding (getStorageClass)
-import Network.AWS.S3Bucket hiding (size)
-import Network.AWS.AWSResult
+import qualified Aws as AWS
+import qualified Aws.Core as AWS
+import qualified Aws.S3 as S3
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.ByteString.Lazy as L
+import qualified Data.ByteString as S
import qualified Data.Map as M
import Data.Char
import Network.Socket (HostName)
+import Network.HTTP.Conduit (Manager, newManager, closeManager)
+import Network.HTTP.Client (defaultManagerSettings, managerResponseTimeout, responseStatus, responseBody, RequestBody(..))
+import Network.HTTP.Types
+import Control.Monad.Trans.Resource
+import Control.Monad.Catch
+import Data.Conduit
import Common.Annex
import Types.Remote
@@ -25,13 +34,15 @@ import qualified Git
import Config
import Config.Cost
import Remote.Helper.Special
+import Remote.Helper.Http
import qualified Remote.Helper.AWS as AWS
import Creds
-import Utility.Metered
import Annex.UUID
import Logs.Web
+import Utility.Metered
+import Utility.DataUnits
-type Bucket = String
+type BucketName = String
remote :: RemoteType
remote = RemoteType {
@@ -42,13 +53,16 @@ remote = RemoteType {
}
gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
-gen r u c gc = new <$> remoteCost gc expensiveRemoteCost
+gen r u c gc = do
+ cst <- remoteCost gc expensiveRemoteCost
+ info <- extractS3Info c
+ return $ new cst info
where
- new cst = Just $ specialRemote c
- (prepareStore this)
- (prepareRetrieve this)
- (simplyPrepare $ remove this c)
- (simplyPrepare $ checkKey this)
+ new cst info = Just $ specialRemote c
+ (prepareS3 this info $ store this)
+ (prepareS3 this info retrieve)
+ (prepareS3 this info remove)
+ (prepareS3 this info $ checkKey this)
this
where
this = Remote {
@@ -73,10 +87,11 @@ gen r u c gc = new <$> remoteCost gc expensiveRemoteCost
remotetype = remote,
mkUnavailable = gen r u (M.insert "host" "!dne!" c) gc,
getInfo = includeCredsInfo c (AWS.creds u) $ catMaybes
- [ Just ("bucket", fromMaybe "unknown" (getBucket c))
+ [ Just ("bucket", fromMaybe "unknown" (getBucketName c))
, if configIA c
- then Just ("internet archive item", iaItemUrl $ fromMaybe "unknown" $ getBucket c)
+ then Just ("internet archive item", iaItemUrl $ fromMaybe "unknown" $ getBucketName c)
else Nothing
+ , Just ("partsize", maybe "unlimited" (roughSize storageUnits False) (getPartSize c))
]
}
@@ -92,8 +107,8 @@ s3Setup' u mcreds c = if configIA c then archiveorg else defaulthost
defaults = M.fromList
[ ("datacenter", T.unpack $ AWS.defaultRegion AWS.S3)
, ("storageclass", "STANDARD")
- , ("host", defaultAmazonS3Host)
- , ("port", show defaultAmazonS3Port)
+ , ("host", AWS.s3DefaultHost)
+ , ("port", "80")
, ("bucket", defbucket)
]
@@ -113,52 +128,114 @@ s3Setup' u mcreds c = if configIA c then archiveorg else defaulthost
c' <- setRemoteCredPair noEncryptionUsed c (AWS.creds u) mcreds
-- Ensure user enters a valid bucket name, since
-- this determines the name of the archive.org item.
- let bucket = replace " " "-" $ map toLower $
+ let validbucket = replace " " "-" $ map toLower $
fromMaybe (error "specify bucket=") $
- getBucket c'
+ getBucketName c'
let archiveconfig =
- -- hS3 does not pass through x-archive-* headers
+ -- IA acdepts x-amz-* as an alias for x-archive-*
M.mapKeys (replace "x-archive-" "x-amz-") $
-- encryption does not make sense here
M.insert "encryption" "none" $
- M.insert "bucket" bucket $
+ M.insert "bucket" validbucket $
M.union c' $
-- special constraints on key names
- M.insert "mungekeys" "ia" $
- -- bucket created only when files are uploaded
- M.insert "x-amz-auto-make-bucket" "1" defaults
- writeUUIDFile archiveconfig u
+ M.insert "mungekeys" "ia" defaults
+ info <- extractS3Info archiveconfig
+ withS3Handle archiveconfig u info $
+ writeUUIDFile archiveconfig u
use archiveconfig
-prepareStore :: Remote -> Preparer Storer
-prepareStore r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
- fileStorer $ \k src p -> do
- ok <- s3Bool =<< liftIO (store (conn, bucket) r k p src)
-
- -- Store public URL to item in Internet Archive.
- when (ok && configIA (config r) && not (isChunkKey k)) $
- setUrlPresent k (iaKeyUrl r k)
-
- return ok
-
-store :: (AWSConnection, Bucket) -> Remote -> Key -> MeterUpdate -> FilePath -> IO (AWSResult ())
-store (conn, bucket) r k p file = do
- size <- (fromIntegral . fileSize <$> getFileStatus file) :: IO Integer
- withMeteredFile file p $ \content -> do
- -- size is provided to S3 so the whole content
- -- does not need to be buffered to calculate it
- let object = S3Object
- bucket (bucketFile r k) ""
- (("Content-Length", show size) : getXheaders (config r))
- content
- sendObject conn $
- setStorageClass (getStorageClass $ config r) object
-
-prepareRetrieve :: Remote -> Preparer Retriever
-prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
- byteRetriever $ \k sink ->
- liftIO (getObject conn $ bucketKey r bucket k)
- >>= either s3Error (sink . obj_data)
+-- Sets up a http connection manager for S3 encdpoint, which allows
+-- http connections to be reused across calls to the helper.
+prepareS3 :: Remote -> S3Info -> (S3Handle -> helper) -> Preparer helper
+prepareS3 r info = resourcePrepare $ const $
+ withS3Handle (config r) (uuid r) info
+
+store :: Remote -> S3Handle -> Storer
+store r h = fileStorer $ \k f p -> do
+ case partSize (hinfo h) of
+ Just partsz | partsz > 0 -> do
+ fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
+ if fsz > partsz
+ then multipartupload fsz partsz k f p
+ else singlepartupload k f p
+ _ -> singlepartupload k f p
+ -- Store public URL to item in Internet Archive.
+ when (isIA (hinfo h) && not (isChunkKey k)) $
+ setUrlPresent k (iaKeyUrl r k)
+ return True
+ where
+ singlepartupload k f p = do
+ rbody <- liftIO $ httpBodyStorer f p
+ void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
+ multipartupload fsz partsz k f p = do
+#if MIN_VERSION_aws(0,10,6)
+ let info = hinfo h
+ let object = bucketObject info k
+
+ let startreq = (S3.postInitiateMultipartUpload (bucket info) object)
+ { S3.imuStorageClass = Just (storageClass info)
+ , S3.imuMetadata = metaHeaders info
+ , S3.imuAutoMakeBucket = isIA info
+ , S3.imuExpires = Nothing -- TODO set some reasonable expiry
+ }
+ uploadid <- S3.imurUploadId <$> sendS3Handle h startreq
+
+ -- The actual part size will be a even multiple of the
+ -- 32k chunk size that hGetUntilMetered uses.
+ let partsz' = (partsz `div` toInteger defaultChunkSize) * toInteger defaultChunkSize
+
+ -- Send parts of the file, taking care to stream each part
+ -- w/o buffering in memory, since the parts can be large.
+ etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \fh -> do
+ let sendparts meter etags partnum = do
+ pos <- liftIO $ hTell fh
+ if pos >= fsz
+ then return (reverse etags)
+ else do
+ -- Calculate size of part that will
+ -- be read.
+ let sz = if fsz - pos < partsz'
+ then fsz - pos
+ else partsz'
+ let p' = offsetMeterUpdate p (toBytesProcessed pos)
+ let numchunks = ceiling (fromIntegral sz / fromIntegral defaultChunkSize :: Double)
+ let popper = handlePopper numchunks defaultChunkSize p' fh
+ let req = S3.uploadPart (bucket info) object partnum uploadid $
+ RequestBodyStream (fromIntegral sz) popper
+ S3.UploadPartResponse _ etag <- sendS3Handle h req
+ sendparts (offsetMeterUpdate meter (toBytesProcessed sz)) (etag:etags) (partnum + 1)
+ sendparts p [] 1
+
+ void $ sendS3Handle h $ S3.postCompleteMultipartUpload
+ (bucket info) object uploadid (zip [1..] etags)
+#else
+ warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
+ singlepartupload k f p
+#endif
+
+{- Implemented as a fileRetriever, that uses conduit to stream the chunks
+ - out to the file. Would be better to implement a byteRetriever, but
+ - that is difficult. -}
+retrieve :: S3Handle -> Retriever
+retrieve h = fileRetriever $ \f k p -> liftIO $ runResourceT $ do
+ (fr, fh) <- allocate (openFile f WriteMode) hClose
+ let req = S3.getObject (bucket info) (bucketObject info k)
+ S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
+ responseBody rsp $$+- sinkprogressfile fh p zeroBytesProcessed
+ release fr
+ where
+ info = hinfo h
+ sinkprogressfile fh meterupdate sofar = do
+ mbs <- await
+ case mbs of
+ Nothing -> return ()
+ Just bs -> do
+ let sofar' = addBytesProcessed sofar (S.length bs)
+ liftIO $ do
+ void $ meterupdate sofar'
+ S.hPut fh bs
+ sinkprogressfile fh meterupdate sofar'
retrieveCheap :: Key -> FilePath -> Annex Bool
retrieveCheap _ _ = return False
@@ -166,186 +243,263 @@ retrieveCheap _ _ = return False
{- Internet Archive doesn't easily allow removing content.
- While it may remove the file, there are generally other files
- derived from it that it does not remove. -}
-remove :: Remote -> RemoteConfig -> Remover
-remove r c k
- | configIA c = do
+remove :: S3Handle -> Remover
+remove h k
+ | isIA info = do
warning "Cannot remove content from the Internet Archive"
return False
- | otherwise = remove' r k
-
-remove' :: Remote -> Key -> Annex Bool
-remove' r k = s3Action r False $ \(conn, bucket) ->
- s3Bool =<< liftIO (deleteObject conn $ bucketKey r bucket k)
-
-checkKey :: Remote -> CheckPresent
-checkKey r k = s3Action r noconn $ \(conn, bucket) -> do
- showAction $ "checking " ++ name r
- res <- liftIO $ getObjectInfo conn $ bucketKey r bucket k
- case res of
- Right _ -> return True
- Left (AWSError _ _) -> return False
- Left e -> s3Error e
+ | otherwise = do
+ res <- tryNonAsync $ sendS3Handle h $
+ S3.DeleteObject (bucketObject info k) (bucket info)
+ return $ either (const False) (const True) res
where
- noconn = error "S3 not configured"
-
-s3Warning :: ReqError -> Annex Bool
-s3Warning e = do
- warning $ prettyReqError e
- return False
-
-s3Error :: ReqError -> a
-s3Error e = error $ prettyReqError e
-
-s3Bool :: AWSResult () -> Annex Bool
-s3Bool (Right _) = return True
-s3Bool (Left e) = s3Warning e
-
-s3Action :: Remote -> a -> ((AWSConnection, Bucket) -> Annex a) -> Annex a
-s3Action r noconn action = do
- let bucket = M.lookup "bucket" $ config r
- conn <- s3Connection (config r) (uuid r)
- case (bucket, conn) of
- (Just b, Just c) -> action (c, b)
- _ -> return noconn
-
-bucketFile :: Remote -> Key -> FilePath
-bucketFile r = munge . key2file
- where
- munge s = case M.lookup "mungekeys" c of
- Just "ia" -> iaMunge $ filePrefix c ++ s
- _ -> filePrefix c ++ s
- c = config r
-
-filePrefix :: RemoteConfig -> String
-filePrefix = M.findWithDefault "" "fileprefix"
-
-bucketKey :: Remote -> Bucket -> Key -> S3Object
-bucketKey r bucket k = S3Object bucket (bucketFile r k) "" [] L.empty
+ info = hinfo h
-{- Internet Archive limits filenames to a subset of ascii,
- - with no whitespace. Other characters are xml entity
- - encoded. -}
-iaMunge :: String -> String
-iaMunge = (>>= munge)
+checkKey :: Remote -> S3Handle -> CheckPresent
+checkKey r h k = do
+ showAction $ "checking " ++ name r
+#if MIN_VERSION_aws(0,10,0)
+ rsp <- go
+ return (isJust $ S3.horMetadata rsp)
+#else
+ catchMissingException $ do
+ void go
+ return True
+#endif
where
- munge c
- | isAsciiUpper c || isAsciiLower c || isNumber c = [c]
- | c `elem` "_-.\"" = [c]
- | isSpace c = []
- | otherwise = "&" ++ show (ord c) ++ ";"
+ go = sendS3Handle h $
+ S3.headObject (bucket (hinfo h)) (bucketObject (hinfo h) k)
+
+#if ! MIN_VERSION_aws(0,10,0)
+ {- Catch exception headObject returns when an object is not present
+ - in the bucket, and returns False. All other exceptions indicate a
+ - check error and are let through. -}
+ catchMissingException :: Annex Bool -> Annex Bool
+ catchMissingException a = catchJust missing a (const $ return False)
+ where
+ missing :: AWS.HeaderException -> Maybe ()
+ missing e
+ | AWS.headerErrorMessage e == "ETag missing" = Just ()
+ | otherwise = Nothing
+#endif
{- Generate the bucket if it does not already exist, including creating the
- UUID file within the bucket.
-
- - To check if the bucket exists, ask for its location. However, some ACLs
- - can allow read/write to buckets, but not querying location, so first
- - check if the UUID file already exists and we can skip doing anything.
+ - Some ACLs can allow read/write to buckets, but not querying them,
+ - so first check if the UUID file already exists and we can skip doing
+ - anything.
-}
genBucket :: RemoteConfig -> UUID -> Annex ()
genBucket c u = do
- conn <- s3ConnectionRequired c u
showAction "checking bucket"
- unlessM ((== Right True) <$> checkUUIDFile c u conn) $ do
- loc <- liftIO $ getBucketLocation conn bucket
- case loc of
- Right _ -> writeUUIDFile c u
- Left err@(NetworkError _) -> s3Error err
- Left (AWSError _ _) -> do
- showAction $ "creating bucket in " ++ datacenter
- res <- liftIO $ createBucketIn conn bucket datacenter
- case res of
- Right _ -> writeUUIDFile c u
- Left err -> s3Error err
+ info <- extractS3Info c
+ withS3Handle c u info $ \h ->
+ go h =<< checkUUIDFile c u h
where
- bucket = fromJust $ getBucket c
+ go _ (Right True) = noop
+ go h _ = do
+ v <- tryNonAsync $ sendS3Handle h (S3.getBucket $ bucket $ hinfo h)
+ case v of
+ Right _ -> noop
+ Left _ -> do
+ showAction $ "creating bucket in " ++ datacenter
+ void $ sendS3Handle h $
+ S3.PutBucket (bucket $ hinfo h) Nothing $
+ AWS.mkLocationConstraint $
+ T.pack datacenter
+ writeUUIDFile c u h
+
datacenter = fromJust $ M.lookup "datacenter" c
{- Writes the UUID to an annex-uuid file within the bucket.
-
- If the file already exists in the bucket, it must match.
-
- - Note that IA items do not get created by createBucketIn.
- - Rather, they are created the first time a file is stored in them.
- - So this also takes care of that.
+ - Note that IA buckets can only created by having a file
+ - stored in them. So this also takes care of that.
-}
-writeUUIDFile :: RemoteConfig -> UUID -> Annex ()
-writeUUIDFile c u = do
- conn <- s3ConnectionRequired c u
- v <- checkUUIDFile c u conn
+writeUUIDFile :: RemoteConfig -> UUID -> S3Handle -> Annex ()
+writeUUIDFile c u h = do
+ v <- checkUUIDFile c u h
case v of
- Left e -> error e
- Right True -> return ()
- Right False -> do
- let object = setStorageClass (getStorageClass c) (mkobject uuidb)
- either s3Error return =<< liftIO (sendObject conn object)
+ Right True -> noop
+ _ -> void $ sendS3Handle h mkobject
where
- file = uuidFile c
+ file = T.pack $ uuidFile c
uuidb = L.fromChunks [T.encodeUtf8 $ T.pack $ fromUUID u]
- bucket = fromJust $ getBucket c
- mkobject = S3Object bucket file "" (getXheaders c)
+ mkobject = putObject h file (RequestBodyLBS uuidb)
-{- Checks if the UUID file exists in the bucket and has the specified UUID already. -}
-checkUUIDFile :: RemoteConfig -> UUID -> AWSConnection -> Annex (Either String Bool)
-checkUUIDFile c u conn = check <$> liftIO (tryNonAsync $ getObject conn $ mkobject L.empty)
+{- Checks if the UUID file exists in the bucket
+ - and has the specified UUID already. -}
+checkUUIDFile :: RemoteConfig -> UUID -> S3Handle -> Annex (Either SomeException Bool)
+checkUUIDFile c u h = tryNonAsync $ check <$> get
where
- check (Right (Right o))
- | obj_data o == uuidb = Right True
- | otherwise = Left $ "This bucket is already in use by a different S3 special remote, with UUID: " ++ show (obj_data o)
- check _ = Right False
-
+ get = liftIO
+ . runResourceT
+ . either (pure . Left) (Right <$$> AWS.loadToMemory)
+ =<< tryS3 (sendS3Handle h (S3.getObject (bucket (hinfo h)) file))
+ check (Right (S3.GetObjectMemoryResponse _meta rsp)) =
+ responseStatus rsp == ok200 && responseBody rsp == uuidb
+ check (Left _S3Error) = False
+
+ file = T.pack $ uuidFile c
uuidb = L.fromChunks [T.encodeUtf8 $ T.pack $ fromUUID u]
- bucket = fromJust $ getBucket c
- file = uuidFile c
-
- mkobject = S3Object bucket file "" (getXheaders c)
uuidFile :: RemoteConfig -> FilePath
-uuidFile c = filePrefix c ++ "annex-uuid"
-
-s3ConnectionRequired :: RemoteConfig -> UUID -> Annex AWSConnection
-s3ConnectionRequired c u =
- maybe (error "Cannot connect to S3") return =<< s3Connection c u
-
-s3Connection :: RemoteConfig -> UUID -> Annex (Maybe AWSConnection)
-s3Connection c u = go =<< getRemoteCredPairFor "S3" c (AWS.creds u)
+uuidFile c = getFilePrefix c ++ "annex-uuid"
+
+putObject :: S3Handle -> T.Text -> RequestBody -> S3.PutObject
+putObject h file rbody = (S3.putObject (bucket info) file rbody)
+ { S3.poStorageClass = Just (storageClass info)
+ , S3.poMetadata = metaHeaders info
+ , S3.poAutoMakeBucket = isIA info
+ }
+ where
+ info = hinfo h
+
+data S3Handle = S3Handle
+ { hmanager :: Manager
+ , hawscfg :: AWS.Configuration
+ , hs3cfg :: S3.S3Configuration AWS.NormalQuery
+ , hinfo :: S3Info
+ }
+
+{- Sends a request to S3 and gets back the response.
+ -
+ - Note that pureAws's use of ResourceT is bypassed here;
+ - the response should be fully processed while the S3Handle
+ - is still open, eg within a call to withS3Handle.
+ -}
+sendS3Handle
+ :: (AWS.Transaction req res, AWS.ServiceConfiguration req ~ S3.S3Configuration)
+ => S3Handle
+ -> req
+ -> Annex res
+sendS3Handle h r = liftIO $ runResourceT $ sendS3Handle' h r
+
+sendS3Handle'
+ :: (AWS.Transaction r a, AWS.ServiceConfiguration r ~ S3.S3Configuration)
+ => S3Handle
+ -> r
+ -> ResourceT IO a
+sendS3Handle' h = AWS.pureAws (hawscfg h) (hs3cfg h) (hmanager h)
+
+withS3Handle :: RemoteConfig -> UUID -> S3Info -> (S3Handle -> Annex a) -> Annex a
+withS3Handle c u info a = do
+ creds <- getRemoteCredPairFor "S3" c (AWS.creds u)
+ awscreds <- liftIO $ AWS.genCredentials $ fromMaybe nocreds creds
+ let awscfg = AWS.Configuration AWS.Timestamp awscreds (AWS.defaultLog AWS.Error)
+ bracketIO (newManager httpcfg) closeManager $ \mgr ->
+ a $ S3Handle mgr awscfg s3cfg info
where
- go Nothing = return Nothing
- go (Just (ak, sk)) = return $ Just $ AWSConnection host port ak sk
+ s3cfg = s3Configuration c
+ httpcfg = defaultManagerSettings
+ { managerResponseTimeout = Nothing }
+ nocreds = error "Cannot use S3 without credentials configured"
+s3Configuration :: RemoteConfig -> S3.S3Configuration AWS.NormalQuery
+s3Configuration c = (S3.s3 proto endpoint False) { S3.s3Port = port }
+ where
+ proto
+ | port == 443 = AWS.HTTPS
+ | otherwise = AWS.HTTP
host = fromJust $ M.lookup "host" c
+ datacenter = fromJust $ M.lookup "datacenter" c
+ -- When the default S3 host is configured, connect directly to
+ -- the S3 endpoint for the configured datacenter.
+ -- When another host is configured, it's used as-is.
+ endpoint
+ | host == AWS.s3DefaultHost = AWS.s3HostName $ T.pack datacenter
+ | otherwise = T.encodeUtf8 $ T.pack host
port = let s = fromJust $ M.lookup "port" c in
case reads s of
[(p, _)] -> p
_ -> error $ "bad S3 port value: " ++ s
-getBucket :: RemoteConfig -> Maybe Bucket
-getBucket = M.lookup "bucket"
+tryS3 :: Annex a -> Annex (Either S3.S3Error a)
+tryS3 a = (Right <$> a) `catch` (pure . Left)
+
+data S3Info = S3Info
+ { bucket :: S3.Bucket
+ , storageClass :: S3.StorageClass
+ , bucketObject :: Key -> T.Text
+ , metaHeaders :: [(T.Text, T.Text)]
+ , partSize :: Maybe Integer
+ , isIA :: Bool
+ }
+
+extractS3Info :: RemoteConfig -> Annex S3Info
+extractS3Info c = do
+ b <- maybe
+ (error "S3 bucket not configured")
+ (return . T.pack)
+ (getBucketName c)
+ return $ S3Info
+ { bucket = b
+ , storageClass = getStorageClass c
+ , bucketObject = T.pack . getBucketObject c
+ , metaHeaders = getMetaHeaders c
+ , partSize = getPartSize c
+ , isIA = configIA c
+ }
+
+getBucketName :: RemoteConfig -> Maybe BucketName
+getBucketName = M.lookup "bucket"
-getStorageClass :: RemoteConfig -> StorageClass
-getStorageClass c = case fromJust $ M.lookup "storageclass" c of
- "REDUCED_REDUNDANCY" -> REDUCED_REDUNDANCY
- _ -> STANDARD
-
-getXheaders :: RemoteConfig -> [(String, String)]
-getXheaders = filter isxheader . M.assocs
+getStorageClass :: RemoteConfig -> S3.StorageClass
+getStorageClass c = case M.lookup "storageclass" c of
+ Just "REDUCED_REDUNDANCY" -> S3.ReducedRedundancy
+ _ -> S3.Standard
+
+getPartSize :: RemoteConfig -> Maybe Integer
+getPartSize c = readSize dataUnits =<< M.lookup "partsize" c
+
+getMetaHeaders :: RemoteConfig -> [(T.Text, T.Text)]
+getMetaHeaders = map munge . filter ismetaheader . M.assocs
+ where
+ ismetaheader (h, _) = metaprefix `isPrefixOf` h
+ metaprefix = "x-amz-meta-"
+ metaprefixlen = length metaprefix
+ munge (k, v) = (T.pack $ drop metaprefixlen k, T.pack v)
+
+getFilePrefix :: RemoteConfig -> String
+getFilePrefix = M.findWithDefault "" "fileprefix"
+
+getBucketObject :: RemoteConfig -> Key -> FilePath
+getBucketObject c = munge . key2file
+ where
+ munge s = case M.lookup "mungekeys" c of
+ Just "ia" -> iaMunge $ getFilePrefix c ++ s
+ _ -> getFilePrefix c ++ s
+
+{- Internet Archive limits filenames to a subset of ascii,
+ - with no whitespace. Other characters are xml entity
+ - encoded. -}
+iaMunge :: String -> String
+iaMunge = (>>= munge)
where
- isxheader (h, _) = "x-amz-" `isPrefixOf` h
+ munge c
+ | isAsciiUpper c || isAsciiLower c || isNumber c = [c]
+ | c `elem` "_-.\"" = [c]
+ | isSpace c = []
+ | otherwise = "&" ++ show (ord c) ++ ";"
+
+configIA :: RemoteConfig -> Bool
+configIA = maybe False isIAHost . M.lookup "host"
{- Hostname to use for archive.org S3. -}
iaHost :: HostName
iaHost = "s3.us.archive.org"
-configIA :: RemoteConfig -> Bool
-configIA c = maybe False isIAHost (M.lookup "host" c)
-
isIAHost :: HostName -> Bool
isIAHost h = ".archive.org" `isSuffixOf` map toLower h
-iaItemUrl :: Bucket -> URLString
-iaItemUrl bucket = "http://archive.org/details/" ++ bucket
+iaItemUrl :: BucketName -> URLString
+iaItemUrl b = "http://archive.org/details/" ++ b
iaKeyUrl :: Remote -> Key -> URLString
-iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketFile r k
+iaKeyUrl r k = "http://archive.org/download/" ++ b ++ "/" ++ getBucketObject (config r) k
where
- bucket = fromMaybe "" $ getBucket $ config r
+ b = fromMaybe "" $ getBucketName $ config r
diff --git a/debian/changelog b/debian/changelog
index a747ea277..9bba1ee56 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -12,6 +12,12 @@ git-annex (5.20141126) UNRELEASED; urgency=medium
that left a typechange staged in index due to some infelicity of git's
handling of partial commits.
* Work around behavior change in lsof 4.88's -F output format.
+ * S3: Switched to using the haskell aws library.
+ * S3: No longer buffers entire files in memory when uploading without
+ chunking.
+ * S3: When built with a new enough version of the haskell aws library,
+ supports doing multipart uploads, in order to store extremely large
+ files in S3 when not using chunking.
* Don't show "(gpg)" when decrypting the remote encryption cipher,
since this could be taken to read that's the only time git-annex
runs gpg, which is not the case.
diff --git a/debian/control b/debian/control
index 4270f2b28..aeb95bf3e 100644
--- a/debian/control
+++ b/debian/control
@@ -14,7 +14,9 @@ Build-Depends:
libghc-cryptohash-dev,
libghc-dataenc-dev,
libghc-utf8-string-dev,
- libghc-hs3-dev (>= 0.5.6),
+ libghc-aws-dev (>= 0.9.2),
+ libghc-conduit-dev,
+ libghc-resourcet-dev,
libghc-dav-dev (>= 1.0) [amd64 i386 kfreebsd-amd64 kfreebsd-i386 powerpc],
libghc-quickcheck2-dev,
libghc-monad-control-dev (>= 0.3),
diff --git a/debian/rules b/debian/rules
index 7c8f8a560..22be48195 100755
--- a/debian/rules
+++ b/debian/rules
@@ -8,6 +8,10 @@ export RELEASE_BUILD=1
%:
dh $@
+# Debian currently has a patched aws 0.9.2, rather than the newer 0.10.2.
+override_dh_auto_configure:
+ debian/cabal-wrapper configure -fPatchedAWS
+
# Not intended for use by anyone except the author.
announcedir:
@echo ${HOME}/src/git-annex/doc/news
diff --git a/doc/bugs/S3_memory_leaks.mdwn b/doc/bugs/S3_memory_leaks.mdwn
index 94bbdc398..7dc1e5757 100644
--- a/doc/bugs/S3_memory_leaks.mdwn
+++ b/doc/bugs/S3_memory_leaks.mdwn
@@ -2,9 +2,13 @@ S3 has memory leaks
Sending a file to S3 causes a slow memory increase toward the file size.
+> This is fixed, now that it uses aws. --[[Joey]]
+
Copying the file back from S3 causes a slow memory increase toward the
file size.
+> [[fixed|done]] too! --[[Joey]]
+
The author of hS3 is aware of the problem, and working on it. I think I
have identified the root cause of the buffering; it's done by hS3 so it can
resend the data if S3 sends it a 307 redirect. --[[Joey]]
diff --git a/doc/bugs/S3_upload_not_using_multipart.mdwn b/doc/bugs/S3_upload_not_using_multipart.mdwn
index 5e5d97c6a..cd40e9d2b 100644
--- a/doc/bugs/S3_upload_not_using_multipart.mdwn
+++ b/doc/bugs/S3_upload_not_using_multipart.mdwn
@@ -52,3 +52,11 @@ Please provide any additional information below.
upgrade supported from repository versions: 0 1 2
[[!tag confirmed]]
+
+> [[fixed|done]] This is now supported, when git-annex is built with a new
+> enough version of the aws library. You need to configure the remote to
+> use an appropriate value for multipart, eg:
+>
+> git annex enableremote cloud multipart=1GiB
+>
+> --[[Joey]]
diff --git a/doc/bugs/new_AWS_region___40__eu-central-1__41__.mdwn b/doc/bugs/new_AWS_region___40__eu-central-1__41__.mdwn
index 80f89b243..177f7e138 100644
--- a/doc/bugs/new_AWS_region___40__eu-central-1__41__.mdwn
+++ b/doc/bugs/new_AWS_region___40__eu-central-1__41__.mdwn
@@ -6,3 +6,5 @@ Amazon has opened up a new region in AWS with a datacenter in Frankfurt/Germany.
* Region: eu-central-1
This should be added to the "Adding an Amazon S3 repository" page in the Datacenter dropdown of the webapp.
+
+> [[fixed|done]] --[[Joey]]
diff --git a/doc/special_remotes/S3.mdwn b/doc/special_remotes/S3.mdwn
index fe46948b3..5d161c3b8 100644
--- a/doc/special_remotes/S3.mdwn
+++ b/doc/special_remotes/S3.mdwn
@@ -18,11 +18,11 @@ the S3 remote.
* `encryption` - One of "none", "hybrid", "shared", or "pubkey".
See [[encryption]].
+* `keyid` - Specifies the gpg key to use for [[encryption]].
+
* `chunk` - Enables [[chunking]] when storing large files.
`chunk=1MiB` is a good starting point for chunking.
-* `keyid` - Specifies the gpg key to use for [[encryption]].
-
* `embedcreds` - Optional. Set to "yes" embed the login credentials inside
the git repository, which allows other clones to also access them. This is
the default when gpg encryption is enabled; the credentials are stored
@@ -33,7 +33,8 @@ the S3 remote.
embedcreds without gpg encryption.
* `datacenter` - Defaults to "US". Other values include "EU",
- "us-west-1", and "ap-southeast-1".
+ "us-west-1", "us-west-2", "ap-southeast-1", "ap-southeast-2", and
+ "sa-east-1".
* `storageclass` - Default is "STANDARD". If you have configured git-annex
to preserve multiple [[copies]], consider setting this to "REDUCED_REDUNDANCY"
@@ -46,11 +47,24 @@ the S3 remote.
so by default, a bucket name is chosen based on the remote name
and UUID. This can be specified to pick a bucket name.
+* `partsize` - Amazon S3 only accepts uploads up to a certian file size,
+ and storing larger files requires a multipart upload process.
+
+ Setting `partsize=1GiB` is recommended for Amazon S3 when not using
+ chunking; this will cause multipart uploads to be done using parts
+ up to 1GiB in size. Note that setting partsize to less than 100MiB
+ will cause Amazon S3 to reject uploads.
+
+ This is not enabled by default, since other S3 implementations may
+ not support multipart uploads or have different limits,
+ but can be enabled or changed at any time.
+ time.
+
* `fileprefix` - By default, git-annex places files in a tree rooted at the
top of the S3 bucket. When this is set, it's prefixed to the filenames
used. For example, you could set it to "foo/" in one special remote,
and to "bar/" in another special remote, and both special remotes could
then use the same bucket.
-* `x-amz-*` are passed through as http headers when storing keys
+* `x-amz-meta-*` are passed through as http headers when storing keys
in S3.
diff --git a/doc/todo/S3_multipart_interruption_cleanup.mdwn b/doc/todo/S3_multipart_interruption_cleanup.mdwn
new file mode 100644
index 000000000..adb5fd2cb
--- /dev/null
+++ b/doc/todo/S3_multipart_interruption_cleanup.mdwn
@@ -0,0 +1,14 @@
+When a multipart S3 upload is being made, and gets interrupted,
+the parts remain in the bucket, and S3 may charge for them.
+
+I am not sure what happens if the same object gets uploaded again. Is S3
+nice enough to remove the old parts? I need to find out..
+
+If not, this needs to be dealt with somehow. One way would be to configure an
+expiry of the uploaded parts, but this is tricky as a huge upload could
+take arbitrarily long. Another way would be to record the uploadid and the
+etags of the parts, and then resume where it left off the next time the
+object is sent to S3. (Or at least cancel the old upload; resume isn't
+practical when uploading an encrypted object.)
+
+It could store that info in either the local FS or the git-annex branch.
diff --git a/git-annex.cabal b/git-annex.cabal
index a9472897c..cc5b6c884 100644
--- a/git-annex.cabal
+++ b/git-annex.cabal
@@ -34,6 +34,10 @@ Description:
Flag S3
Description: Enable S3 support
+Flag PatchedAWS
+ Description: Building on system, like Debian, with old AWS patched to support git-annex
+ Default: False
+
Flag WebDAV
Description: Enable WebDAV support
@@ -151,7 +155,11 @@ Executable git-annex
CPP-Options: -DWITH_CRYPTOHASH
if flag(S3)
- Build-Depends: hS3
+ Build-Depends: conduit, resourcet, conduit-extra
+ if flag(PatchedAWS)
+ Build-Depends: aws (>= 0.9.2)
+ else
+ Build-Depends: aws (>= 0.10.4)
CPP-Options: -DWITH_S3
if flag(WebDAV)