aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java26
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java9
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java10
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java2
6 files changed, 33 insertions, 44 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 0a435fabed..c07e8c2b57 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -42,7 +42,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
-import java.util.concurrent.Semaphore;
/**
* A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files
@@ -58,7 +57,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024;
private final SimpleBlobStore blobStore;
- private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true);
public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) {
this.blobStore = blobStore;
@@ -92,22 +90,21 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
private Digest uploadFileContents(Path file) throws IOException, InterruptedException {
+ Digest digest = Digests.computeDigest(file);
try (InputStream in = file.getInputStream()) {
- // This unconditionally reads the whole file into memory first!
- return uploadBlob(ByteString.readFrom(in).toByteArray());
+ return uploadStream(digest, in);
}
}
private Digest uploadFileContents(
ActionInput input, Path execRoot, MetadataProvider inputCache)
throws IOException, InterruptedException {
- // This unconditionally reads the whole file into memory first!
if (input instanceof VirtualActionInput) {
byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray();
return uploadBlob(blob, Digests.computeDigest(blob));
}
try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) {
- return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in);
+ return uploadStream(Digests.getDigestFromInputCache(input, inputCache), in);
}
}
@@ -187,8 +184,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
result.setStdoutDigest(stdout);
}
if (uploadAction) {
- blobStore.putActionResult(
- actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray()));
+ blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray());
}
}
@@ -263,17 +259,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException {
int blobSizeKBytes = blob.length / 1024;
checkBlobSize(blobSizeKBytes, "Upload");
- uploadMemoryAvailable.acquire(blobSizeKBytes);
- try {
- return uploadBlob(digest, new ByteArrayInputStream(blob));
- } finally {
- uploadMemoryAvailable.release(blobSizeKBytes);
- }
+ return uploadStream(digest, new ByteArrayInputStream(blob));
}
- public Digest uploadBlob(Digest digest, InputStream in)
+ public Digest uploadStream(Digest digest, InputStream in)
throws IOException, InterruptedException {
- blobStore.put(digest.getHash(), in);
+ blobStore.put(digest.getHash(), digest.getSizeBytes(), in);
return digest;
}
@@ -331,8 +322,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
- blobStore.putActionResult(actionKey.getDigest().getHash(),
- new ByteArrayInputStream(result.toByteArray()));
+ blobStore.putActionResult(actionKey.getDigest().getHash(), result.toByteArray());
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 82cd36bc97..65d6f2d547 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.util.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -49,17 +50,17 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
+ public void put(String key, long length, InputStream in) throws IOException {
byte[] value = ByteStreams.toByteArray(in);
+ Preconditions.checkState(value.length == length);
map.put(key, value);
}
@Override
- public void putActionResult(String key, InputStream in)
- throws IOException, InterruptedException {
- put(key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ map.put(key, in);
}
@Override
public void close() {}
-} \ No newline at end of file
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 8d41023089..2dbd3208a7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -15,6 +15,7 @@ package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.vfs.Path;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -52,7 +53,7 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
+ public void put(String key, long length, InputStream in) throws IOException {
// Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = toPath(UUID.randomUUID().toString());
try (OutputStream out = temp.getOutputStream()) {
@@ -65,8 +66,8 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public void putActionResult(String key, InputStream in) throws IOException, InterruptedException {
- put(key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ put(key, in.length, new ByteArrayInputStream(in));
}
@Override
@@ -75,4 +76,4 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
private Path toPath(String key) {
return root.getChild(key);
}
-} \ No newline at end of file
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
index b7473d8c9b..9796554480 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
@@ -13,18 +13,20 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
-import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -117,22 +119,19 @@ public final class RestBlobStore implements SimpleBlobStore {
}
@Override
- public void put(String key, InputStream in) throws IOException {
- put(CAS_PREFIX, key, in);
+ public void put(String key, long length, InputStream in) throws IOException {
+ put(CAS_PREFIX, key, new InputStreamEntity(in, length, ContentType.APPLICATION_OCTET_STREAM));
}
@Override
- public void putActionResult(String key, InputStream in) throws IOException, InterruptedException {
- put(ACTION_CACHE_PREFIX, key, in);
+ public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
+ put(ACTION_CACHE_PREFIX, key, new ByteArrayEntity(in, ContentType.APPLICATION_OCTET_STREAM));
}
- private void put(String urlPrefix, String key, InputStream in) throws IOException {
+ private void put(String urlPrefix, String key, HttpEntity entity) throws IOException {
HttpClient client = clientFactory.build();
HttpPut put = new HttpPut(baseUrl + "/" + urlPrefix + "/" + key);
- // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream.
- // See https://github.com/hazelcast/hazelcast/issues/10878.
- put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in)));
- put.setHeader("Content-Type", "application/octet-stream");
+ put.setEntity(entity);
client.execute(
put,
(response) -> {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
index 38479962b4..4231060603 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
@@ -46,14 +46,12 @@ public interface SimpleBlobStore {
InterruptedException;
/**
- * Uploads a BLOB (as {@code in}) indexed by {@code key} to the CAS.
+ * Uploads a BLOB (as {@code in}) with length {@code length} indexed by {@code key} to the CAS.
*/
- void put(String key, InputStream in) throws IOException, InterruptedException;
+ void put(String key, long length, InputStream in) throws IOException, InterruptedException;
- /**
- * Uploads a BLOB (as {@code in}) indexed by {@code key} to the Action Cache.
- */
- void putActionResult(String actionKey, InputStream in) throws IOException, InterruptedException;
+ /** Uploads a bytearray BLOB (as {@code in}) indexed by {@code key} to the Action Cache. */
+ void putActionResult(String actionKey, byte[] in) throws IOException, InterruptedException;
/** Close resources associated with the blob store. */
void close();
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index 91bd8afee6..869d8cf908 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -229,7 +229,7 @@ final class ByteStreamServer extends ByteStreamImplBase {
try {
Digest d = Digests.computeDigest(temp);
try (InputStream in = temp.getInputStream()) {
- cache.uploadBlob(d, in);
+ cache.uploadStream(d, in);
}
try {
temp.delete();