aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/tools
diff options
context:
space:
mode:
authorGravatar ulfjack <ulfjack@google.com>2017-07-05 13:34:40 -0400
committerGravatar John Cater <jcater@google.com>2017-07-06 07:12:56 -0400
commit75aec49692b1ccddc8d2341d7e722682851d5fda (patch)
tree324695ca780abbc5cf68b5f39153ffaa909cccb0 /src/tools
parent321cecdb656c0b1a7f2c0823c96500c61fdbbcfe (diff)
Rewrite blob upload to use temporary files
Previously, it was allocating in-memory buffers for upload, which caused it to run out of memory on large file uploads (or on many small uploads running simultaneously). Unfortunately, we don't create the temporary file in the right location (due to separation of the BlobStore from the ByteStreamServer), so we copy the file again to write it to the OnDiskBlobStore, which isn't ideal. There'll need to be another BlobStore API change to make that work, when the InMemoryBlobStore is gone. PiperOrigin-RevId: 160974550
Diffstat (limited to 'src/tools')
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java70
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java35
2 files changed, 87 insertions, 18 deletions
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index 87ca590ca0..cf3ccd0d80 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.remote;
+import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
@@ -25,12 +26,18 @@ import com.google.devtools.build.lib.remote.CacheNotFoundException;
import com.google.devtools.build.lib.remote.Chunker;
import com.google.devtools.build.lib.remote.Digests;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -38,6 +45,7 @@ import javax.annotation.Nullable;
final class ByteStreamServer extends ByteStreamImplBase {
private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName());
private final SimpleBlobStoreActionCache cache;
+ private final Path workPath;
static @Nullable Digest parseDigestFromResourceName(String resourceName) {
try {
@@ -53,8 +61,9 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
}
- public ByteStreamServer(SimpleBlobStoreActionCache cache) {
+ public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath) {
this.cache = cache;
+ this.workPath = workPath;
}
@Override
@@ -87,12 +96,22 @@ final class ByteStreamServer extends ByteStreamImplBase {
@Override
public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
+ Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
+ try {
+ FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
+ temp.getOutputStream().close();
+ } catch (IOException e) {
+ logger.log(SEVERE, "Failed to create temporary file for upload", e);
+ responseObserver.onError(StatusUtils.internalError(e));
+ // We need to make sure that subsequent onNext or onCompleted calls don't make any further
+ // calls on the responseObserver after the onError above, so we return a no-op observer.
+ return new NoOpStreamObserver<>();
+ }
return new StreamObserver<WriteRequest>() {
- byte[] blob;
- Digest digest;
- long offset;
- String resourceName;
- boolean closed;
+ private Digest digest;
+ private long offset;
+ private String resourceName;
+ private boolean closed;
@Override
public void onNext(WriteRequest request) {
@@ -103,7 +122,6 @@ final class ByteStreamServer extends ByteStreamImplBase {
if (digest == null) {
resourceName = request.getResourceName();
digest = parseDigestFromResourceName(resourceName);
- blob = new byte[(int) digest.getSizeBytes()];
}
if (digest == null) {
@@ -137,7 +155,13 @@ final class ByteStreamServer extends ByteStreamImplBase {
long size = request.getData().size();
if (size > 0) {
- request.getData().copyTo(blob, (int) offset);
+ try (OutputStream out = temp.getOutputStream(true)) {
+ request.getData().writeTo(out);
+ } catch (IOException e) {
+ responseObserver.onError(StatusUtils.internalError(e));
+ closed = true;
+ return;
+ }
offset += size;
}
@@ -149,6 +173,7 @@ final class ByteStreamServer extends ByteStreamImplBase {
"finish_write",
"Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
closed = true;
+ return;
}
}
@@ -156,6 +181,11 @@ final class ByteStreamServer extends ByteStreamImplBase {
public void onError(Throwable t) {
logger.log(WARNING, "Write request failed remotely.", t);
closed = true;
+ try {
+ temp.delete();
+ } catch (IOException e) {
+ logger.log(WARNING, "Could not delete temp file.", e);
+ }
}
@Override
@@ -176,7 +206,15 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
try {
- Digest d = cache.uploadBlob(blob);
+ Digest d = Digests.computeDigest(temp);
+ try (InputStream in = temp.getInputStream()) {
+ cache.uploadBlob(d, in);
+ }
+ try {
+ temp.delete();
+ } catch (IOException e) {
+ logger.log(WARNING, "Could not delete temp file.", e);
+ }
if (!d.equals(digest)) {
responseObserver.onError(
@@ -197,4 +235,18 @@ final class ByteStreamServer extends ByteStreamImplBase {
}
};
}
+
+ private static class NoOpStreamObserver<T> implements StreamObserver<T> {
+ @Override
+ public void onNext(T value) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }
}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 49e63d7915..f245ae61a8 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -77,16 +77,32 @@ public final class RemoteWorker {
}
public RemoteWorker(
- RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache, Path sandboxPath)
+ FileSystem fs, RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache,
+ Path sandboxPath)
throws IOException {
this.workerOptions = workerOptions;
this.actionCacheServer = new ActionCacheServer(cache);
- this.bsServer = new ByteStreamServer(cache);
+ Path workPath;
+ if (workerOptions.workPath != null) {
+ workPath = fs.getPath(workerOptions.workPath);
+ } else {
+ // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to
+ // provide a path to the remote worker, and we can then also use that as the work path. E.g.:
+ // /given/path/cas/
+ // /given/path/upload/
+ // /given/path/work/
+ // We could technically use a different path for temporary files and execution, but we want
+ // the cas/ directory to be on the same file system as the upload/ and work/ directories so
+ // that we can atomically move files between them, and / or use hard-links for the exec
+ // directories.
+ // For now, we use a temporary path if no work path was provided.
+ workPath = fs.getPath("/tmp/remote-worker");
+ }
+ this.bsServer = new ByteStreamServer(cache, workPath);
this.casServer = new CasServer(cache);
if (workerOptions.workPath != null) {
ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>();
- Path workPath = getFileSystem().getPath(workerOptions.workPath);
FileSystemUtils.createDirectoryAndParents(workPath);
watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath);
execServer = new ExecutionServer(operationsCache);
@@ -157,9 +173,10 @@ public final class RemoteWorker {
rootLog.getHandlers()[0].setLevel(FINE);
}
+ FileSystem fs = getFileSystem();
Path sandboxPath = null;
if (remoteWorkerOptions.sandboxing) {
- sandboxPath = prepareSandboxRunner(remoteWorkerOptions);
+ sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions);
}
logger.info("Initializing in-memory cache server.");
@@ -169,7 +186,7 @@ public final class RemoteWorker {
}
if ((remoteWorkerOptions.casPath != null)
&& (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute()
- || !getFileSystem().getPath(remoteWorkerOptions.casPath).exists())) {
+ || !fs.getPath(remoteWorkerOptions.casPath).exists())) {
logger.severe("--cas_path must refer to an existing, absolute path!");
System.exit(1);
return;
@@ -179,19 +196,19 @@ public final class RemoteWorker {
usingRemoteCache
? SimpleBlobStoreFactory.create(remoteOptions)
: remoteWorkerOptions.casPath != null
- ? new OnDiskBlobStore(getFileSystem().getPath(remoteWorkerOptions.casPath))
+ ? new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath))
: new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
RemoteWorker worker =
new RemoteWorker(
- remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
+ fs, remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
final Server server = worker.startServer();
worker.createPidFile();
server.awaitTermination();
}
- private static Path prepareSandboxRunner(RemoteWorkerOptions remoteWorkerOptions) {
+ private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
if (OS.getCurrent() != OS.LINUX) {
logger.severe("Sandboxing requested, but it is currently only available on Linux.");
System.exit(1);
@@ -212,7 +229,7 @@ public final class RemoteWorker {
Path sandboxPath = null;
try {
- sandboxPath = getFileSystem().getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
+ sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) {
ByteStreams.copy(sandbox, fos);
}