aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java2
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java70
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java35
3 files changed, 88 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 8c0fcdc93a..daa4ab0cab 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -241,7 +241,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
}
- private Digest uploadBlob(Digest digest, InputStream in)
+ public Digest uploadBlob(Digest digest, InputStream in)
throws IOException, InterruptedException {
blobStore.put(digest.getHash(), in);
return digest;
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index 87ca590ca0..cf3ccd0d80 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.remote;
+import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
@@ -25,12 +26,18 @@ import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.Chunker;
import com.google.devtools.build.lib.remote.Digests;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -38,6 +45,7 @@ import javax.annotation.Nullable;
final class ByteStreamServer extends ByteStreamImplBase {
private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName());
private final SimpleBlobStoreActionCache cache;
+ private final Path workPath;
static @Nullable Digest parseDigestFromResourceName(String resourceName) {
try {
@@ -53,8 +61,9 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
}
- public ByteStreamServer(SimpleBlobStoreActionCache cache) {
+ public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath) {
this.cache = cache;
+ this.workPath = workPath;
}
@Override
@@ -87,12 +96,22 @@ final class ByteStreamServer extends ByteStreamImplBase {
@Override
public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
+ Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
+ try {
+ FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
+ temp.getOutputStream().close();
+ } catch (IOException e) {
+ logger.log(SEVERE, "Failed to create temporary file for upload", e);
+ responseObserver.onError(StatusUtils.internalError(e));
+ // We need to make sure that subsequent onNext or onCompleted calls don't make any further
+ // calls on the responseObserver after the onError above, so we return a no-op observer.
+ return new NoOpStreamObserver<>();
+ }
return new StreamObserver<WriteRequest>() {
- byte[] blob;
- Digest digest;
- long offset;
- String resourceName;
- boolean closed;
+ private Digest digest;
+ private long offset;
+ private String resourceName;
+ private boolean closed;
@Override
public void onNext(WriteRequest request) {
@@ -103,7 +122,6 @@ final class ByteStreamServer extends ByteStreamImplBase {
if (digest == null) {
resourceName = request.getResourceName();
digest = parseDigestFromResourceName(resourceName);
- blob = new byte[(int) digest.getSizeBytes()];
}
if (digest == null) {
@@ -137,7 +155,13 @@ final class ByteStreamServer extends ByteStreamImplBase {
long size = request.getData().size();
if (size > 0) {
- request.getData().copyTo(blob, (int) offset);
+ try (OutputStream out = temp.getOutputStream(true)) {
+ request.getData().writeTo(out);
+ } catch (IOException e) {
+ responseObserver.onError(StatusUtils.internalError(e));
+ closed = true;
+ return;
+ }
offset += size;
}
@@ -149,6 +173,7 @@ final class ByteStreamServer extends ByteStreamImplBase {
"finish_write",
"Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
closed = true;
+ return;
}
}
@@ -156,6 +181,11 @@ final class ByteStreamServer extends ByteStreamImplBase {
public void onError(Throwable t) {
logger.log(WARNING, "Write request failed remotely.", t);
closed = true;
+ try {
+ temp.delete();
+ } catch (IOException e) {
+ logger.log(WARNING, "Could not delete temp file.", e);
+ }
}
@Override
@@ -176,7 +206,15 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
try {
- Digest d = cache.uploadBlob(blob);
+ Digest d = Digests.computeDigest(temp);
+ try (InputStream in = temp.getInputStream()) {
+ cache.uploadBlob(d, in);
+ }
+ try {
+ temp.delete();
+ } catch (IOException e) {
+ logger.log(WARNING, "Could not delete temp file.", e);
+ }
if (!d.equals(digest)) {
responseObserver.onError(
@@ -197,4 +235,18 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
};
}
+
+ private static class NoOpStreamObserver<T> implements StreamObserver<T> {
+ @Override
+ public void onNext(T value) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }
}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 49e63d7915..f245ae61a8 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -77,16 +77,32 @@ public final class RemoteWorker {
}
public RemoteWorker(
- RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache, Path sandboxPath)
+ FileSystem fs, RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache,
+ Path sandboxPath)
throws IOException {
this.workerOptions = workerOptions;
this.actionCacheServer = new ActionCacheServer(cache);
- this.bsServer = new ByteStreamServer(cache);
+ Path workPath;
+ if (workerOptions.workPath != null) {
+ workPath = fs.getPath(workerOptions.workPath);
+ } else {
+ // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to
+ // provide a path to the remote worker, and we can then also use that as the work path. E.g.:
+ // /given/path/cas/
+ // /given/path/upload/
+ // /given/path/work/
+ // We could technically use a different path for temporary files and execution, but we want
+ // the cas/ directory to be on the same file system as the upload/ and work/ directories so
+ // that we can atomically move files between them, and / or use hard-links for the exec
+ // directories.
+ // For now, we use a temporary path if no work path was provided.
+ workPath = fs.getPath("/tmp/remote-worker");
+ }
+ this.bsServer = new ByteStreamServer(cache, workPath);
this.casServer = new CasServer(cache);
if (workerOptions.workPath != null) {
ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>();
- Path workPath = getFileSystem().getPath(workerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath);
execServer = new ExecutionServer(operationsCache);
@@ -157,9 +173,10 @@ public final class RemoteWorker {
rootLog.getHandlers()[0].setLevel(FINE);
}
+ FileSystem fs = getFileSystem();
Path sandboxPath = null;
if (remoteWorkerOptions.sandboxing) {
- sandboxPath = prepareSandboxRunner(remoteWorkerOptions);
+ sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions);
}
logger.info("Initializing in-memory cache server.");
@@ -169,7 +186,7 @@ public final class RemoteWorker {
}
if ((remoteWorkerOptions.casPath != null)
&& (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute()
- || !getFileSystem().getPath(remoteWorkerOptions.casPath).exists())) {
+ || !fs.getPath(remoteWorkerOptions.casPath).exists())) {
logger.severe("--cas_path must refer to an existing, absolute path!");
System.exit(1);
return;
@@ -179,19 +196,19 @@ public final class RemoteWorker {
usingRemoteCache
? SimpleBlobStoreFactory.create(remoteOptions)
: remoteWorkerOptions.casPath != null
- ? new OnDiskBlobStore(getFileSystem().getPath(remoteWorkerOptions.casPath))
+ ? new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath))
: new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
RemoteWorker worker =
new RemoteWorker(
- remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
+ fs, remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
final Server server = worker.startServer();
worker.createPidFile();
server.awaitTermination();
}
- private static Path prepareSandboxRunner(RemoteWorkerOptions remoteWorkerOptions) {
+ private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
if (OS.getCurrent() != OS.LINUX) {
logger.severe("Sandboxing requested, but it is currently only available on Linux.");
System.exit(1);
@@ -212,7 +229,7 @@ public final class RemoteWorker {
Path sandboxPath = null;
try {
- sandboxPath = getFileSystem().getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
+ sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) {
ByteStreams.copy(sandbox, fos);
}