diff options
author | Joey Hess <joey@kitenet.net> | 2014-08-09 14:23:54 -0400 |
---|---|---|
committer | Joey Hess <joey@kitenet.net> | 2014-08-09 14:26:19 -0400 |
commit | bd5de3ff4b50f6e90f6bb638d32fb60472ca8e47 (patch) | |
tree | 1189c0204ed676fdca09d88876eac3bf6a692df6 | |
parent | d03284c66e5f356421da06a17bf0e71d7b205157 (diff) |
S3: convert to aws for store, remove, checkPresent
Fixes the memory leak on store.. the second oldest open git-annex bug!
Only retrieve remains to be converted.
This commit was sponsored by Scott Robinson.
-rw-r--r-- | Remote/Helper/Http.hs | 14 | ||||
-rw-r--r-- | Remote/S3.hs | 120 |
2 files changed, 69 insertions, 65 deletions
diff --git a/Remote/Helper/Http.hs b/Remote/Helper/Http.hs index f1d576d1c..4088854ff 100644 --- a/Remote/Helper/Http.hs +++ b/Remote/Helper/Http.hs @@ -24,14 +24,18 @@ import Control.Concurrent -- Implemented as a fileStorer, so that the content can be streamed -- from the file in constant space. httpStorer :: (Key -> RequestBody -> Annex Bool) -> Storer -httpStorer a = fileStorer $ \k f m -> do - size <- liftIO $ (fromIntegral . fileSize <$> getFileStatus f :: IO Integer) - let streamer sink = withMeteredFile f m $ \b -> do +httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m) + +-- Reads the file and generates a streaming request body, that will update +-- the meter as it's sent. +httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody +httpBodyStorer src m = do + size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer + let streamer sink = withMeteredFile src m $ \b -> do mvar <- newMVar $ L.toChunks b let getnextchunk = modifyMVar mvar $ pure . pop sink getnextchunk - let body = RequestBodyStream (fromInteger size) streamer - a k body + return $ RequestBodyStream (fromInteger size) streamer where pop [] = ([], S.empty) pop (c:cs) = (cs, c) diff --git a/Remote/S3.hs b/Remote/S3.hs index 4f15f988e..58b408cd2 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -34,9 +34,9 @@ import qualified Git import Config import Config.Cost import Remote.Helper.Special +import Remote.Helper.Http import qualified Remote.Helper.AWS as AWS import Creds -import Utility.Metered import Annex.UUID import Logs.Web @@ -54,10 +54,10 @@ gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remot gen r u c gc = new <$> remoteCost gc expensiveRemoteCost where new cst = Just $ specialRemote c - (prepareStore this) + (prepareS3 this $ store this) (prepareRetrieve this) - (simplyPrepare $ remove this c) - (simplyPrepare $ checkKey this) + (prepareS3 this $ remove this) + (prepareS3 this $ checkKey this) this where this = Remote { @@ -132,32 +132,22 @@ s3Setup' u c = if isIA c then archiveorg else defaulthost writeUUIDFile archiveconfig u use archiveconfig -prepareStore :: Remote -> Preparer Storer -prepareStore r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) -> - fileStorer $ \k src p -> do - ok <- s3Bool =<< liftIO (store (conn, bucket) r k p src) +-- Sets up a http connection manager for S3 encdpoint, which allows +-- http connections to be reused across calls to the helper. +prepareS3 :: Remote -> (S3Handle -> helper) -> Preparer helper +prepareS3 r = resourcePrepare $ const $ withS3Handle (config r) (uuid r) - -- Store public URL to item in Internet Archive. - when (ok && isIA (config r) && not (isChunkKey k)) $ - setUrlPresent k (iaKeyUrl r k) - - return ok - -store :: (AWSConnection, BucketName) -> Remote -> Key -> MeterUpdate -> FilePath -> IO (AWSResult ()) -store (conn, bucket) r k p file = do - error "TODO" - {- - size <- (fromIntegral . fileSize <$> getFileStatus file) :: IO Integer - withMeteredFile file p $ \content -> do - -- size is provided to S3 so the whole content - -- does not need to be buffered to calculate it - let object = S3Object - bucket (bucketFile r k) "" - (("Content-Length", show size) : getXheaders (config r)) - content - sendObject conn $ - setStorageClass (getStorageClass $ config r) object - -} +store :: Remote -> S3Handle -> Storer +store r h = fileStorer $ \k f p -> do + rbody <- liftIO $ httpBodyStorer f p + void $ sendS3Handle h $ + S3.putObject (hBucket h) (hBucketObject h k) rbody + + -- Store public URL to item in Internet Archive. + when (hIsIA h && not (isChunkKey k)) $ + setUrlPresent k (iaKeyUrl r k) + + return True prepareRetrieve :: Remote -> Preparer Retriever prepareRetrieve r = resourcePrepare (const $ s3Action r False) $ \(conn, bucket) -> @@ -174,31 +164,37 @@ retrieveCheap _ _ = return False {- Internet Archive doesn't easily allow removing content. - While it may remove the file, there are generally other files - derived from it that it does not remove. -} -remove :: Remote -> RemoteConfig -> Remover -remove r c k - | isIA c = do +remove :: Remote -> S3Handle -> Remover +remove r h k + | hIsIA h = do warning "Cannot remove content from the Internet Archive" return False - | otherwise = remove' r k + | otherwise = do + res <- tryNonAsync $ sendS3Handle h $ + S3.DeleteObject (hBucketObject h k) (hBucket h) + return $ either (const False) (const True) res -remove' :: Remote -> Key -> Annex Bool -remove' r k = s3Action r False $ \(conn, bucket) -> - s3Bool =<< liftIO (deleteObject conn $ bucketKey r bucket k) - -checkKey :: Remote -> CheckPresent -checkKey r k = s3Action r noconn $ \(conn, bucket) -> do +checkKey :: Remote -> S3Handle -> CheckPresent +checkKey r h k = do showAction $ "checking " ++ name r - {- - res <- liftIO $ getObjectInfo conn $ bucketKey r bucket k - case res of - Right _ -> return True - Left (AWSError _ _) -> return False - Left e -> s3Error e - -} - error "TODO" + catchMissingException $ do + void $ sendS3Handle h $ + S3.headObject (hBucket h) (hBucketObject h k) + return True + +{- Catch exception headObject returns when an object is not present + - in the bucket, and returns False. All other exceptions indicate a + - check error and are let through. -} +catchMissingException :: Annex Bool -> Annex Bool +catchMissingException a = catchJust missing a (const $ return False) where - noconn = error "S3 not configured" - + -- This is not very good; see + -- https://github.com/aristidb/aws/issues/121 + missing :: AWS.HeaderException -> Maybe () + missing e + | AWS.headerErrorMessage e == "ETag missing" = Just () + | otherwise = Nothing + s3Warning :: ReqError -> Annex Bool s3Warning e = do warning $ prettyReqError e @@ -216,16 +212,8 @@ s3Action r noconn action = do (Just b, Just c) -> action (c, b) _ -> return noconn -bucketFile :: Remote -> Key -> FilePath -bucketFile r = munge . key2file - where - munge s = case M.lookup "mungekeys" c of - Just "ia" -> iaMunge $ getFilePrefix c ++ s - _ -> getFilePrefix c ++ s - c = config r - bucketKey :: Remote -> BucketName -> Key -> S3Object -bucketKey r bucket k = S3Object bucket (bucketFile r k) "" [] L.empty +bucketKey r bucket k = S3Object bucket (bucketObject (config r) k) "" [] L.empty {- Generate the bucket if it does not already exist, including creating the - UUID file within the bucket. @@ -313,8 +301,12 @@ data S3Handle = S3Handle { hmanager :: Manager , hawscfg :: AWS.Configuration , hs3cfg :: S3.S3Configuration AWS.NormalQuery + + -- Cached values. , hBucket :: S3.Bucket , hStorageClass :: S3.StorageClass + , hBucketObject :: Key -> S3.Bucket + , hIsIA :: Bool } {- Sends a request to S3 and gets back the response. @@ -339,12 +331,13 @@ withS3Handle c u a = do bucket <- maybe nobucket (return . T.pack) (getBucketName c) let awscfg = AWS.Configuration AWS.Timestamp awscreds (AWS.defaultLog AWS.Error) bracketIO (newManager httpcfg) closeManager $ \mgr -> - a $ S3Handle mgr awscfg s3cfg bucket sc + a $ S3Handle mgr awscfg s3cfg bucket sc bo (isIA c) where s3cfg = s3Configuration c httpcfg = defaultManagerSettings { managerResponseTimeout = Nothing } sc = getStorageClass c + bo = T.pack . bucketObject c -- memoized nocreds = error "Cannot use S3 without credentials configured" nobucket = error "S3 bucket not configured" @@ -390,6 +383,13 @@ getXheaders = filter isxheader . M.assocs getFilePrefix :: RemoteConfig -> String getFilePrefix = M.findWithDefault "" "fileprefix" +bucketObject :: RemoteConfig -> Key -> FilePath +bucketObject c = munge . key2file + where + munge s = case M.lookup "mungekeys" c of + Just "ia" -> iaMunge $ getFilePrefix c ++ s + _ -> getFilePrefix c ++ s + {- Internet Archive limits filenames to a subset of ascii, - with no whitespace. Other characters are xml entity - encoded. -} @@ -416,6 +416,6 @@ iaItemUrl :: BucketName -> URLString iaItemUrl bucket = "http://archive.org/details/" ++ bucket iaKeyUrl :: Remote -> Key -> URLString -iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketFile r k +iaKeyUrl r k = "http://archive.org/download/" ++ bucket ++ "/" ++ bucketObject (config r) k where bucket = fromMaybe "" $ getBucketName $ config r |