diff options
3 files changed, 88 insertions, 19 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 8c0fcdc93a..daa4ab0cab 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -241,7 +241,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - private Digest uploadBlob(Digest digest, InputStream in) + public Digest uploadBlob(Digest digest, InputStream in) throws IOException, InterruptedException { blobStore.put(digest.getHash(), in); return digest; diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java index 87ca590ca0..cf3ccd0d80 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java @@ -14,6 +14,7 @@ package com.google.devtools.build.remote; +import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; @@ -25,12 +26,18 @@ import com.google.devtools.build.lib.remote.CacheNotFoundException; import com.google.devtools.build.lib.remote.Chunker; import com.google.devtools.build.lib.remote.Digests; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; +import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.ByteString; import com.google.rpc.Code; import com.google.rpc.Status; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.UUID; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -38,6 +45,7 @@ import javax.annotation.Nullable; final class ByteStreamServer extends ByteStreamImplBase { private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName()); private final SimpleBlobStoreActionCache cache; + private final Path workPath; static @Nullable Digest parseDigestFromResourceName(String resourceName) { try { @@ -53,8 +61,9 @@ final class ByteStreamServer extends ByteStreamImplBase { } } - public ByteStreamServer(SimpleBlobStoreActionCache cache) { + public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath) { this.cache = cache; + this.workPath = workPath; } @Override @@ -87,12 +96,22 @@ final class ByteStreamServer extends ByteStreamImplBase { @Override public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) { + Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString()); + try { + FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory()); + temp.getOutputStream().close(); + } catch (IOException e) { + logger.log(SEVERE, "Failed to create temporary file for upload", e); + responseObserver.onError(StatusUtils.internalError(e)); + // We need to make sure that subsequent onNext or onCompleted calls don't make any further + // calls on the responseObserver after the onError above, so we return a no-op observer. + return new NoOpStreamObserver<>(); + } return new StreamObserver<WriteRequest>() { - byte[] blob; - Digest digest; - long offset; - String resourceName; - boolean closed; + private Digest digest; + private long offset; + private String resourceName; + private boolean closed; @Override public void onNext(WriteRequest request) { @@ -103,7 +122,6 @@ final class ByteStreamServer extends ByteStreamImplBase { if (digest == null) { resourceName = request.getResourceName(); digest = parseDigestFromResourceName(resourceName); - blob = new byte[(int) digest.getSizeBytes()]; } if (digest == null) { @@ -137,7 +155,13 @@ final class ByteStreamServer extends ByteStreamImplBase { long size = request.getData().size(); if (size > 0) { - request.getData().copyTo(blob, (int) offset); + try (OutputStream out = temp.getOutputStream(true)) { + request.getData().writeTo(out); + } catch (IOException e) { + responseObserver.onError(StatusUtils.internalError(e)); + closed = true; + return; + } offset += size; } @@ -149,6 +173,7 @@ final class ByteStreamServer extends ByteStreamImplBase { "finish_write", "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite())); closed = true; + return; } } @@ -156,6 +181,11 @@ final class ByteStreamServer extends ByteStreamImplBase { public void onError(Throwable t) { logger.log(WARNING, "Write request failed remotely.", t); closed = true; + try { + temp.delete(); + } catch (IOException e) { + logger.log(WARNING, "Could not delete temp file.", e); + } } @Override @@ -176,7 +206,15 @@ final class ByteStreamServer extends ByteStreamImplBase { } try { - Digest d = cache.uploadBlob(blob); + Digest d = Digests.computeDigest(temp); + try (InputStream in = temp.getInputStream()) { + cache.uploadBlob(d, in); + } + try { + temp.delete(); + } catch (IOException e) { + logger.log(WARNING, "Could not delete temp file.", e); + } if (!d.equals(digest)) { responseObserver.onError( @@ -197,4 +235,18 @@ final class ByteStreamServer extends ByteStreamImplBase { } }; } + + private static class NoOpStreamObserver<T> implements StreamObserver<T> { + @Override + public void onNext(T value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } } diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 49e63d7915..f245ae61a8 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -77,16 +77,32 @@ public final class RemoteWorker { } public RemoteWorker( - RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache, Path sandboxPath) + FileSystem fs, RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache, + Path sandboxPath) throws IOException { this.workerOptions = workerOptions; this.actionCacheServer = new ActionCacheServer(cache); - this.bsServer = new ByteStreamServer(cache); + Path workPath; + if (workerOptions.workPath != null) { + workPath = fs.getPath(workerOptions.workPath); + } else { + // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to + // provide a path to the remote worker, and we can then also use that as the work path. E.g.: + // /given/path/cas/ + // /given/path/upload/ + // /given/path/work/ + // We could technically use a different path for temporary files and execution, but we want + // the cas/ directory to be on the same file system as the upload/ and work/ directories so + // that we can atomically move files between them, and / or use hard-links for the exec + // directories. + // For now, we use a temporary path if no work path was provided. + workPath = fs.getPath("/tmp/remote-worker"); + } + this.bsServer = new ByteStreamServer(cache, workPath); this.casServer = new CasServer(cache); if (workerOptions.workPath != null) { ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>(); - Path workPath = getFileSystem().getPath(workerOptions.workPath); FileSystemUtils.createDirectoryAndParents(workPath); watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath); execServer = new ExecutionServer(operationsCache); @@ -157,9 +173,10 @@ public final class RemoteWorker { rootLog.getHandlers()[0].setLevel(FINE); } + FileSystem fs = getFileSystem(); Path sandboxPath = null; if (remoteWorkerOptions.sandboxing) { - sandboxPath = prepareSandboxRunner(remoteWorkerOptions); + sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions); } logger.info("Initializing in-memory cache server."); @@ -169,7 +186,7 @@ public final class RemoteWorker { } if ((remoteWorkerOptions.casPath != null) && (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute() - || !getFileSystem().getPath(remoteWorkerOptions.casPath).exists())) { + || !fs.getPath(remoteWorkerOptions.casPath).exists())) { logger.severe("--cas_path must refer to an existing, absolute path!"); System.exit(1); return; @@ -179,19 +196,19 @@ public final class RemoteWorker { usingRemoteCache ? SimpleBlobStoreFactory.create(remoteOptions) : remoteWorkerOptions.casPath != null - ? new OnDiskBlobStore(getFileSystem().getPath(remoteWorkerOptions.casPath)) + ? new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath)) : new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>()); RemoteWorker worker = new RemoteWorker( - remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath); + fs, remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath); final Server server = worker.startServer(); worker.createPidFile(); server.awaitTermination(); } - private static Path prepareSandboxRunner(RemoteWorkerOptions remoteWorkerOptions) { + private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) { if (OS.getCurrent() != OS.LINUX) { logger.severe("Sandboxing requested, but it is currently only available on Linux."); System.exit(1); @@ -212,7 +229,7 @@ public final class RemoteWorker { Path sandboxPath = null; try { - sandboxPath = getFileSystem().getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox"); + sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox"); try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) { ByteStreams.copy(sandbox, fos); } |