aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java58
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java16
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java6
5 files changed, 80 insertions, 39 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 559d55d3fc..8c0fcdc93a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -35,8 +35,10 @@ import com.google.devtools.remoteexecution.v1test.OutputDirectory;
import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.Semaphore;
@@ -98,15 +100,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
if (input instanceof VirtualActionInput) {
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ((VirtualActionInput) input).writeTo(buffer);
- byte[] blob = buffer.toByteArray();
+ byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray();
return uploadBlob(blob, Digests.computeDigest(blob));
}
- return uploadBlob(
- ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream())
- .toByteArray(),
- Digests.getDigestFromInputCache(input, inputCache));
+ try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) {
+ return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in);
+ }
}
@Override
@@ -159,7 +158,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
Digest stdout = uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
- blobStore.put(actionKey.getDigest().getHash(), result.build().toByteArray());
+ blobStore.put(
+ actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray()));
}
public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files)
@@ -203,8 +203,11 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
private void downloadFileContents(Digest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException, InterruptedException {
- // This unconditionally downloads the whole file into memory first!
- createFile(downloadBlob(digest), dest, executable);
+ FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
+ try (OutputStream out = dest.getOutputStream()) {
+ downloadBlob(digest, out);
+ }
+ dest.setExecutable(executable);
}
private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
@@ -232,13 +235,29 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
checkBlobSize(blobSizeKBytes, "Upload");
uploadMemoryAvailable.acquire(blobSizeKBytes);
try {
- blobStore.put(digest.getHash(), blob);
+ return uploadBlob(digest, new ByteArrayInputStream(blob));
} finally {
uploadMemoryAvailable.release(blobSizeKBytes);
}
+ }
+
+ private Digest uploadBlob(Digest digest, InputStream in)
+ throws IOException, InterruptedException {
+ blobStore.put(digest.getHash(), in);
return digest;
}
+ public void downloadBlob(Digest digest, OutputStream out)
+ throws IOException, CacheNotFoundException, InterruptedException {
+ if (digest.getSizeBytes() == 0) {
+ return;
+ }
+ boolean success = blobStore.get(digest.getHash(), out);
+ if (!success) {
+ throw new CacheNotFoundException(digest);
+ }
+ }
+
public byte[] downloadBlob(Digest digest)
throws IOException, CacheNotFoundException, InterruptedException {
if (digest.getSizeBytes() == 0) {
@@ -246,11 +265,9 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
// This unconditionally downloads the whole blob into memory!
checkBlobSize(digest.getSizeBytes() / 1024, "Download");
- byte[] data = blobStore.get(digest.getHash());
- if (data == null) {
- throw new CacheNotFoundException(digest);
- }
- return data;
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ downloadBlob(digest, out);
+ return out.toByteArray();
}
public boolean containsKey(Digest digest) throws IOException, InterruptedException {
@@ -260,20 +277,19 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
@Override
public ActionResult getCachedActionResult(ActionKey actionKey)
throws IOException, InterruptedException {
- byte[] data = blobStore.get(actionKey.getDigest().getHash());
- if (data == null) {
- return null;
- }
try {
+ byte[] data = downloadBlob(actionKey.getDigest());
return ActionResult.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
return null;
+ } catch (CacheNotFoundException e) {
+ return null;
}
}
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws IOException, InterruptedException {
- blobStore.put(actionKey.getDigest().getHash(), result.toByteArray());
+ blobStore.put(actionKey.getDigest().getHash(), new ByteArrayInputStream(result.toByteArray()));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
index 59b5a117b0..179385c3b7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java
@@ -13,6 +13,10 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
+import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.concurrent.ConcurrentMap;
/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
@@ -29,12 +33,18 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore {
}
@Override
- public byte[] get(String key) {
- return map.get(key);
+ public boolean get(String key, OutputStream out) throws IOException {
+ byte[] data = map.get(key);
+ if (data == null) {
+ return false;
+ }
+ out.write(data);
+ return true;
}
@Override
- public void put(String key, byte[] value) {
+ public void put(String key, InputStream in) throws IOException {
+ byte[] value = ByteStreams.toByteArray(in);
map.put(key, value);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
index 8f2ee0790e..8e780556b5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java
@@ -13,9 +13,10 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
-import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.vfs.Path;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;
@@ -33,17 +34,26 @@ public final class OnDiskBlobStore implements SimpleBlobStore {
}
@Override
- public byte[] get(String key) throws IOException {
+ public boolean get(String key, OutputStream out) throws IOException {
Path f = toPath(key);
- return f.exists() ? FileSystemUtils.readContent(f) : null;
+ if (!f.exists()) {
+ return false;
+ }
+ try (InputStream in = f.getInputStream()) {
+ ByteStreams.copy(in, out);
+ }
+ return true;
}
@Override
- public void put(String key, byte[] value) throws IOException {
+ public void put(String key, InputStream in) throws IOException {
+ // Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = toPath(UUID.randomUUID().toString());
try (OutputStream out = temp.getOutputStream()) {
- out.write(value);
+ ByteStreams.copy(in, out);
}
+ // TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the case of machine
+ // crashes (the OS may reorder the writes and the rename).
Path f = toPath(key);
temp.renameTo(f);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
index 898bd84c8a..60ff77b7bd 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java
@@ -13,8 +13,10 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
-import java.io.ByteArrayOutputStream;
+import com.google.common.io.ByteStreams;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
@@ -79,7 +81,7 @@ public final class RestBlobStore implements SimpleBlobStore {
}
@Override
- public byte[] get(String key) throws IOException {
+ public boolean get(String key, OutputStream out) throws IOException {
HttpClient client = clientFactory.build();
HttpGet get = new HttpGet(baseUrl + "/" + key);
return client.execute(
@@ -88,22 +90,23 @@ public final class RestBlobStore implements SimpleBlobStore {
int statusCode = response.getStatusLine().getStatusCode();
if (HttpStatus.SC_NOT_FOUND == statusCode
|| HttpStatus.SC_NO_CONTENT == statusCode) {
- return null;
+ return false;
}
if (HttpStatus.SC_OK != statusCode) {
throw new IOException("GET failed with status code " + statusCode);
}
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- response.getEntity().writeTo(buffer);
- return buffer.toByteArray();
+ response.getEntity().writeTo(out);
+ return true;
});
}
@Override
- public void put(String key, byte[] value) throws IOException {
+ public void put(String key, InputStream in) throws IOException {
HttpClient client = clientFactory.build();
HttpPut put = new HttpPut(baseUrl + "/" + key);
- put.setEntity(new ByteArrayEntity(value));
+ // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream.
+ // See https://github.com/hazelcast/hazelcast/issues/10878.
+ put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in)));
put.setHeader("Content-Type", "application/octet-stream");
client.execute(
put,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
index f3088b9d60..5f79c4590f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java
@@ -15,6 +15,8 @@
package com.google.devtools.build.lib.remote.blobstore;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
/**
* A simple interface for storing blobs (in the form of byte arrays) each one indexed by a
@@ -28,13 +30,13 @@ public interface SimpleBlobStore {
* Returns the blob (in the form of a byte array) indexed by {@code key}. Returns null if the
* {@code key} cannot be found.
*/
- byte[] get(String key) throws IOException, InterruptedException;
+ boolean get(String key, OutputStream out) throws IOException, InterruptedException;
/**
* Uploads a blob (as {@code value}) indexed by {@code key} to the blob store. Existing blob
* indexed by the same {@code key} will be overwritten.
*/
- void put(String key, byte[] value) throws IOException, InterruptedException;
+ void put(String key, InputStream in) throws IOException, InterruptedException;
/** Close resources associated with the blob store. */
void close();