diff options
6 files changed, 33 insertions, 44 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 0a435fabed..c07e8c2b57 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -42,7 +42,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; -import java.util.concurrent.Semaphore; /** * A RemoteActionCache implementation that uses a concurrent map as a distributed storage for files @@ -58,7 +57,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024; private final SimpleBlobStore blobStore; - private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) { this.blobStore = blobStore; @@ -92,22 +90,21 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } private Digest uploadFileContents(Path file) throws IOException, InterruptedException { + Digest digest = Digests.computeDigest(file); try (InputStream in = file.getInputStream()) { - // This unconditionally reads the whole file into memory first! - return uploadBlob(ByteString.readFrom(in).toByteArray()); + return uploadStream(digest, in); } } private Digest uploadFileContents( ActionInput input, Path execRoot, MetadataProvider inputCache) throws IOException, InterruptedException { - // This unconditionally reads the whole file into memory first! if (input instanceof VirtualActionInput) { byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray(); return uploadBlob(blob, Digests.computeDigest(blob)); } try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) { - return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in); + return uploadStream(Digests.getDigestFromInputCache(input, inputCache), in); } } @@ -187,8 +184,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { result.setStdoutDigest(stdout); } if (uploadAction) { - blobStore.putActionResult( - actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray())); + blobStore.putActionResult(actionKey.getDigest().getHash(), result.build().toByteArray()); } } @@ -263,17 +259,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { private Digest uploadBlob(byte[] blob, Digest digest) throws IOException, InterruptedException { int blobSizeKBytes = blob.length / 1024; checkBlobSize(blobSizeKBytes, "Upload"); - uploadMemoryAvailable.acquire(blobSizeKBytes); - try { - return uploadBlob(digest, new ByteArrayInputStream(blob)); - } finally { - uploadMemoryAvailable.release(blobSizeKBytes); - } + return uploadStream(digest, new ByteArrayInputStream(blob)); } - public Digest uploadBlob(Digest digest, InputStream in) + public Digest uploadStream(Digest digest, InputStream in) throws IOException, InterruptedException { - blobStore.put(digest.getHash(), in); + blobStore.put(digest.getHash(), digest.getSizeBytes(), in); return digest; } @@ -331,8 +322,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws IOException, InterruptedException { - blobStore.putActionResult(actionKey.getDigest().getHash(), - new ByteArrayInputStream(result.toByteArray())); + blobStore.putActionResult(actionKey.getDigest().getHash(), result.toByteArray()); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java index 82cd36bc97..65d6f2d547 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.remote.blobstore; import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.util.Preconditions; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -49,17 +50,17 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { + public void put(String key, long length, InputStream in) throws IOException { byte[] value = ByteStreams.toByteArray(in); + Preconditions.checkState(value.length == length); map.put(key, value); } @Override - public void putActionResult(String key, InputStream in) - throws IOException, InterruptedException { - put(key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + map.put(key, in); } @Override public void close() {} -}
\ No newline at end of file +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java index 8d41023089..2dbd3208a7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.remote.blobstore; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.vfs.Path; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -52,7 +53,7 @@ public final class OnDiskBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { + public void put(String key, long length, InputStream in) throws IOException { // Write a temporary file first, and then rename, to avoid data corruption in case of a crash. Path temp = toPath(UUID.randomUUID().toString()); try (OutputStream out = temp.getOutputStream()) { @@ -65,8 +66,8 @@ public final class OnDiskBlobStore implements SimpleBlobStore { } @Override - public void putActionResult(String key, InputStream in) throws IOException, InterruptedException { - put(key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + put(key, in.length, new ByteArrayInputStream(in)); } @Override @@ -75,4 +76,4 @@ public final class OnDiskBlobStore implements SimpleBlobStore { private Path toPath(String key) { return root.getChild(key); } -}
\ No newline at end of file +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java index b7473d8c9b..9796554480 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java @@ -13,18 +13,20 @@ // limitations under the License. package com.google.devtools.build.lib.remote.blobstore; -import com.google.common.io.ByteStreams; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; @@ -117,22 +119,19 @@ public final class RestBlobStore implements SimpleBlobStore { } @Override - public void put(String key, InputStream in) throws IOException { - put(CAS_PREFIX, key, in); + public void put(String key, long length, InputStream in) throws IOException { + put(CAS_PREFIX, key, new InputStreamEntity(in, length, ContentType.APPLICATION_OCTET_STREAM)); } @Override - public void putActionResult(String key, InputStream in) throws IOException, InterruptedException { - put(ACTION_CACHE_PREFIX, key, in); + public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { + put(ACTION_CACHE_PREFIX, key, new ByteArrayEntity(in, ContentType.APPLICATION_OCTET_STREAM)); } - private void put(String urlPrefix, String key, InputStream in) throws IOException { + private void put(String urlPrefix, String key, HttpEntity entity) throws IOException { HttpClient client = clientFactory.build(); HttpPut put = new HttpPut(baseUrl + "/" + urlPrefix + "/" + key); - // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream. - // See https://github.com/hazelcast/hazelcast/issues/10878. - put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in))); - put.setHeader("Content-Type", "application/octet-stream"); + put.setEntity(entity); client.execute( put, (response) -> { diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java index 38479962b4..4231060603 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java @@ -46,14 +46,12 @@ public interface SimpleBlobStore { InterruptedException; /** - * Uploads a BLOB (as {@code in}) indexed by {@code key} to the CAS. + * Uploads a BLOB (as {@code in}) with length {@code length} indexed by {@code key} to the CAS. */ - void put(String key, InputStream in) throws IOException, InterruptedException; + void put(String key, long length, InputStream in) throws IOException, InterruptedException; - /** - * Uploads a BLOB (as {@code in}) indexed by {@code key} to the Action Cache. - */ - void putActionResult(String actionKey, InputStream in) throws IOException, InterruptedException; + /** Uploads a bytearray BLOB (as {@code in}) indexed by {@code key} to the Action Cache. */ + void putActionResult(String actionKey, byte[] in) throws IOException, InterruptedException; /** Close resources associated with the blob store. */ void close(); diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java index 91bd8afee6..869d8cf908 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java @@ -229,7 +229,7 @@ final class ByteStreamServer extends ByteStreamImplBase { try { Digest d = Digests.computeDigest(temp); try (InputStream in = temp.getInputStream()) { - cache.uploadBlob(d, in); + cache.uploadStream(d, in); } try { temp.delete(); |