diff options
author | 2017-10-17 09:44:57 +0200 | |
---|---|---|
committer | 2017-10-18 10:28:04 +0200 | |
commit | 5de01e5ed348a0b9d9eea88502fcca4b8c4133ed (patch) | |
tree | c8ef58c1a3fb0e6ba5c150de80c217210fccdfec /src/main/java/com/google/devtools/build/lib | |
parent | 784bb52613feb484221c3aa6756de45012163088 (diff) |
Stream rest cache file uploads.
This means we no longer keep large action inputs or outputs in memory
during upload.
I adjusted the SimpleBlogStore interface to require clients to pass in
the InputStream length. That allows us to always set Content-length on
uploads. It's polite to do so, so that the server may, e.g.,
preallocate space for the blob.
Fixes https://github.com/bazelbuild/bazel/issues/3250.
Change-Id: I944c9dbc35fa2fa80dce523b0133ea9757bb3973
PiperOrigin-RevId: 172433522
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
5 files changed, 32 insertions, 43 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 0a435fabed..c07e8c2b57 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -42,7 +42,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; -import java.util.concurrent.Semaphore; /** * A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files @@ -58,7 +57,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024; private final SimpleBlobStore blobStore; - private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) { this.blobStore = blobStore; @@ -92,22 +90,21 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } private Digest uploadFileContents(Path file) throws IOException, InterruptedException { + Digest digest = Digests.computeDigest(file); try (InputStream in = file.getInputStream()) { - // This unconditionally reads the whole file into memory first! - return uploadBlob(ByteString.readFrom(in).toByteArray()); + return uploadStream(digest, in); } } private Digest uploadFileContents( ActionInput input, Path execRoot, MetadataProvider inputCache) throws IOException, InterruptedException { - // This unconditionally reads the whole file into memory first! if (input instanceof VirtualActionInput) { byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray(); return uploadBlob(blob, Digests.computeDigest(blob)); } try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) { - return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in); + return uploadStream(Digests.getDigestFromInputCache(input, inputCache), in); } } @@ -187,8 +184,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { result.setStdoutDigest(stdout); } if (uploadAction) { - blobStore.putActionResult( - actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray())); + blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray()); } } @@ -263,17 +259,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException { int blobSizeKBytes = blob.length / 1024; checkBlobSize(blobSizeKBytes, "Upload"); - uploadMemoryAvailable.acquire(blobSizeKBytes); - try { - return uploadBlob(digest, new ByteArrayInputStream(blob)); - } finally { - uploadMemoryAvailable.release(blobSizeKBytes); - } + return uploadStream(digest, new ByteArrayInputStream(blob)); } - public Digest uploadBlob(Digest digest, InputStream in) + public Digest uploadStream(Digest digest, InputStream in) throws IOException, InterruptedException { - blobStore.put(digest.getHash(), in); + blobStore.put(digest.getHash(), digest.getSizeBytes(), in); return digest; } @@ -331,8 +322,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws IOException, InterruptedException { - blobStore.putActionResult(actionKey.getDigest().getHash(), - new ByteArrayInputStream(result.toByteArray())); + blobStore.putActionResult(actionKey.getDigest().getHash(), result.toByteArray()); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java index 82cd36bc97..65d6f2d547 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote.blobstore; import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.util.Preconditions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -49,17 +50,17 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { + public void put(String key, long length, InputStream in) throws IOException { byte[] value = ByteStreams.toByteArray(in); + Preconditions.checkState(value.length == length); map.put(key, value); } @Override - public void putActionResult(String key, InputStream in) - throws IOException, InterruptedException { - put(key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + map.put(key, in); } @Override public void close() {} -}
\ No newline at end of file +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java index 8d41023089..2dbd3208a7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.remote.blobstore; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.vfs.Path; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -52,7 +53,7 @@ public final class OnDiskBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { + public void put(String key, long length, InputStream in) throws IOException { // Write a temporary file first, and then rename, to avoid data corruption in case of a crash. Path temp = toPath(UUID.randomUUID().toString()); try (OutputStream out = temp.getOutputStream()) { @@ -65,8 +66,8 @@ public final class OnDiskBlobStore implements SimpleBlobStore { } @Override - public void putActionResult(String key, InputStream in) throws IOException, InterruptedException { - put(key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + put(key, in.length, new ByteArrayInputStream(in)); } @Override @@ -75,4 +76,4 @@ public final class OnDiskBlobStore implements SimpleBlobStore { private Path toPath(String key) { return root.getChild(key); } -}
\ No newline at end of file +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java index b7473d8c9b..9796554480 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java @@ -13,18 +13,20 @@ // limitations under the License. package com.google.devtools.build.lib.remote.blobstore; -import com.google.common.io.ByteStreams; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; @@ -117,22 +119,19 @@ public final class RestBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { - put(CAS_PREFIX, key, in); + public void put(String key, long length, InputStream in) throws IOException { + put(CAS_PREFIX, key, new InputStreamEntity(in, length, ContentType.APPLICATION_OCTET_STREAM)); } @Override - public void putActionResult(String key, InputStream in) throws IOException, InterruptedException { - put(ACTION_CACHE_PREFIX, key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + put(ACTION_CACHE_PREFIX, key, new ByteArrayEntity(in, ContentType.APPLICATION_OCTET_STREAM)); } - private void put(String urlPrefix, String key, InputStream in) throws IOException { + private void put(String urlPrefix, String key, HttpEntity entity) throws IOException { HttpClient client = clientFactory.build(); HttpPut put = new HttpPut(baseUrl + "/" + urlPrefix + "/" + key); - // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream. - // See https://github.com/hazelcast/hazelcast/issues/10878. - put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in))); - put.setHeader("Content-Type", "application/octet-stream"); + put.setEntity(entity); client.execute( put, (response) -> { diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java index 38479962b4..4231060603 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java @@ -46,14 +46,12 @@ public interface SimpleBlobStore { InterruptedException; /** - * Uploads a BLOB (as {@code in}) indexed by {@code key} to the CAS. + * Uploads a BLOB (as {@code in}) with length {@code length} indexed by {@code key} to the CAS. */ - void put(String key, InputStream in) throws IOException, InterruptedException; + void put(String key, long length, InputStream in) throws IOException, InterruptedException; - /** - * Uploads a BLOB (as {@code in}) indexed by {@code key} to the Action Cache. - */ - void putActionResult(String actionKey, InputStream in) throws IOException, InterruptedException; + /** Uploads a bytearray BLOB (as {@code in}) indexed by {@code key} to the Action Cache. */ + void putActionResult(String actionKey, byte[] in) throws IOException, InterruptedException; /** Close resources associated with the blob store. */ void close(); |