aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--site/docs/remote-caching.md14
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/BUILD1
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java62
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java291
6 files changed, 321 insertions, 78 deletions
diff --git a/site/docs/remote-caching.md b/site/docs/remote-caching.md
index 416550f63c..b484d6d942 100644
--- a/site/docs/remote-caching.md
+++ b/site/docs/remote-caching.md
@@ -307,6 +307,19 @@ You may want to delete content from the cache to:
* Create a clean cache after a cache was poisoned
* Reduce the amount of storage used by deleting old outputs
+### Unix sockets
+
+The remote HTTP cache supports connecting over unix domain sockets. The behavior is similar to
+curl's `--unix-socket` flag. Use the following to configure unix domain socket:
+
+```
+build --experimental_remote_spawn_cache
+build --remote_http_cache=http://replace-with-your.host:port
+build --remote_cache_proxy=unix:/replace/with/socket/path
+```
+
+This feature is unsupported on Windows.
+
## Disk cache
Bazel can use a directory on the file system as a remote cache. This is
@@ -392,4 +405,3 @@ execution platform.
[gRPC protocol]: https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto
[Buildfarm]: https://github.com/bazelbuild/bazel-buildfarm
[issue #4558]: https://github.com/bazelbuild/bazel/issues/4558
-
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 18e38edb9e..5c4ef45f49 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -37,6 +37,7 @@ java_library(
"//src/main/java/com/google/devtools/common/options",
"//third_party:auth",
"//third_party:guava",
+ "//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index bb61082a3d..90f42077c4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -36,6 +36,17 @@ public final class RemoteOptions extends OptionsBase {
public String remoteHttpCache;
@Option(
+ name = "remote_cache_proxy",
+ defaultValue = "null",
+ documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+ effectTags = {OptionEffectTag.UNKNOWN},
+ help =
+ "Connect to the remote cache through a proxy. Currently this flag can only be used to "
+ + "configure a Unix domain socket (unix:/path/to/socket) for the HTTP cache."
+ )
+ public String remoteCacheProxy;
+
+ @Option(
name = "remote_max_connections",
defaultValue = "100",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
index 47a14bffdd..ef8ffe22f7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
@@ -22,6 +22,7 @@ import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStore;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import io.netty.channel.unix.DomainSocketAddress;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@@ -37,11 +38,20 @@ public final class SimpleBlobStoreFactory {
public static SimpleBlobStore createRest(RemoteOptions options, Credentials creds) {
try {
- return new HttpBlobStore(
- URI.create(options.remoteHttpCache),
- (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout),
- options.remoteMaxConnections,
- creds);
+ URI uri = URI.create(options.remoteHttpCache);
+ int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout);
+
+ if (options.remoteCacheProxy != null) {
+ if (options.remoteCacheProxy.startsWith("unix:")) {
+ return HttpBlobStore.create(
+ new DomainSocketAddress(options.remoteCacheProxy.replaceFirst("^unix:", "")),
+ uri, timeoutMillis, options.remoteMaxConnections, creds);
+ } else {
+ throw new Exception("Remote cache proxy unsupported: " + options.remoteCacheProxy);
+ }
+ } else {
+ return HttpBlobStore.create(uri, timeoutMillis, options.remoteMaxConnections, creds);
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
index a017d2a9c5..0bcc68bd64 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
@@ -24,12 +24,20 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueDomainSocketChannel;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -53,11 +61,14 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -92,7 +103,7 @@ public final class HttpBlobStore implements SimpleBlobStore {
private static final Pattern INVALID_TOKEN_ERROR =
Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");
- private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */);
+ private final EventLoopGroup eventLoop;
private final ChannelPool channelPool;
private final URI uri;
private final int timeoutMillis;
@@ -105,10 +116,44 @@ public final class HttpBlobStore implements SimpleBlobStore {
@GuardedBy("credentialsLock")
private long lastRefreshTime;
- @SuppressWarnings("FutureReturnValueIgnored")
- public HttpBlobStore(
+ public static HttpBlobStore create(URI uri, int timeoutMillis,
+ int remoteMaxConnections, @Nullable final Credentials creds)
+ throws Exception {
+ return new HttpBlobStore(
+ NioEventLoopGroup::new,
+ NioSocketChannel.class,
+ uri, timeoutMillis, remoteMaxConnections, creds,
+ null);
+ }
+
+ public static HttpBlobStore create(
+ DomainSocketAddress domainSocketAddress,
URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds)
throws Exception {
+
+ if (KQueue.isAvailable()) {
+ return new HttpBlobStore(
+ KQueueEventLoopGroup::new,
+ KQueueDomainSocketChannel.class,
+ uri, timeoutMillis, remoteMaxConnections, creds,
+ domainSocketAddress);
+ } else if (Epoll.isAvailable()) {
+ return new HttpBlobStore(
+ EpollEventLoopGroup::new,
+ EpollDomainSocketChannel.class,
+ uri, timeoutMillis, remoteMaxConnections, creds,
+ domainSocketAddress);
+ } else {
+ throw new Exception("Unix domain sockets are unsupported on this platform");
+ }
+ }
+
+ private HttpBlobStore(
+ Function<Integer, EventLoopGroup> newEventLoopGroup,
+ Class<? extends Channel> channelClass,
+ URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds,
+ @Nullable SocketAddress socketAddress)
+ throws Exception {
boolean useTls = uri.getScheme().equals("https");
if (uri.getPort() == -1) {
int port = useTls ? 443 : 80;
@@ -123,6 +168,10 @@ public final class HttpBlobStore implements SimpleBlobStore {
uri.getFragment());
}
this.uri = uri;
+ if (socketAddress == null) {
+ socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+ }
+
final SslContext sslCtx;
if (useTls) {
// OpenSsl gives us a > 2x speed improvement on fast networks, but requires netty tcnative
@@ -132,12 +181,15 @@ public final class HttpBlobStore implements SimpleBlobStore {
} else {
sslCtx = null;
}
+
+ this.eventLoop = newEventLoopGroup.apply(2);
Bootstrap clientBootstrap =
new Bootstrap()
- .channel(NioSocketChannel.class)
+ .channel(channelClass)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.group(eventLoop)
- .remoteAddress(uri.getHost(), uri.getPort());
+ .remoteAddress(socketAddress);
+
ChannelPoolHandler channelPoolHandler =
new ChannelPoolHandler() {
@Override
diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
index c677874b4c..b272173bc6 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java
@@ -20,28 +20,38 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.google.api.client.util.Preconditions;
import com.google.auth.Credentials;
import com.google.common.base.Charsets;
import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStoreTest.NotAuthorizedHandler.ErrorType;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@@ -54,57 +64,218 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.IntFunction;
+import javax.annotation.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
-/** Tests for {@link HttpBlobStore}. */
-@RunWith(JUnit4.class)
+/**
+ * Tests for {@link HttpBlobStore}.
+ */
+@RunWith(Parameterized.class)
public class HttpBlobStoreTest {
- private ServerSocketChannel startServer(ChannelHandler handler) throws Exception {
- EventLoopGroup eventLoop = new NioEventLoopGroup(1);
+ private static ServerChannel createServer(
+ Class<? extends ServerChannel> serverChannelClass,
+ IntFunction<EventLoopGroup> newEventLoopGroup,
+ SocketAddress socketAddress,
+ ChannelHandler handler) {
+ EventLoopGroup eventLoop = newEventLoopGroup.apply(1);
ServerBootstrap sb =
new ServerBootstrap()
.group(eventLoop)
- .channel(NioServerSocketChannel.class)
+ .channel(serverChannelClass)
.childHandler(
- new ChannelInitializer<NioSocketChannel>() {
+ new ChannelInitializer<Channel>() {
@Override
- protected void initChannel(NioSocketChannel ch) {
+ protected void initChannel(Channel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1000));
ch.pipeline().addLast(handler);
}
});
- return ((ServerSocketChannel) sb.bind("localhost", 0).sync().channel());
+ try {
+ return ((ServerChannel) sb.bind(socketAddress).sync().channel());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static DomainSocketAddress newDomainSocketAddress() {
+ try {
+ File file = File.createTempFile("bazel", ".sock", new File("/tmp"));
+ file.delete();
+ return new DomainSocketAddress(file.getAbsoluteFile());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ interface TestServer {
+ ServerChannel start(ChannelInboundHandler handler);
+
+ void stop(ServerChannel serverChannel);
+ }
+
+ private static final class InetTestServer implements TestServer {
+
+ public ServerChannel start(ChannelInboundHandler handler) {
+ return createServer(
+ NioServerSocketChannel.class,
+ NioEventLoopGroup::new,
+ new InetSocketAddress("localhost", 0),
+ handler);
+ }
+
+ public void stop(ServerChannel serverChannel) {
+ try {
+ serverChannel.close();
+ serverChannel.closeFuture().sync();
+ serverChannel.eventLoop().shutdownGracefully().sync();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ private static final class UnixDomainServer implements TestServer {
+
+ // Note: this odd implementation is a workaround because we're unable to shut down and restart
+ // KQueue backed implementations. See https://github.com/netty/netty/issues/7047.
+
+ private final ServerChannel serverChannel;
+ private ChannelInboundHandler handler = null;
+
+ public UnixDomainServer(
+ Class<? extends ServerChannel> serverChannelClass,
+ IntFunction<EventLoopGroup> newEventLoopGroup
+ ) {
+ EventLoopGroup eventLoop = newEventLoopGroup.apply(1);
+ ServerBootstrap sb =
+ new ServerBootstrap()
+ .group(eventLoop)
+ .channel(serverChannelClass)
+ .childHandler(
+ new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) {
+ ch.pipeline().addLast(new HttpServerCodec());
+ ch.pipeline().addLast(new HttpObjectAggregator(1000));
+ ch.pipeline().addLast(Preconditions.checkNotNull(handler));
+ }
+ });
+ try {
+ ServerChannel actual = ((ServerChannel) sb.bind(newDomainSocketAddress()).sync().channel());
+ this.serverChannel = mock(ServerChannel.class, AdditionalAnswers.delegatesTo(actual));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ public ServerChannel start(ChannelInboundHandler handler) {
+ reset(this.serverChannel);
+ this.handler = handler;
+ return this.serverChannel;
+ }
+
+ public void stop(ServerChannel serverChannel) {
+ // Note: In the tests, we expect that connecting to a closed server channel results
+ // in a channel connection error. Netty doesn't seem to handle closing domain socket
+ // addresses very well-- often connecting to a closed domain socket will result in a
+ // read timeout instead of a connection timeout.
+ //
+ // This is a hack to ensure connection timeouts are "received" by the tests for this
+ // dummy domain socket server. In particular, this lets the timeoutShouldWork_connect
+ // test work for both inet and domain sockets.
+ //
+ // This is also part of the workaround for https://github.com/netty/netty/issues/7047.
+ when(this.serverChannel.localAddress()).thenReturn(new DomainSocketAddress(""));
+ this.handler = null;
+ }
+ }
+
+
+ @Parameters
+ public static Collection createInputValues() {
+ ArrayList<Object[]> parameters = new ArrayList<Object[]>(
+ Arrays.asList(new Object[][]{
+ { new InetTestServer() }
+ }));
+
+ if (Epoll.isAvailable()) {
+ parameters.add(new Object[]{
+ new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new)
+ });
+ }
+
+ if (KQueue.isAvailable()) {
+ parameters.add(new Object[]{
+ new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new)
+ });
+ }
+
+ return parameters;
+ }
+
+ private final TestServer testServer;
+
+ public HttpBlobStoreTest(TestServer testServer) {
+ this.testServer = testServer;
+ }
+
+ private HttpBlobStore createHttpBlobStore(ServerChannel serverChannel, int timeoutMillis,
+ int remoteMaxConnections, @Nullable final Credentials creds) throws Exception {
+ SocketAddress socketAddress = serverChannel.localAddress();
+ if (socketAddress instanceof DomainSocketAddress) {
+ DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress;
+ URI uri = new URI("http://localhost");
+ return HttpBlobStore.create(domainSocketAddress, uri, timeoutMillis, remoteMaxConnections,
+ creds);
+ } else if (socketAddress instanceof InetSocketAddress) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+ URI uri = new URI("http://localhost:" + inetSocketAddress.getPort());
+ return HttpBlobStore.create(uri, timeoutMillis, remoteMaxConnections, creds);
+ } else {
+ throw new IllegalStateException(
+ "unsupported socket address class " + socketAddress.getClass());
+ }
}
@Test(expected = ConnectException.class, timeout = 30000)
public void timeoutShouldWork_connect() throws Exception {
- ServerSocketChannel server = startServer(new ChannelHandlerAdapter() {});
- int serverPort = server.localAddress().getPort();
- closeServerChannel(server);
+ ServerChannel server = testServer.start(new ChannelInboundHandlerAdapter() {});
+ testServer.stop(server);
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ createHttpBlobStore(server, 5, 0, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
+
fail("Exception expected");
}
@Test(expected = ReadTimeoutException.class, timeout = 30000)
public void timeoutShouldWork_read() throws Exception {
- ServerSocketChannel server = null;
+ ServerChannel server = null;
try {
server =
- startServer(
+ testServer.start(
new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(
@@ -112,15 +283,14 @@ public class HttpBlobStoreTest {
// Don't respond and force a client timeout.
}
});
- int serverPort = server.localAddress().getPort();
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials);
+ createHttpBlobStore(server, 5, 0, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected");
} finally {
- closeServerChannel(server);
+ testServer.stop(server);
}
}
@@ -131,14 +301,13 @@ public class HttpBlobStoreTest {
}
private void expiredAuthTokensShouldBeRetried_get(ErrorType errorType) throws Exception {
- ServerSocketChannel server = null;
+ ServerChannel server = null;
try {
- server = startServer(new NotAuthorizedHandler(errorType));
- int serverPort = server.localAddress().getPort();
+ server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
+ createHttpBlobStore(server, 30, 0, credentials);
ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream());
getFromFuture(blobStore.get("key", out));
assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents");
@@ -149,7 +318,7 @@ public class HttpBlobStoreTest {
verify(out, never()).close();
verifyNoMoreInteractions(credentials);
} finally {
- closeServerChannel(server);
+ testServer.stop(server);
}
}
@@ -160,14 +329,13 @@ public class HttpBlobStoreTest {
}
private void expiredAuthTokensShouldBeRetried_put(ErrorType errorType) throws Exception {
- ServerSocketChannel server = null;
+ ServerChannel server = null;
try {
- server = startServer(new NotAuthorizedHandler(errorType));
- int serverPort = server.localAddress().getPort();
+ server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
+ createHttpBlobStore(server, 30, 0, credentials);
byte[] data = "File Contents".getBytes(Charsets.US_ASCII);
ByteArrayInputStream in = new ByteArrayInputStream(data);
blobStore.put("key", data.length, in);
@@ -176,26 +344,24 @@ public class HttpBlobStoreTest {
verify(credentials, times(2)).hasRequestMetadata();
verifyNoMoreInteractions(credentials);
} finally {
- closeServerChannel(server);
+ testServer.stop(server);
}
}
@Test
- public void errorCodesThatShouldNotBeRetried_get() throws InterruptedException {
+ public void errorCodesThatShouldNotBeRetried_get() {
errorCodeThatShouldNotBeRetried_get(ErrorType.INSUFFICIENT_SCOPE);
errorCodeThatShouldNotBeRetried_get(ErrorType.INVALID_REQUEST);
}
- private void errorCodeThatShouldNotBeRetried_get(ErrorType errorType)
- throws InterruptedException {
- ServerSocketChannel server = null;
+ private void errorCodeThatShouldNotBeRetried_get(ErrorType errorType) {
+ ServerChannel server = null;
try {
- server = startServer(new NotAuthorizedHandler(errorType));
- int serverPort = server.localAddress().getPort();
+ server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
+ createHttpBlobStore(server, 30, 0, credentials);
getFromFuture(blobStore.get("key", new ByteArrayOutputStream()));
fail("Exception expected.");
} catch (Exception e) {
@@ -203,34 +369,32 @@ public class HttpBlobStoreTest {
assertThat(((HttpException) e).response().status())
.isEqualTo(HttpResponseStatus.UNAUTHORIZED);
} finally {
- closeServerChannel(server);
+ testServer.stop(server);
}
}
@Test
- public void errorCodesThatShouldNotBeRetried_put() throws InterruptedException {
+ public void errorCodesThatShouldNotBeRetried_put() {
errorCodeThatShouldNotBeRetried_put(ErrorType.INSUFFICIENT_SCOPE);
errorCodeThatShouldNotBeRetried_put(ErrorType.INVALID_REQUEST);
}
- private void errorCodeThatShouldNotBeRetried_put(ErrorType errorType)
- throws InterruptedException {
- ServerSocketChannel server = null;
+ private void errorCodeThatShouldNotBeRetried_put(ErrorType errorType) {
+ ServerChannel server = null;
try {
- server = startServer(new NotAuthorizedHandler(errorType));
- int serverPort = server.localAddress().getPort();
+ server = testServer.start(new NotAuthorizedHandler(errorType));
Credentials credentials = newCredentials();
HttpBlobStore blobStore =
- new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials);
- blobStore.put("key", 1, new ByteArrayInputStream(new byte[] {0}));
+ createHttpBlobStore(server, 30, 0, credentials);
+ blobStore.put("key", 1, new ByteArrayInputStream(new byte[]{0}));
fail("Exception expected.");
} catch (Exception e) {
assertThat(e).isInstanceOf(HttpException.class);
assertThat(((HttpException) e).response().status())
.isEqualTo(HttpResponseStatus.UNAUTHORIZED);
} finally {
- closeServerChannel(server);
+ testServer.stop(server);
}
}
@@ -241,12 +405,12 @@ public class HttpBlobStoreTest {
headers.put("Authorization", singletonList("Bearer invalidToken"));
when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers);
Mockito.doAnswer(
- (mock) -> {
- Map<String, List<String>> headers2 = new HashMap<>();
- headers2.put("Authorization", singletonList("Bearer validToken"));
- when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers2);
- return null;
- })
+ (mock) -> {
+ Map<String, List<String>> headers2 = new HashMap<>();
+ headers2.put("Authorization", singletonList("Bearer validToken"));
+ when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers2);
+ return null;
+ })
.when(credentials)
.refresh();
return credentials;
@@ -278,8 +442,8 @@ public class HttpBlobStoreTest {
if (messageCount == 0) {
if (!"Bearer invalidToken".equals(request.headers().get(HttpHeaderNames.AUTHORIZATION))) {
ctx.writeAndFlush(
- new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
.addListener(ChannelFutureListener.CLOSE);
return;
}
@@ -305,8 +469,8 @@ public class HttpBlobStoreTest {
} else if (messageCount == 1) {
if (!"Bearer validToken".equals(request.headers().get(HttpHeaderNames.AUTHORIZATION))) {
ctx.writeAndFlush(
- new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
.addListener(ChannelFutureListener.CLOSE);
return;
}
@@ -321,18 +485,11 @@ public class HttpBlobStoreTest {
} else {
// No third message expected.
ctx.writeAndFlush(
- new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR))
.addListener(ChannelFutureListener.CLOSE);
}
}
}
- private void closeServerChannel(ServerSocketChannel server) throws InterruptedException {
- if (server != null) {
- server.close();
- server.closeFuture().sync();
- server.eventLoop().shutdownGracefully().sync();
- }
- }
}