diff options
author | buchgr <buchgr@google.com> | 2018-01-26 10:42:13 -0800 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-01-26 10:44:26 -0800 |
commit | 55f89dee5e2e8dd6f72c823b62a5fcd61446939f (patch) | |
tree | 2ea17881de81d46ea36cfba00d39f17c7686b5bc | |
parent | 69f483977c6565f28ed8d507c66feca84dedebad (diff) |
remote: Rewrite the HTTP caching client in Netty. Fixes #4481
* This puts in the foundation of HTTP/2 support for remote caching.
* Allows us to remove the Apache HTTP library as a dependency, reducing
the Bazel binary size by 1MiB.
On fast networks (i.e. GCE to GCS) we can see a >2x speed improvement for TLS
throughput. Even from my workstation to GCS I get significant build time
improvements when using Netty's TLS 18s vs 12s.
Closes #4481.
PiperOrigin-RevId: 183411787
18 files changed, 1066 insertions, 188 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD index 745623bd4f..379b4689e4 100644 --- a/src/main/java/com/google/devtools/build/lib/BUILD +++ b/src/main/java/com/google/devtools/build/lib/BUILD @@ -38,6 +38,8 @@ filegroup( "//src/main/java/com/google/devtools/build/lib/profiler/memory:srcs", "//src/main/java/com/google/devtools/build/lib/query2:srcs", "//src/main/java/com/google/devtools/build/lib/remote:srcs", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore:srcs", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore/http:srcs", "//src/main/java/com/google/devtools/build/lib/rules/apple/cpp:srcs", "//src/main/java/com/google/devtools/build/lib/rules/apple:srcs", "//src/main/java/com/google/devtools/build/lib/rules/apple/swift:srcs", diff --git a/src/main/java/com/google/devtools/build/lib/authandtls/BUILD b/src/main/java/com/google/devtools/build/lib/authandtls/BUILD index 2200940bcb..c4f36b5398 100644 --- a/src/main/java/com/google/devtools/build/lib/authandtls/BUILD +++ b/src/main/java/com/google/devtools/build/lib/authandtls/BUILD @@ -11,8 +11,6 @@ java_library( srcs = glob(["*.java"]), deps = [ "//src/main/java/com/google/devtools/common/options", - "//third_party:apache_httpclient", - "//third_party:apache_httpcore", "//third_party:auth", "//third_party:guava", "//third_party:jsr305", diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index ae2a68dc75..050a555698 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -8,7 +8,7 @@ filegroup( java_library( name = "remote", - srcs = glob(["**/*.java"]), + srcs = glob(["*.java"]), tags = ["bazel"], runtime_deps = [ # This is required for client TLS. @@ -18,26 +18,23 @@ java_library( "//src/main/java/com/google/devtools/build/lib:build-base", "//src/main/java/com/google/devtools/build/lib:events", "//src/main/java/com/google/devtools/build/lib:io", - "//src/main/java/com/google/devtools/build/lib:packages-internal", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib:util", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/analysis/platform", "//src/main/java/com/google/devtools/build/lib/authandtls", "//src/main/java/com/google/devtools/build/lib/buildeventstream", + "//src/main/java/com/google/devtools/build/lib/cmdline", "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec/apple", "//src/main/java/com/google/devtools/build/lib/exec/local", "//src/main/java/com/google/devtools/build/lib/exec/local:options", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore/http", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", - "//third_party:apache_httpclient", - "//third_party:apache_httpcore", - "//third_party:api_client", "//third_party:auth", "//third_party:guava", - "//third_party:jsr305", - "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java index ea7b81003e..70a323ffeb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java @@ -18,10 +18,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auth.Credentials; import com.google.devtools.build.lib.remote.blobstore.OnDiskBlobStore; -import com.google.devtools.build.lib.remote.blobstore.RestBlobStore; import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore; +import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStore; import com.google.devtools.build.lib.vfs.Path; import java.io.IOException; +import java.net.URI; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -35,8 +36,14 @@ public final class SimpleBlobStoreFactory { public static SimpleBlobStore createRest(RemoteOptions options, Credentials creds) throws IOException { - return new RestBlobStore( - options.remoteHttpCache, (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout), creds); + try { + return new HttpBlobStore( + URI.create(options.remoteHttpCache), + (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout), + creds); + } catch (Exception e) { + throw new RuntimeException(e); + } } public static SimpleBlobStore createLocalDisk(RemoteOptions options, Path workingDirectory) diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD new file mode 100644 index 0000000000..247315bc08 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/BUILD @@ -0,0 +1,18 @@ +package(default_visibility = ["//src:__subpackages__"]) + +filegroup( + name = "srcs", + srcs = glob(["**"]), + visibility = ["//src/main/java/com/google/devtools/build/lib:__pkg__"], +) + +java_library( + name = "blobstore", + srcs = glob(["*.java"]), + tags = ["bazel"], + deps = [ + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/common/options", + "//third_party:guava", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java deleted file mode 100644 index 76d1773623..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/RestBlobStore.java +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote.blobstore; - -import com.google.api.client.http.ByteArrayContent; -import com.google.api.client.http.GenericUrl; -import com.google.api.client.http.HttpContent; -import com.google.api.client.http.HttpRequestFactory; -import com.google.api.client.http.HttpResponse; -import com.google.api.client.http.InputStreamContent; -import com.google.api.client.http.apache.ApacheHttpTransport; -import com.google.auth.Credentials; -import com.google.auth.http.HttpCredentialsAdapter; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import javax.annotation.Nullable; -import org.apache.http.HttpStatus; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; - -/** - * Implementation of {@link SimpleBlobStore} with a REST service. The REST service needs to support - * the following HTTP methods. - * - * <p>PUT /{ac,cas}/1234 HTTP/1.1 PUT method is used to upload a blob with a base16 key. In this - * example the key is 1234. Valid status codes are 200, 201, 202 and 204. - * - * <p>GET /{ac,cas}/1234 HTTP/1.1 GET method fetches a blob with the specified key. In this example - * the key is 1234. A status code of 200 should be followed by the content of blob. Status code of - * 404 or 204 means the key cannot be found. - */ -public final class RestBlobStore implements SimpleBlobStore { - - private static final String ACTION_CACHE_PREFIX = "ac"; - private static final String CAS_PREFIX = "cas"; - - private final String baseUrl; - private final HttpClientBuilder clientFactory; - private final ApacheHttpTransport transport; - private final HttpRequestFactory requestFactory; - - /** - * Creates a new instance. - * - * @param baseUrl base URL for the remote cache - * @param poolSize maximum number of simultaneous connections - */ - public RestBlobStore(String baseUrl, int timeoutMillis, @Nullable Credentials creds) - throws IOException { - validateUrl(baseUrl); - this.baseUrl = baseUrl; - PoolingHttpClientConnectionManager connMan = new PoolingHttpClientConnectionManager(); - // We'll use as many connections as necessary. The connection pool tries to re-use open - // connections before creating new ones, so in practice we should have as many connections - // as concurrent actions. - connMan.setDefaultMaxPerRoute(Integer.MAX_VALUE); - connMan.setMaxTotal(Integer.MAX_VALUE); - clientFactory = HttpClientBuilder.create(); - clientFactory.setConnectionManager(connMan); - clientFactory.setConnectionManagerShared(true); - clientFactory.setDefaultRequestConfig(RequestConfig.custom() - // Timeout to establish a connection. - .setConnectTimeout(timeoutMillis) - // Timeout between reading data. - .setSocketTimeout(timeoutMillis) - .build()); - transport = new ApacheHttpTransport(clientFactory.build()); - if (creds != null) { - requestFactory = transport.createRequestFactory(new HttpCredentialsAdapter(creds)); - } else { - requestFactory = transport.createRequestFactory(); - } - } - - @Override - public void close() { - transport.shutdown(); - } - - @Override - public boolean containsKey(String key) throws IOException { - throw new UnsupportedOperationException("HTTP Caching does not use this method."); - } - - @Override - public boolean get(String key, OutputStream out) throws IOException { - return get(CAS_PREFIX, key, out); - } - - @Override - public boolean getActionResult(String key, OutputStream out) - throws IOException, InterruptedException { - return get(ACTION_CACHE_PREFIX, key, out); - } - - private boolean get(String urlPrefix, String key, OutputStream out) throws IOException { - HttpResponse response = null; - try { - response = - requestFactory - .buildGetRequest(new GenericUrl(baseUrl + "/" + urlPrefix + "/" + key)) - .setThrowExceptionOnExecuteError(false) - .execute(); - int statusCode = response.getStatusCode(); - if (HttpStatus.SC_NOT_FOUND == statusCode || HttpStatus.SC_NO_CONTENT == statusCode) { - return false; - } - if (HttpStatus.SC_OK != statusCode) { - throw new IOException("GET failed with status code " + statusCode); - } - response.download(out); - return true; - } finally { - if (response != null) { - response.disconnect(); - } - } - } - - @Override - public void put(String key, long length, InputStream in) throws IOException { - put(CAS_PREFIX, key, new InputStreamContent("application/octext-stream", in)); - } - - @Override - public void putActionResult(String key, byte[] in) throws IOException, InterruptedException { - put(ACTION_CACHE_PREFIX, key, new ByteArrayContent("application/octet-stream", in)); - } - - private void put(String urlPrefix, String key, HttpContent content) throws IOException { - HttpResponse response = null; - try { - response = - requestFactory - .buildPutRequest(new GenericUrl(baseUrl + "/" + urlPrefix + "/" + key), content) - .setThrowExceptionOnExecuteError(false) - .execute(); - int statusCode = response.getStatusCode(); - // Accept more than SC_OK to be compatible with Nginx WebDav module. - if (HttpStatus.SC_OK != statusCode - && HttpStatus.SC_ACCEPTED != statusCode - && HttpStatus.SC_CREATED != statusCode - && HttpStatus.SC_NO_CONTENT != statusCode) { - throw new IOException("PUT failed with status code " + statusCode); - } - } finally { - if (response != null) { - response.disconnect(); - } - } - } - - private void validateUrl(String url) throws IOException { - try { - new URI(url); - } catch (URISyntaxException e) { - throw new IOException("Failed to parse remote REST cache URL: " + baseUrl, e); - } - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/AbstractHttpHandler.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/AbstractHttpHandler.java new file mode 100644 index 0000000000..5b7df8a106 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/AbstractHttpHandler.java @@ -0,0 +1,139 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import com.google.auth.Credentials; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import java.io.IOException; +import java.net.SocketAddress; +import java.net.URI; +import java.util.List; +import java.util.Map; + +/** Common functionality shared by concrete classes. */ +abstract class AbstractHttpHandler<T extends HttpObject> extends SimpleChannelInboundHandler<T> + implements ChannelOutboundHandler { + + private final Credentials credentials; + + public AbstractHttpHandler(Credentials credentials) { + this.credentials = credentials; + } + + protected ChannelPromise userPromise; + + @SuppressWarnings("FutureReturnValueIgnored") + protected void failAndResetUserPromise(Throwable t) { + if (userPromise != null && !userPromise.isDone()) { + userPromise.setFailure(t); + } + userPromise = null; + } + + @SuppressWarnings("FutureReturnValueIgnored") + protected void succeedAndResetUserPromise() { + userPromise.setSuccess(); + userPromise = null; + } + + protected void addCredentialHeaders(HttpRequest request, URI uri) throws IOException { + if (credentials == null || !credentials.hasRequestMetadata()) { + return; + } + Map<String, List<String>> authHeaders = credentials.getRequestMetadata(uri); + if (authHeaders == null || authHeaders.isEmpty()) { + return; + } + for (Map.Entry<String, List<String>> entry : authHeaders.entrySet()) { + String name = entry.getKey(); + for (String value : entry.getValue()) { + request.headers().add(name, value); + } + } + } + + protected String constructPath(URI uri, String hash, boolean isCas) { + StringBuilder builder = new StringBuilder(); + builder.append(uri.getPath()); + if (!uri.getPath().endsWith("/")) { + builder.append("/"); + } + builder.append(isCas ? "cas/" : "ac/"); + builder.append(hash); + return builder.toString(); + } + + protected String constructHost(URI uri) { + return uri.getHost() + ":" + uri.getPort(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) + throws Exception { + failAndResetUserPromise(throwable); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) + throws Exception { + ctx.bind(localAddress, promise); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void connect( + ChannelHandlerContext ctx, + SocketAddress remoteAddress, + SocketAddress localAddress, + ChannelPromise promise) + throws Exception { + ctx.connect(remoteAddress, localAddress, promise); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.disconnect(promise); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.close(promise); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + ctx.deregister(promise); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + ctx.read(); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD new file mode 100644 index 0000000000..e532922182 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/BUILD @@ -0,0 +1,24 @@ +package(default_visibility = ["//src:__subpackages__"]) + +filegroup( + name = "srcs", + srcs = glob(["**"]), + visibility = ["//src/main/java/com/google/devtools/build/lib:__pkg__"], +) + +java_library( + name = "http", + srcs = glob(["*.java"]), + tags = ["bazel"], + runtime_deps = [ + # This is required for client TLS. + "//third_party:netty_tcnative", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/blobstore", + "//src/main/java/com/google/devtools/common/options", + "//third_party:auth", + "//third_party:guava", + "//third_party:netty", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/DownloadCommand.java new file mode 100644 index 0000000000..83e3ddcdab --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/DownloadCommand.java @@ -0,0 +1,50 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import com.google.common.base.Preconditions; +import java.io.OutputStream; +import java.net.URI; + +/** Object sent through the channel pipeline to start a download. */ +final class DownloadCommand { + + private final URI uri; + private final boolean casDownload; + private final String hash; + private final OutputStream out; + + protected DownloadCommand(URI uri, boolean casDownload, String hash, OutputStream out) { + this.uri = Preconditions.checkNotNull(uri); + this.casDownload = casDownload; + this.hash = Preconditions.checkNotNull(hash); + this.out = Preconditions.checkNotNull(out); + } + + public URI uri() { + return uri; + } + + public boolean casDownload() { + return casDownload; + } + + public String hash() { + return hash; + } + + public OutputStream out() { + return out; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java new file mode 100644 index 0000000000..b1ddb0f4fb --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java @@ -0,0 +1,252 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import com.google.auth.Credentials; +import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.SimpleChannelPool; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestEncoder; +import io.netty.handler.codec.http.HttpResponseDecoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.internal.PlatformDependent; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import javax.net.ssl.SSLEngine; + +/** + * Implementation of {@link SimpleBlobStore} that can talk to a HTTP/1.1 backend. + * + * <p>Blobs (Binary large objects) are uploaded using the {@code PUT} method. Action cache blobs are + * stored under the path {@code /ac/base16-key}. CAS (Content Addressable Storage) blobs are stored + * under the path {@code /cas/base16-key}. Valid status codes for a successful upload are 200 (OK), + * 201 (CREATED), 202 (ACCEPTED) and 204 (NO CONTENT). It's recommended to return 200 (OK) on + * success. The other status codes are supported to be compatibility with the nginx webdav module + * and may be removed in the future. + * + * <p>Blobs are downloaded using the {@code GET} method at the paths they were stored at. A status + * code of 200 should be followed by the content of the blob. The status codes 404 (NOT FOUND) and + * 204 (NO CONTENT) indicate that no cache entry exists. It's recommended to return 404 (NOT FOUND) + * as the 204 (NO CONTENT) status code is only supported for compatibility with the nginx webdav + * module. + * + * <p>TLS is supported and enabled automatically when using HTTPS as the URI scheme. + * + * <p>Uploads do not use {@code Expect: 100-CONTINUE} headers, as this would incur an additional + * roundtrip for every upload and with little practical value as we would expect most uploads to be + * accepted. + * + * <p>The implementation currently does not support transfer encoding chunked. + */ +public final class HttpBlobStore implements SimpleBlobStore { + + private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */); + private final SimpleChannelPool downloadChannels; + private final SimpleChannelPool uploadChannels; + private final URI uri; + + public HttpBlobStore(URI uri, int timeoutMillis, @Nullable final Credentials creds) + throws Exception { + boolean useTls = uri.getScheme().equals("https"); + if (uri.getPort() == -1) { + int port = useTls ? 443 : 80; + uri = + new URI( + uri.getScheme(), + uri.getUserInfo(), + uri.getHost(), + port, + uri.getPath(), + uri.getQuery(), + uri.getFragment()); + } + this.uri = uri; + + final SslContext sslCtx; + if (useTls) { + // OpenSsl gives us a > 2x speed improvement on fast networks, but requires netty tcnative + // to be there which is not available on all platforms and environments. + SslProvider sslProvider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK; + sslCtx = SslContextBuilder.forClient().sslProvider(sslProvider).build(); + } else { + sslCtx = null; + } + Bootstrap clientBootstrap = + new Bootstrap() + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis) + .option(ChannelOption.SO_TIMEOUT, timeoutMillis) + .group(eventLoop) + .remoteAddress(uri.getHost(), uri.getPort()); + downloadChannels = + new SimpleChannelPool( + clientBootstrap, + new ChannelPoolHandler() { + @Override + public void channelReleased(Channel ch) throws Exception {} + + @Override + public void channelAcquired(Channel ch) throws Exception {} + + @Override + public void channelCreated(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + SSLEngine engine = sslCtx.newEngine(ch.alloc()); + engine.setUseClientMode(true); + p.addFirst(new SslHandler(engine)); + } + p.addLast(new HttpClientCodec()); + p.addLast(new HttpDownloadHandler(creds)); + } + }); + uploadChannels = + new SimpleChannelPool( + clientBootstrap, + new ChannelPoolHandler() { + @Override + public void channelReleased(Channel ch) throws Exception {} + + @Override + public void channelAcquired(Channel ch) throws Exception {} + + @Override + public void channelCreated(Channel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + SSLEngine engine = sslCtx.newEngine(ch.alloc()); + engine.setUseClientMode(true); + p.addFirst(new SslHandler(engine)); + } + p.addLast(new HttpResponseDecoder()); + // The 10KiB limit was chosen at random. We only expect HTTP servers to respond with + // an error message in the body and that should always be less than 10KiB. + p.addLast(new HttpObjectAggregator(10 * 1024)); + p.addLast(new HttpRequestEncoder()); + p.addLast(new ChunkedWriteHandler()); + p.addLast(new HttpUploadHandler(creds)); + } + }); + } + + @Override + public boolean containsKey(String key) throws IOException, InterruptedException { + throw new UnsupportedOperationException("HTTP Caching does not use this method."); + } + + @Override + public boolean get(String key, OutputStream out) throws IOException, InterruptedException { + return get(key, out, true); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private boolean get(String key, OutputStream out, boolean casDownload) + throws IOException, InterruptedException { + final Channel ch; + try { + ch = downloadChannels.acquire().get(); + } catch (ExecutionException e) { + PlatformDependent.throwException(e.getCause()); + return false; + } + DownloadCommand download = new DownloadCommand(uri, casDownload, key, out); + try { + ChannelFuture downloadFuture = ch.writeAndFlush(download); + downloadFuture.sync(); + } catch (Exception e) { + // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a + // checked exception that hasn't been declared in the method signature. + if (e instanceof HttpException) { + HttpResponseStatus status = ((HttpException) e).status(); + if (status.equals(HttpResponseStatus.NOT_FOUND) + || status.equals(HttpResponseStatus.NO_CONTENT)) { + // Cache miss. Supporting NO_CONTENT for nginx webdav compatibility. + return false; + } + } + throw e; + } finally { + downloadChannels.release(ch); + } + return true; + } + + @Override + public boolean getActionResult(String actionKey, OutputStream out) + throws IOException, InterruptedException { + return get(actionKey, out, false); + } + + @Override + public void put(String key, long length, InputStream in) + throws IOException, InterruptedException { + put(key, length, in, true); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void put(String key, long length, InputStream in, boolean casUpload) + throws IOException, InterruptedException { + final Channel ch; + try { + ch = uploadChannels.acquire().get(); + } catch (ExecutionException e) { + throw new IOException("Failed to obtain a channel from the pool.", e); + } + UploadCommand upload = new UploadCommand(uri, casUpload, key, in, length); + try { + ChannelFuture uploadFuture = ch.writeAndFlush(upload); + uploadFuture.sync(); + } finally { + uploadChannels.release(ch); + } + } + + @Override + public void putActionResult(String actionKey, byte[] in) + throws IOException, InterruptedException { + put(actionKey, in.length, new ByteArrayInputStream(in), false); + } + + /** + * It's safe to suppress this warning because all methods on Netty + * futures return {@code this}. So we are not ignoring anything. + */ + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void close() { + downloadChannels.close(); + uploadChannels.close(); + eventLoop.shutdownGracefully(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandler.java new file mode 100644 index 0000000000..502c0ecec6 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandler.java @@ -0,0 +1,137 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.auth.Credentials; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.internal.StringUtil; +import java.io.IOException; +import java.io.OutputStream; + +/** ChannelHandler for downloads. */ +final class HttpDownloadHandler extends AbstractHttpHandler<HttpObject> { + + private OutputStream out; + private boolean keepAlive = HttpVersion.HTTP_1_1.isKeepAliveDefault(); + + public HttpDownloadHandler(Credentials credentials) { + super(credentials); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + checkState(userPromise != null, "response before request"); + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; + keepAlive = HttpUtil.isKeepAlive((HttpResponse) msg); + if (!response.status().equals(HttpResponseStatus.OK)) { + failAndReset( + new HttpException( + response.status(), "Download failed with Status: " + response.status(), null), + ctx); + } + } else if (msg instanceof HttpContent) { + ByteBuf content = ((HttpContent) msg).content(); + content.readBytes(out, content.readableBytes()); + if (msg instanceof LastHttpContent) { + succeedAndReset(ctx); + } + } else { + failAndReset( + new IllegalArgumentException( + "Unsupported message type: " + StringUtil.simpleClassName(msg)), + ctx); + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + checkState(userPromise == null, "handler can't be shared between pipelines."); + userPromise = promise; + if (!(msg instanceof DownloadCommand)) { + failAndResetUserPromise( + new IllegalArgumentException( + "Unsupported message type: " + StringUtil.simpleClassName(msg))); + return; + } + out = ((DownloadCommand) msg).out(); + HttpRequest request = buildRequest((DownloadCommand) msg); + addCredentialHeaders(request, ((DownloadCommand) msg).uri()); + ctx.writeAndFlush(request) + .addListener( + (f) -> { + if (!f.isSuccess()) { + failAndReset(f.cause(), ctx); + } + }); + } + + private HttpRequest buildRequest(DownloadCommand request) { + HttpRequest httpRequest = + new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.GET, + constructPath(request.uri(), request.hash(), request.casDownload())); + httpRequest.headers().set(HttpHeaderNames.HOST, constructHost(request.uri())); + httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*"); + return httpRequest; + } + + private void succeedAndReset(ChannelHandlerContext ctx) throws IOException { + try { + succeedAndResetUserPromise(); + } finally { + reset(ctx); + } + } + + private void failAndReset(Throwable t, ChannelHandlerContext ctx) throws IOException { + try { + failAndResetUserPromise(t); + } finally { + reset(ctx); + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void reset(ChannelHandlerContext ctx) throws IOException { + try { + if (!keepAlive) { + ctx.close(); + } + out.close(); + } finally { + out = null; + keepAlive = HttpVersion.HTTP_1_1.isKeepAliveDefault(); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpException.java new file mode 100644 index 0000000000..0a5368b8d4 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpException.java @@ -0,0 +1,32 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.blobstore.http; + +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.IOException; + +/** An exception that propagates the http status. */ +final class HttpException extends IOException { + private final HttpResponseStatus status; + + HttpException(HttpResponseStatus status, String message, Throwable cause) { + super(message, cause); + this.status = status; + } + + public HttpResponseStatus status() { + return status; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandler.java new file mode 100644 index 0000000000..0cd919a862 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandler.java @@ -0,0 +1,115 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.auth.Credentials; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.stream.ChunkedStream; +import io.netty.util.internal.StringUtil; + +/** ChannelHandler for uploads. */ +final class HttpUploadHandler extends AbstractHttpHandler<FullHttpResponse> { + + public HttpUploadHandler(Credentials credentials) { + super(credentials); + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) + throws Exception { + try { + checkState(userPromise != null, "response before request"); + if (!response.status().equals(HttpResponseStatus.OK) + && !response.status().equals(HttpResponseStatus.ACCEPTED) + && !response.status().equals(HttpResponseStatus.CREATED) + && !response.status().equals(HttpResponseStatus.NO_CONTENT)) { + // Supporting more than OK status to be compatible with nginx webdav. + failAndResetUserPromise( + new HttpException( + response.status(), "Download failed with " + "Status: " + response.status(), null)); + } else { + succeedAndResetUserPromise(); + } + } finally { + if (!HttpUtil.isKeepAlive(response)) { + ctx.close(); + } + } + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + checkState(userPromise == null, "handler can't be shared between pipelines."); + userPromise = promise; + if (!(msg instanceof UploadCommand)) { + failAndResetUserPromise( + new IllegalArgumentException( + "Unsupported message type: " + StringUtil.simpleClassName(msg))); + return; + } + HttpRequest request = buildRequest((UploadCommand) msg); + addCredentialHeaders(request, ((UploadCommand) msg).uri()); + HttpChunkedInput body = buildBody((UploadCommand) msg); + ctx.writeAndFlush(request) + .addListener( + (f) -> { + if (f.isSuccess()) { + return; + } + body.close(); + failAndResetUserPromise(f.cause()); + }); + ctx.writeAndFlush(body) + .addListener( + (f) -> { + if (f.isSuccess()) { + return; + } + body.close(); + failAndResetUserPromise(f.cause()); + }); + } + + private HttpRequest buildRequest(UploadCommand msg) { + HttpRequest request = + new DefaultHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.PUT, + constructPath(msg.uri(), msg.hash(), msg.casUpload())); + request.headers().set(HttpHeaderNames.HOST, constructHost(msg.uri())); + request.headers().set(HttpHeaderNames.ACCEPT, "*/*"); + request.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.contentLength()); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + return request; + } + + private HttpChunkedInput buildBody(UploadCommand msg) { + return new HttpChunkedInput(new ChunkedStream(msg.data())); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/UploadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/UploadCommand.java new file mode 100644 index 0000000000..e0496a68ae --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/UploadCommand.java @@ -0,0 +1,57 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import com.google.common.base.Preconditions; +import java.io.InputStream; +import java.net.URI; + +/** Object sent through the channel pipeline to start an upload. */ +final class UploadCommand { + + private final URI uri; + private final boolean casUpload; + private final String hash; + private final InputStream data; + private final long contentLength; + + protected UploadCommand( + URI uri, boolean casUpload, String hash, InputStream data, long contentLength) { + this.uri = Preconditions.checkNotNull(uri); + this.casUpload = casUpload; + this.hash = Preconditions.checkNotNull(hash); + this.data = Preconditions.checkNotNull(data); + this.contentLength = contentLength; + } + + public URI uri() { + return uri; + } + + public boolean casUpload() { + return casUpload; + } + + public String hash() { + return hash; + } + + public InputStream data() { + return data; + } + + public long contentLength() { + return contentLength; + } +} diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index 817724eb59..822599bfaf 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -1169,7 +1169,7 @@ java_test( java_test( name = "remote-tests", - srcs = glob(["remote/*.java"]), + srcs = glob(["remote/**/*.java"]), test_class = "com.google.devtools.build.lib.AllTests", deps = [ ":analysis_testutil", @@ -1185,11 +1185,14 @@ java_test( "//src/main/java/com/google/devtools/build/lib/authandtls", "//src/main/java/com/google/devtools/build/lib/clock", "//src/main/java/com/google/devtools/build/lib/remote", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore/http", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", "//src/main/java/com/google/devtools/common/options", "//third_party:api_client", "//third_party:mockito", + "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "@googleapis//:google_bytestream_bytestream_java_grpc", diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandlerTest.java new file mode 100644 index 0000000000..da16587870 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpDownloadHandlerTest.java @@ -0,0 +1,112 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.blobstore.http; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.verify; + +import com.google.common.net.HttpHeaders; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** Tests for {@link HttpDownloadHandler}. */ +@RunWith(JUnit4.class) +public class HttpDownloadHandlerTest { + + private static final URI CACHE_URI = URI.create("http://storage.googleapis.com:80/cache-bucket"); + + /** + * Test that downloading blobs works from both the Action Cache and the CAS. Also test that the + * handler is reusable. + */ + @Test + public void downloadShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null)); + downloadShouldWork(true, ch); + downloadShouldWork(false, ch); + } + + private void downloadShouldWork(boolean casDownload, EmbeddedChannel ch) throws IOException { + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, casDownload, "abcdef", out); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpRequest request = ch.readOutbound(); + assertThat(request.method()).isEqualTo(HttpMethod.GET); + assertThat(request.headers().get(HttpHeaderNames.HOST)) + .isEqualTo(CACHE_URI.getHost() + ":" + CACHE_URI.getPort()); + if (casDownload) { + assertThat(request.uri()).isEqualTo("/cache-bucket/cas/abcdef"); + } else { + assertThat(request.uri()).isEqualTo("/cache-bucket/ac/abcdef"); + } + + assertThat(writePromise.isDone()).isFalse(); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.CONTENT_LENGTH, 5); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content = Unpooled.buffer(); + content.writeBytes(new byte[] {1, 2, 3, 4, 5}); + ch.writeInbound(new DefaultLastHttpContent(content)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4, 5}); + verify(out).close(); + assertThat(ch.isActive()).isTrue(); + } + + /** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */ + @Test + public void httpErrorsAreSupported() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null)); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, "abcdef", out); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE); + ch.writeInbound(response); + assertThat(writePromise.isDone()).isTrue(); + assertThat(writePromise.cause()).isInstanceOf(HttpException.class); + assertThat(((HttpException) writePromise.cause()).status()) + .isEqualTo(HttpResponseStatus.NOT_FOUND); + // No data should have been written to the OutputStream and it should have been closed. + assertThat(out.size()).isEqualTo(0); + verify(out).close(); + assertThat(ch.isOpen()).isFalse(); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandlerTest.java new file mode 100644 index 0000000000..9ab56509b9 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpUploadHandlerTest.java @@ -0,0 +1,109 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.blobstore.http; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.net.HttpHeaders; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import java.io.ByteArrayInputStream; +import java.net.URI; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link HttpUploadHandler}. */ +@RunWith(JUnit4.class) +public class HttpUploadHandlerTest { + + private static final URI CACHE_URI = URI.create("http://storage.googleapis.com:80/cache-bucket"); + + /** + * Test that uploading blobs works to both the Action Cache and the CAS. Also test that the + * handler is reusable. + */ + @Test + public void uploadsShouldWork() throws Exception { + EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null)); + HttpResponseStatus[] statuses = new HttpResponseStatus[] {HttpResponseStatus.OK, + HttpResponseStatus.CREATED, HttpResponseStatus.ACCEPTED, HttpResponseStatus.NO_CONTENT}; + + for (HttpResponseStatus status : statuses) { + uploadsShouldWork(true, ch, status); + uploadsShouldWork(false, ch, status); + } + } + + private void uploadsShouldWork(boolean casUpload, EmbeddedChannel ch, HttpResponseStatus status) + throws Exception { + ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(new UploadCommand(CACHE_URI, casUpload, "abcdef", data, 5), writePromise); + + HttpRequest request = ch.readOutbound(); + assertThat(request.method()).isEqualTo(HttpMethod.PUT); + assertThat(request.headers().get(HttpHeaders.CONNECTION)) + .isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString()); + + HttpChunkedInput content = ch.readOutbound(); + assertThat(content.readChunk(ByteBufAllocator.DEFAULT).content().readableBytes()).isEqualTo(5); + + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + + ch.writeInbound(response); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(ch.isOpen()).isTrue(); + } + + /** Test that the handler correctly supports http error codes i.e. 404 (NOT FOUND). */ + @Test + public void httpErrorsAreSupported() throws Exception { + EmbeddedChannel ch = new EmbeddedChannel(new HttpUploadHandler(null)); + ByteArrayInputStream data = new ByteArrayInputStream(new byte[] {1, 2, 3, 4, 5}); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(new UploadCommand(CACHE_URI, true, "abcdef", data, 5), writePromise); + + HttpRequest request = ch.readOutbound(); + assertThat(request).isInstanceOf(HttpRequest.class); + HttpChunkedInput content = ch.readOutbound(); + assertThat(content).isInstanceOf(HttpChunkedInput.class); + + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.CLOSE); + + ch.writeInbound(response); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(writePromise.cause()).isInstanceOf(HttpException.class); + assertThat(((HttpException) writePromise.cause()).status()) + .isEqualTo(HttpResponseStatus.FORBIDDEN); + assertThat(ch.isOpen()).isFalse(); + } +} diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD index fc890d8e43..cf431fce51 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD @@ -20,6 +20,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib:util", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/remote", + "//src/main/java/com/google/devtools/build/lib/remote/blobstore", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", |