aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib
diff options
context:
space:
mode:
authorGravatar Benjamin Peterson <bp@benjamin.pe>2017-10-17 09:44:57 +0200
committerGravatar Jakob Buchgraber <buchgr@google.com>2017-10-18 10:28:04 +0200
commit5de01e5ed348a0b9d9eea88502fcca4b8c4133ed (patch)
treec8ef58c1a3fb0e6ba5c150de80c217210fccdfec /src/main/java/com/google/devtools/build/lib
parent784bb52613feb484221c3aa6756de45012163088 (diff)
Stream rest cache file uploads.
This means we no longer keep large action inputs or outputs in memory during upload. I adjusted the SimpleBlogStore interface to require clients to pass in the InputStream length. That allows us to always set Content-length on uploads. It's polite to do so, so that the server may, e.g., preallocate space for the blob. Fixes https://github.com/bazelbuild/bazel/issues/3250. Change-Id: I944c9dbc35fa2fa80dce523b0133ea9757bb3973 PiperOrigin-RevId: 172433522
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java26
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java9
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java10
5 files changed, 32 insertions, 43 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 0a435fabed..c07e8c2b57 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -42,7 +42,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
-import java.util.concurrent.Semaphore;
/**
* A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files
@@ -58,7 +57,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024;
private final SimpleBlobStore blobStore;
- private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true);
public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) {
this.blobStore = blobStore;
@@ -92,22 +90,21 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
+ Digest digest = Digests.computeDigest(file);
try (InputStream in = file.getInputStream()) {
- // This unconditionally reads the whole file into memory first!
- return uploadBlob(ByteString.readFrom(in).toByteArray());
+ return uploadStream(digest, in);
}
}
private Digest uploadFileContents(
ActionInput input, Path execRoot, MetadataProvider inputCache)
throws IOException, InterruptedException {
- // This unconditionally reads the whole file into memory first!
if (input instanceof VirtualActionInput) {
byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray();
return uploadBlob(blob, Digests.computeDigest(blob));
}
try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) {
- return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in);
+ return uploadStream(Digests.getDigestFromInputCache(input, inputCache), in);
}
}
@@ -187,8 +184,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
result.setStdoutDigest(stdout);
}
if (uploadAction) {
- blobStore.putActionResult(
- actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray()));
+ blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray());
}
}
@@ -263,17 +259,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException {
int blobSizeKBytes = blob.length / 1024;
checkBlobSize(blobSizeKBytes, "Upload");
- uploadMemoryAvailable.acquire(blobSizeKBytes);
- try {
- return uploadBlob(digest, new ByteArrayInputStream(blob));
- } finally {
- uploadMemoryAvailable.release(blobSizeKBytes);
- }
+ return uploadStream(digest, new ByteArrayInputStream(blob));
}
- public Digest uploadBlob(Digest digest, InputStream in)
+ public Digest uploadStream(Digest digest, InputStream in)
throws IOException, InterruptedException {
- blobStore.put(digest.getHash(), in);
+ blobStore.put(digest.getHash(), digest.getSizeBytes(), in);
return digest;
}
@@ -331,8 +322,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
- blobStore.putActionResult(actionKey.getDigest().getHash(),
- new ByteArrayInputStream(result.toByteArray()));
+ blobStore.putActionResult(actionKey.getDigest().getHash(), result.toByteArray());
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 82cd36bc97..65d6f2d547 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.util.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -49,17 +50,17 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
+ public void put(String key, long length, InputStream in) throws IOException {
byte[] value = ByteStreams.toByteArray(in);
+ Preconditions.checkState(value.length == length);
map.put(key, value);
}
@Override
- public void putActionResult(String key, InputStream in)
- throws IOException, InterruptedException {
- put(key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ map.put(key, in);
}
@Override
public void close() {}
-} \ No newline at end of file
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 8d41023089..2dbd3208a7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -15,6 +15,7 @@ package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.vfs.Path;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -52,7 +53,7 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
+ public void put(String key, long length, InputStream in) throws IOException {
// Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = toPath(UUID.randomUUID().toString());
try (OutputStream out = temp.getOutputStream()) {
@@ -65,8 +66,8 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public void putActionResult(String key, InputStream in) throws IOException, InterruptedException {
- put(key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ put(key, in.length, new ByteArrayInputStream(in));
}
@Override
@@ -75,4 +76,4 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
private Path toPath(String key) {
return root.getChild(key);
}
-} \ No newline at end of file
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
index b7473d8c9b..9796554480 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
@@ -13,18 +13,20 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
-import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -117,22 +119,19 @@ public final class RestBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
- put(CAS_PREFIX, key, in);
+ public void put(String key, long length, InputStream in) throws IOException {
+ put(CAS_PREFIX, key, new InputStreamEntity(in, length, ContentType.APPLICATION_OCTET_STREAM));
}
@Override
- public void putActionResult(String key, InputStream in) throws IOException, InterruptedException {
- put(ACTION_CACHE_PREFIX, key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ put(ACTION_CACHE_PREFIX, key, new ByteArrayEntity(in, ContentType.APPLICATION_OCTET_STREAM));
}
- private void put(String urlPrefix, String key, InputStream in) throws IOException {
+ private void put(String urlPrefix, String key, HttpEntity entity) throws IOException {
HttpClient client = clientFactory.build();
HttpPut put = new HttpPut(baseUrl + "/" + urlPrefix + "/" + key);
- // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream.
- // See https://github.com/hazelcast/hazelcast/issues/10878.
- put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in)));
- put.setHeader("Content-Type", "application/octet-stream");
+ put.setEntity(entity);
client.execute(
put,
(response) -> {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
index 38479962b4..4231060603 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
@@ -46,14 +46,12 @@ public interface SimpleBlobStore {
InterruptedException;
/**
- * Uploads a BLOB (as {@code in}) indexed by {@code key} to the CAS.
+ * Uploads a BLOB (as {@code in}) with length {@code length} indexed by {@code key} to the CAS.
*/
- void put(String key, InputStream in) throws IOException, InterruptedException;
+ void put(String key, long length, InputStream in) throws IOException, InterruptedException;
- /**
- * Uploads a BLOB (as {@code in}) indexed by {@code key} to the Action Cache.
- */
- void putActionResult(String actionKey, InputStream in) throws IOException, InterruptedException;
+ /** Uploads a bytearray BLOB (as {@code in}) indexed by {@code key} to the Action Cache. */
+ void putActionResult(String actionKey, byte[] in) throws IOException, InterruptedException;
/** Close resources associated with the blob store. */
void close();