diff options
author | ulfjack <ulfjack@google.com> | 2017-07-05 10:08:21 -0400 |
---|---|---|
committer | John Cater <jcater@google.com> | 2017-07-05 10:59:34 -0400 |
commit | 28f3b617f91f1a9ee9714b26f10d928ee4e1f970 (patch) | |
tree | 90c1de32fdecdbef279f2792e96d9fabf2d8190a /src/main/java/com/google/devtools/build | |
parent | e212c0514df05c5df8da097ff9ce443e0dde43e9 (diff) |
Change the SimpleBlobStore API to use Input/OutputStreams
This avoids having to load all the data into memory at once in most cases,
especially for large files.
PiperOrigin-RevId: 160954187
Diffstat (limited to 'src/main/java/com/google/devtools/build')
5 files changed, 80 insertions, 39 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 559d55d3fc..8c0fcdc93a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -35,8 +35,10 @@ import com.google.devtools.remoteexecution.v1test.OutputDirectory; import com.google.devtools.remoteexecution.v1test.OutputFile; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Collection; import java.util.concurrent.Semaphore; @@ -98,15 +100,12 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { throws IOException, InterruptedException { // This unconditionally reads the whole file into memory first! 
if (input instanceof VirtualActionInput) { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - ((VirtualActionInput) input).writeTo(buffer); - byte[] blob = buffer.toByteArray(); + byte[] blob = ((VirtualActionInput) input).getBytes().toByteArray(); return uploadBlob(blob, Digests.computeDigest(blob)); } - return uploadBlob( - ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream()) - .toByteArray(), - Digests.getDigestFromInputCache(input, inputCache)); + try (InputStream in = execRoot.getRelative(input.getExecPathString()).getInputStream()) { + return uploadBlob(Digests.getDigestFromInputCache(input, inputCache), in); + } } @Override @@ -159,7 +158,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { Digest stdout = uploadFileContents(outErr.getOutputPath()); result.setStdoutDigest(stdout); } - blobStore.put(actionKey.getDigest().getHash(), result.build().toByteArray()); + blobStore.put( + actionKey.getDigest().getHash(), new ByteArrayInputStream(result.build().toByteArray())); } public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files) @@ -203,8 +203,11 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { private void downloadFileContents(Digest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException, InterruptedException { - // This unconditionally downloads the whole file into memory first! 
- createFile(downloadBlob(digest), dest, executable); + FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory()); + try (OutputStream out = dest.getOutputStream()) { + downloadBlob(digest, out); + } + dest.setExecutable(executable); } private void createFile(byte[] contents, Path dest, boolean executable) throws IOException { @@ -232,13 +235,29 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { checkBlobSize(blobSizeKBytes, "Upload"); uploadMemoryAvailable.acquire(blobSizeKBytes); try { - blobStore.put(digest.getHash(), blob); + return uploadBlob(digest, new ByteArrayInputStream(blob)); } finally { uploadMemoryAvailable.release(blobSizeKBytes); } + } + + private Digest uploadBlob(Digest digest, InputStream in) + throws IOException, InterruptedException { + blobStore.put(digest.getHash(), in); return digest; } + public void downloadBlob(Digest digest, OutputStream out) + throws IOException, CacheNotFoundException, InterruptedException { + if (digest.getSizeBytes() == 0) { + return; + } + boolean success = blobStore.get(digest.getHash(), out); + if (!success) { + throw new CacheNotFoundException(digest); + } + } + public byte[] downloadBlob(Digest digest) throws IOException, CacheNotFoundException, InterruptedException { if (digest.getSizeBytes() == 0) { @@ -246,11 +265,9 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } // This unconditionally downloads the whole blob into memory! 
checkBlobSize(digest.getSizeBytes() / 1024, "Download"); - byte[] data = blobStore.get(digest.getHash()); - if (data == null) { - throw new CacheNotFoundException(digest); - } - return data; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + downloadBlob(digest, out); + return out.toByteArray(); } public boolean containsKey(Digest digest) throws IOException, InterruptedException { @@ -260,20 +277,19 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { @Override public ActionResult getCachedActionResult(ActionKey actionKey) throws IOException, InterruptedException { - byte[] data = blobStore.get(actionKey.getDigest().getHash()); - if (data == null) { - return null; - } try { + byte[] data = downloadBlob(actionKey.getDigest()); return ActionResult.parseFrom(data); } catch (InvalidProtocolBufferException e) { return null; + } catch (CacheNotFoundException e) { + return null; } } public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws IOException, InterruptedException { - blobStore.put(actionKey.getDigest().getHash(), result.toByteArray()); + blobStore.put(actionKey.getDigest().getHash(), new ByteArrayInputStream(result.toByteArray())); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java index 59b5a117b0..179385c3b7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/ConcurrentMapBlobStore.java @@ -13,6 +13,10 @@ // limitations under the License. 
package com.google.devtools.build.lib.remote.blobstore; +import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.concurrent.ConcurrentMap; /** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */ @@ -29,12 +33,18 @@ public final class ConcurrentMapBlobStore implements SimpleBlobStore { } @Override - public byte[] get(String key) { - return map.get(key); + public boolean get(String key, OutputStream out) throws IOException { + byte[] data = map.get(key); + if (data == null) { + return false; + } + out.write(data); + return true; } @Override - public void put(String key, byte[] value) { + public void put(String key, InputStream in) throws IOException { + byte[] value = ByteStreams.toByteArray(in); map.put(key, value); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java index 8f2ee0790e..8e780556b5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/OnDiskBlobStore.java @@ -13,9 +13,10 @@ // limitations under the License. package com.google.devtools.build.lib.remote.blobstore; -import com.google.devtools.build.lib.vfs.FileSystemUtils; +import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.vfs.Path; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.UUID; @@ -33,17 +34,26 @@ public final class OnDiskBlobStore implements SimpleBlobStore { } @Override - public byte[] get(String key) throws IOException { + public boolean get(String key, OutputStream out) throws IOException { Path f = toPath(key); - return f.exists() ? 
FileSystemUtils.readContent(f) : null; + if (!f.exists()) { + return false; + } + try (InputStream in = f.getInputStream()) { + ByteStreams.copy(in, out); + } + return true; } @Override - public void put(String key, byte[] value) throws IOException { + public void put(String key, InputStream in) throws IOException { + // Write a temporary file first, and then rename, to avoid data corruption in case of a crash. Path temp = toPath(UUID.randomUUID().toString()); try (OutputStream out = temp.getOutputStream()) { - out.write(value); + ByteStreams.copy(in, out); } + // TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the case of machine + // crashes (the OS may reorder the writes and the rename). Path f = toPath(key); temp.renameTo(f); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java index 898bd84c8a..60ff77b7bd 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java @@ -13,8 +13,10 @@ // limitations under the License. 
package com.google.devtools.build.lib.remote.blobstore; -import java.io.ByteArrayOutputStream; +import com.google.common.io.ByteStreams; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; @@ -79,7 +81,7 @@ public final class RestBlobStore implements SimpleBlobStore { } @Override - public byte[] get(String key) throws IOException { + public boolean get(String key, OutputStream out) throws IOException { HttpClient client = clientFactory.build(); HttpGet get = new HttpGet(baseUrl + "/" + key); return client.execute( @@ -88,22 +90,23 @@ public final class RestBlobStore implements SimpleBlobStore { int statusCode = response.getStatusLine().getStatusCode(); if (HttpStatus.SC_NOT_FOUND == statusCode || HttpStatus.SC_NO_CONTENT == statusCode) { - return null; + return false; } if (HttpStatus.SC_OK != statusCode) { throw new IOException("GET failed with status code " + statusCode); } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - response.getEntity().writeTo(buffer); - return buffer.toByteArray(); + response.getEntity().writeTo(out); + return true; }); } @Override - public void put(String key, byte[] value) throws IOException { + public void put(String key, InputStream in) throws IOException { HttpClient client = clientFactory.build(); HttpPut put = new HttpPut(baseUrl + "/" + key); - put.setEntity(new ByteArrayEntity(value)); + // For now, upload a byte array instead of a stream, due to Hazelcast crashing on the stream. + // See https://github.com/hazelcast/hazelcast/issues/10878. 
+ put.setEntity(new ByteArrayEntity(ByteStreams.toByteArray(in))); put.setHeader("Content-Type", "application/octet-stream"); client.execute( put, diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java index f3088b9d60..5f79c4590f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/SimpleBlobStore.java @@ -15,6 +15,8 @@ package com.google.devtools.build.lib.remote.blobstore; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; /** * A simple interface for storing blobs (in the form of byte arrays) each one indexed by a @@ -28,13 +30,13 @@ public interface SimpleBlobStore { * Returns the blob (in the form of a byte array) indexed by {@code key}. Returns null if the * {@code key} cannot be found. */ - byte[] get(String key) throws IOException, InterruptedException; + boolean get(String key, OutputStream out) throws IOException, InterruptedException; /** * Uploads a blob (as {@code value}) indexed by {@code key} to the blob store. Existing blob * indexed by the same {@code key} will be overwritten. */ - void put(String key, byte[] value) throws IOException, InterruptedException; + void put(String key, InputStream in) throws IOException, InterruptedException; /** Close resources associated with the blob store. */ void close(); |