summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Joey Hess <joey@kitenet.net>2014-08-09 14:23:54 -0400
committerGravatar Joey Hess <joey@kitenet.net>2014-08-09 14:26:19 -0400
commitbd5de3ff4b50f6e90f6bb638d32fb60472ca8e47 (patch)
tree1189c0204ed676fdca09d88876eac3bf6a692df6
parentd03284c66e5f356421da06a17bf0e71d7b205157 (diff)
S3: convert to aws for store, remove, checkPresent
Fixes the memory leak on store.. the second oldest open git-annex bug! Only retrieve remains to be converted. This commit was sponsored by Scott Robinson.
-rw-r--r--Remote/Helper/Http.hs14
-rw-r--r--Remote/S3.hs120
2 files changed, 69 insertions, 65 deletions
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs
index f1d576d1c..4088854ff 100644
--- a/Remote/Helper/Http.hs
+++ b/Remote/Helper/Http.hs
@@ -24,14 +24,18 @@ import Control.Concurrent
-- Implemented as a fileStorer, so that the content can be streamed
-- from the file in constant space.
httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer
-httpStorer a = fileStorer $ \k f m -> do
- size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer)
- let streamer sink = withMeteredFile f m $ \b -> do
+httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
+
+-- Reads the file and generates a streaming request body, that will update
+-- the meter as it's sent.
+httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
+httpBodyStorer src m = do
+ size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
+ let streamer sink = withMeteredFile src m $ \b -> do
mvar <- newMVar $ L.toChunks b
let getnextchunk = modifyMVar mvar $ pure . pop
sink getnextchunk
- let body = RequestBodyStream (fromInteger size) streamer
- a k body
+ return $ RequestBodyStream (fromInteger size) streamer
where
pop [] = ([], S.empty)
pop (c:cs) = (cs, c)
diff --git a/Remote/S3.hs b/Remote/S3.hs
index 4f15f988e..58b408cd2 100644
--- a/Remote/S3.hs
+++ b/Remote/S3.hs
@@ -34,9 +34,9 @@ import qualified Git
import Config
import Config.Cost
import Remote.Helper.Special
+import Remote.Helper.Http
import qualified Remote.Helper.AWS as AWS
import Creds
-import Utility.Metered
import Annex.UUID
import Logs.Web
@@ -54,10 +54,10 @@ gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remot
gen r u c gc = new <$> remoteCost gc expensiveRemoteCost
where
new cst = Just $ specialRemote c
- (prepareStore this)
+ (prepareS3 this $ store this)
(prepareRetrieve this)
- (simplyPrepare $ remove this c)
- (simplyPrepare $ checkKey this)
+ (prepareS3 this $ remove this)
+ (prepareS3 this $ checkKey this)
this
where
this = Remote {
@@ -132,32 +132,22 @@ s3Setup' u c = if isIA c then archiveorg else defaulthost
writeUUIDFile archiveconfig u
use archiveconfig
-prepareStore :: Remote -> Preparer Storer
-prepareStore r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
- fileStorer $ \k src p -> do
- ok <- s3Bool =<< liftIO (store (conn, bucket) r k p src)
+-- Sets up a http connection manager for S3 encdpoint, which allows
+-- http connections to be reused across calls to the helper.
+prepareS3 :: Remote -> (S3Handle -> helper) -> Preparer helper
+prepareS3 r = resourcePrepare $ const $ withS3Handle (config r) (uuid r)
- -- Store public URL to item in Internet Archive.
- when (ok && isIA (config r) && not (isChunkKey k)) $
- setUrlPresent k (iaKeyUrl r k)
-
- return ok
-
-store :: (AWSConnection, BucketName) -> Remote -> Key -> MeterUpdate -> FilePath -> IO (AWSResult ())
-store (conn, bucket) r k p file = do
- error "TODO"
- {-
- size <- (fromIntegral . fileSize <$> getFileStatus file) :: IO Integer
- withMeteredFile file p $ \content -> do
- -- size is provided to S3 so the whole content
- -- does not need to be buffered to calculate it
- let object = S3Object
- bucket (bucketFile r k) ""
- (("Content-Length", show size) : getXheaders (config r))
- content
- sendObject conn $
- setStorageClass (getStorageClass $ config r) object
- -}
+store :: Remote -> S3Handle -> Storer
+store r h = fileStorer $ \k f p -> do
+ rbody <- liftIO $ httpBodyStorer f p
+ void $ sendS3Handle h $
+ S3.putObject (hBucket h) (hBucketObject h k) rbody
+
+ -- Store public URL to item in Internet Archive.
+ when (hIsIA h && not (isChunkKey k)) $
+ setUrlPresent k (iaKeyUrl r k)
+
+ return True
prepareRetrieve :: Remote -> Preparer Retriever
prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) ->
@@ -174,31 +164,37 @@ retrieveCheap _ _ = return False
{- Internet Archive doesn't easily allow removing content.
- While it may remove the file, there are generally other files
- derived from it that it does not remove. -}
-remove :: Remote -> RemoteConfig -> Remover
-remove r c k
- | isIA c = do
+remove :: Remote -> S3Handle -> Remover
+remove r h k
+ | hIsIA h = do
warning "Cannot remove content from the Internet Archive"
return False
- | otherwise = remove' r k
+ | otherwise = do
+ res <- tryNonAsync $ sendS3Handle h $
+ S3.DeleteObject (hBucketObject h k) (hBucket h)
+ return $ either (const False) (const True) res
-remove' :: Remote -> Key -> Annex Bool
-remove' r k = s3Action r False $ \(conn, bucket) ->
- s3Bool =<< liftIO (deleteObject conn $ bucketKey r bucket k)
-
-checkKey :: Remote -> CheckPresent
-checkKey r k = s3Action r noconn $ \(conn, bucket) -> do
+checkKey :: Remote -> S3Handle -> CheckPresent
+checkKey r h k = do
showAction $ "checking " ++ name r
- {-
- res <- liftIO $ getObjectInfo conn $ bucketKey r bucket k
- case res of
- Right _ -> return True
- Left (AWSError _ _) -> return False
- Left e -> s3Error e
- -}
- error "TODO"
+ catchMissingException $ do
+ void $ sendS3Handle h $
+ S3.headObject (hBucket h) (hBucketObject h k)
+ return True
+
+{- Catch exception headObject returns when an object is not present
+ - in the bucket, and returns False. All other exceptions indicate a
+ - check error and are let through. -}
+catchMissingException :: Annex Bool -> Annex Bool
+catchMissingException a = catchJust missing a (const $ return False)
where
- noconn = error "S3 not configured"
-
+ -- This is not very good; see
+ -- https://github.com/aristidb/aws/issues/121
+ missing :: AWS.HeaderException -> Maybe ()
+ missing e
+ | AWS.headerErrorMessage e == "ETag missing" = Just ()
+ | otherwise = Nothing
+
s3Warning :: ReqError -> Annex Bool
s3Warning e = do
warning $ prettyReqError e
@@ -216,16 +212,8 @@ s3Action r noconn action = do
(Just b, Just c) -> action (c, b)
_ -> return noconn
-bucketFile :: Remote -> Key -> FilePath
-bucketFile r = munge . key2file
- where
- munge s = case M.lookup "mungekeys" c of
- Just "ia" -> iaMunge $ getFilePrefix c ++ s
- _ -> getFilePrefix c ++ s
- c = config r
-
bucketKey :: Remote -> BucketName -> Key -> S3Object
-bucketKey r bucket k = S3Object bucket (bucketFile r k) "" [] L.empty
+bucketKey r bucket k = S3Object bucket (bucketObject (config r) k) "" [] L.empty
{- Generate the bucket if it does not already exist, including creating the
- UUID file within the bucket.
@@ -313,8 +301,12 @@ data S3Handle = S3Handle
{ hmanager :: Manager
, hawscfg :: AWS.Configuration
, hs3cfg :: S3.S3Configuration AWS.NormalQuery
+
+ -- Cached values.
, hBucket :: S3.Bucket
, hStorageClass :: S3.StorageClass
+ , hBucketObject :: Key -> S3.Bucket
+ , hIsIA :: Bool
}
{- Sends a request to S3 and gets back the response.
@@ -339,12 +331,13 @@ withS3Handle c u a = do
bucket <- maybe nobucket (return . T.pack) (getBucketName c)
let awscfg = AWS.Configuration AWS.Timestamp awscreds (AWS.defaultLog AWS.Error)
bracketIO (newManager httpcfg) closeManager $ \mgr ->
- a $ S3Handle mgr awscfg s3cfg bucket sc
+ a $ S3Handle mgr awscfg s3cfg bucket sc bo (isIA c)
where
s3cfg = s3Configuration c
httpcfg = defaultManagerSettings
{ managerResponseTimeout = Nothing }
sc = getStorageClass c
+ bo = T.pack . bucketObject c -- memoized
nocreds = error "Cannot use S3 without credentials configured"
nobucket = error "S3 bucket not configured"
@@ -390,6 +383,13 @@ getXheaders = filter isxheader . M.assocs
getFilePrefix :: RemoteConfig -> String
getFilePrefix = M.findWithDefault "" "fileprefix"
+bucketObject :: RemoteConfig -> Key -> FilePath
+bucketObject c = munge . key2file
+ where
+ munge s = case M.lookup "mungekeys" c of
+ Just "ia" -> iaMunge $ getFilePrefix c ++ s
+ _ -> getFilePrefix c ++ s
+
{- Internet Archive limits filenames to a subset of ascii,
- with no whitespace. Other characters are xml entity
- encoded. -}
@@ -416,6 +416,6 @@ iaItemUrl :: BucketName -> URLString
iaItemUrl bucket = "http://archive.org/details/" ++ bucket
iaKeyUrl :: Remote -> Key -> URLString
-iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketFile r k
+iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketObject (config r) k
where
bucket = fromMaybe "" $ getBucketName $ config r