diff options
6 files changed, 321 insertions, 78 deletions
diff --git a/site/docs/remote-caching.md b/site/docs/remote-caching.md index 416550f63c..b484d6d942 100644 --- a/site/docs/remote-caching.md +++ b/site/docs/remote-caching.md @@ -307,6 +307,19 @@ You may want to delete content from the cache to: * Create a clean cache after a cache was poisoned * Reduce the amount of storage used by deleting old outputs +### Unix sockets + +The remote HTTP cache supports connecting over unix domain sockets. The behavior is similar to +curl's `--unix-socket` flag. Use the following to configure unix domain socket: + +``` +build --experimental_remote_spawn_cache +build --remote_http_cache=http://replace-with-your.host:port +build --remote_cache_proxy=unix:/replace/with/socket/path +``` + +This feature is unsupported on Windows. + ## Disk cache Bazel can use a directory on the file system as a remote cache. This is @@ -392,4 +405,3 @@ execution platform. [gRPC protocol]: https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto [Buildfarm]: https://github.com/bazelbuild/bazel-buildfarm [issue #4558]: https://github.com/bazelbuild/bazel/issues/4558 - diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 18e38edb9e..5c4ef45f49 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -37,6 +37,7 @@ java_library( "//src/main/java/com/google/devtools/common/options", "//third_party:auth", "//third_party:guava", + "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index bb61082a3d..90f42077c4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -36,6 +36,17 @@ public final class RemoteOptions extends OptionsBase { public String remoteHttpCache; @Option( + name = "remote_cache_proxy", + defaultValue = "null", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.UNKNOWN}, + help = + "Connect to the remote cache through a proxy. Currently this flag can only be used to " + + "configure a Unix domain socket (unix:/path/to/socket) for the HTTP cache." + ) + public String remoteCacheProxy; + + @Option( name = "remote_max_connections", defaultValue = "100", documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java index 47a14bffdd..ef8ffe22f7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java @@ -22,6 +22,7 @@ import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore; import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStore; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import io.netty.channel.unix.DomainSocketAddress; import java.io.IOException; import java.net.URI; import java.util.concurrent.TimeUnit; @@ -37,11 +38,20 @@ public final class SimpleBlobStoreFactory { public static SimpleBlobStore createRest(RemoteOptions options, Credentials creds) { try { - return new HttpBlobStore( - URI.create(options.remoteHttpCache), - (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout), - options.remoteMaxConnections, - creds); + URI uri = URI.create(options.remoteHttpCache); + int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout); + + if (options.remoteCacheProxy != null) { + if (options.remoteCacheProxy.startsWith("unix:")) { + return HttpBlobStore.create( + new DomainSocketAddress(options.remoteCacheProxy.replaceFirst("^unix:", "")), + uri, timeoutMillis, options.remoteMaxConnections, creds); + } else { + throw new Exception("Remote cache proxy unsupported: " + options.remoteCacheProxy); + } + } else { + return HttpBlobStore.create(uri, timeoutMillis, options.remoteMaxConnections, creds); + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java index a017d2a9c5..0bcc68bd64 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java +++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java @@ -24,12 +24,20 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.ChannelPool; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.pool.SimpleChannelPool; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObjectAggregator; @@ -53,11 +61,14 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -92,7 +103,7 @@ public final class HttpBlobStore implements SimpleBlobStore { private static final Pattern INVALID_TOKEN_ERROR = Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?"); - private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */); + private final EventLoopGroup eventLoop; private final ChannelPool channelPool; private final URI uri; private final int timeoutMillis; @@ -105,10 +116,44 @@ public final class HttpBlobStore implements SimpleBlobStore { @GuardedBy("credentialsLock") private long lastRefreshTime; - @SuppressWarnings("FutureReturnValueIgnored") - public HttpBlobStore( + public static HttpBlobStore create(URI uri, int timeoutMillis, + int remoteMaxConnections, @Nullable final Credentials creds) + throws Exception { + return new HttpBlobStore( + NioEventLoopGroup::new, + NioSocketChannel.class, + uri, timeoutMillis, remoteMaxConnections, creds, + null); + } + + public static HttpBlobStore create( + DomainSocketAddress domainSocketAddress, URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds) throws Exception { + + if (KQueue.isAvailable()) { + return new HttpBlobStore( + KQueueEventLoopGroup::new, + KQueueDomainSocketChannel.class, + uri, timeoutMillis, remoteMaxConnections, creds, + domainSocketAddress); + } else if (Epoll.isAvailable()) { + return new HttpBlobStore( + EpollEventLoopGroup::new, + EpollDomainSocketChannel.class, + uri, timeoutMillis, remoteMaxConnections, creds, + domainSocketAddress); + } else { + throw new Exception("Unix domain sockets are unsupported on this platform"); + } + } + + private HttpBlobStore( + Function<Integer, EventLoopGroup> newEventLoopGroup, + Class<? extends Channel> channelClass, + URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds, + @Nullable SocketAddress socketAddress) + throws Exception { boolean useTls = uri.getScheme().equals("https"); if (uri.getPort() == -1) { int port = useTls ? 443 : 80; @@ -123,6 +168,10 @@ public final class HttpBlobStore implements SimpleBlobStore { uri.getFragment()); } this.uri = uri; + if (socketAddress == null) { + socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort()); + } + final SslContext sslCtx; if (useTls) { // OpenSsl gives us a > 2x speed improvement on fast networks, but requires netty tcnative @@ -132,12 +181,15 @@ public final class HttpBlobStore implements SimpleBlobStore { } else { sslCtx = null; } + + this.eventLoop = newEventLoopGroup.apply(2); Bootstrap clientBootstrap = new Bootstrap() - .channel(NioSocketChannel.class) + .channel(channelClass) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis) .group(eventLoop) - .remoteAddress(uri.getHost(), uri.getPort()); + .remoteAddress(socketAddress); + ChannelPoolHandler channelPoolHandler = new ChannelPoolHandler() { @Override diff --git a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java index c677874b4c..b272173bc6 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStoreTest.java @@ -20,28 +20,38 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.google.api.client.util.Preconditions; import com.google.auth.Credentials; import com.google.common.base.Charsets; import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStoreTest.NotAuthorizedHandler.ErrorType; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerDomainSocketChannel; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueServerDomainSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -54,57 +64,218 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.timeout.ReadTimeoutException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.IntFunction; +import javax.annotation.Nullable; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.AdditionalAnswers; import org.mockito.Mockito; -/** Tests for {@link HttpBlobStore}. */ -@RunWith(JUnit4.class) +/** + * Tests for {@link HttpBlobStore}. + */ +@RunWith(Parameterized.class) public class HttpBlobStoreTest { - private ServerSocketChannel startServer(ChannelHandler handler) throws Exception { - EventLoopGroup eventLoop = new NioEventLoopGroup(1); + private static ServerChannel createServer( + Class<? extends ServerChannel> serverChannelClass, + IntFunction<EventLoopGroup> newEventLoopGroup, + SocketAddress socketAddress, + ChannelHandler handler) { + EventLoopGroup eventLoop = newEventLoopGroup.apply(1); ServerBootstrap sb = new ServerBootstrap() .group(eventLoop) - .channel(NioServerSocketChannel.class) + .channel(serverChannelClass) .childHandler( - new ChannelInitializer<NioSocketChannel>() { + new ChannelInitializer<Channel>() { @Override - protected void initChannel(NioSocketChannel ch) { + protected void initChannel(Channel ch) { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new HttpObjectAggregator(1000)); ch.pipeline().addLast(handler); } }); - return ((ServerSocketChannel) sb.bind("localhost", 0).sync().channel()); + try { + return ((ServerChannel) sb.bind(socketAddress).sync().channel()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private static DomainSocketAddress newDomainSocketAddress() { + try { + File file = File.createTempFile("bazel", ".sock", new File("/tmp")); + file.delete(); + return new DomainSocketAddress(file.getAbsoluteFile()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + interface TestServer { + ServerChannel start(ChannelInboundHandler handler); + + void stop(ServerChannel serverChannel); + } + + private static final class InetTestServer implements TestServer { + + public ServerChannel start(ChannelInboundHandler handler) { + return createServer( + NioServerSocketChannel.class, + NioEventLoopGroup::new, + new InetSocketAddress("localhost", 0), + handler); + } + + public void stop(ServerChannel serverChannel) { + try { + serverChannel.close(); + serverChannel.closeFuture().sync(); + serverChannel.eventLoop().shutdownGracefully().sync(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + } + + private static final class UnixDomainServer implements TestServer { + + // Note: this odd implementation is a workaround because we're unable to shut down and restart + // KQueue backed implementations. See https://github.com/netty/netty/issues/7047. + + private final ServerChannel serverChannel; + private ChannelInboundHandler handler = null; + + public UnixDomainServer( + Class<? extends ServerChannel> serverChannelClass, + IntFunction<EventLoopGroup> newEventLoopGroup + ) { + EventLoopGroup eventLoop = newEventLoopGroup.apply(1); + ServerBootstrap sb = + new ServerBootstrap() + .group(eventLoop) + .channel(serverChannelClass) + .childHandler( + new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new HttpObjectAggregator(1000)); + ch.pipeline().addLast(Preconditions.checkNotNull(handler)); + } + }); + try { + ServerChannel actual = ((ServerChannel) sb.bind(newDomainSocketAddress()).sync().channel()); + this.serverChannel = mock(ServerChannel.class, AdditionalAnswers.delegatesTo(actual)); + } catch (Exception e) { + throw new IllegalStateException(e); + } + + } + + public ServerChannel start(ChannelInboundHandler handler) { + reset(this.serverChannel); + this.handler = handler; + return this.serverChannel; + } + + public void stop(ServerChannel serverChannel) { + // Note: In the tests, we expect that connecting to a closed server channel results + // in a channel connection error. Netty doesn't seem to handle closing domain socket + // addresses very well-- often connecting to a closed domain socket will result in a + // read timeout instead of a connection timeout. + // + // This is a hack to ensure connection timeouts are "received" by the tests for this + // dummy domain socket server. In particular, this lets the timeoutShouldWork_connect + // test work for both inet and domain sockets. + // + // This is also part of the workaround for https://github.com/netty/netty/issues/7047. + when(this.serverChannel.localAddress()).thenReturn(new DomainSocketAddress("")); + this.handler = null; + } + } + + + @Parameters + public static Collection createInputValues() { + ArrayList<Object[]> parameters = new ArrayList<Object[]>( + Arrays.asList(new Object[][]{ + { new InetTestServer() } + })); + + if (Epoll.isAvailable()) { + parameters.add(new Object[]{ + new UnixDomainServer(EpollServerDomainSocketChannel.class, EpollEventLoopGroup::new) + }); + } + + if (KQueue.isAvailable()) { + parameters.add(new Object[]{ + new UnixDomainServer(KQueueServerDomainSocketChannel.class, KQueueEventLoopGroup::new) + }); + } + + return parameters; + } + + private final TestServer testServer; + + public HttpBlobStoreTest(TestServer testServer) { + this.testServer = testServer; + } + + private HttpBlobStore createHttpBlobStore(ServerChannel serverChannel, int timeoutMillis, + int remoteMaxConnections, @Nullable final Credentials creds) throws Exception { + SocketAddress socketAddress = serverChannel.localAddress(); + if (socketAddress instanceof DomainSocketAddress) { + DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress; + URI uri = new URI("http://localhost"); + return HttpBlobStore.create(domainSocketAddress, uri, timeoutMillis, remoteMaxConnections, + creds); + } else if (socketAddress instanceof InetSocketAddress) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + URI uri = new URI("http://localhost:" + inetSocketAddress.getPort()); + return HttpBlobStore.create(uri, timeoutMillis, remoteMaxConnections, creds); + } else { + throw new IllegalStateException( + "unsupported socket address class " + socketAddress.getClass()); + } } @Test(expected = ConnectException.class, timeout = 30000) public void timeoutShouldWork_connect() throws Exception { - ServerSocketChannel server = startServer(new ChannelHandlerAdapter() {}); - int serverPort = server.localAddress().getPort(); - closeServerChannel(server); + ServerChannel server = testServer.start(new ChannelInboundHandlerAdapter() {}); + testServer.stop(server); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials); + createHttpBlobStore(server, 5, 0, credentials); getFromFuture(blobStore.get("key", new ByteArrayOutputStream())); + fail("Exception expected"); } @Test(expected = ReadTimeoutException.class, timeout = 30000) public void timeoutShouldWork_read() throws Exception { - ServerSocketChannel server = null; + ServerChannel server = null; try { server = - startServer( + testServer.start( new SimpleChannelInboundHandler<FullHttpRequest>() { @Override protected void channelRead0( @@ -112,15 +283,14 @@ public class HttpBlobStoreTest { // Don't respond and force a client timeout. } }); - int serverPort = server.localAddress().getPort(); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 5, 0, credentials); + createHttpBlobStore(server, 5, 0, credentials); getFromFuture(blobStore.get("key", new ByteArrayOutputStream())); fail("Exception expected"); } finally { - closeServerChannel(server); + testServer.stop(server); } } @@ -131,14 +301,13 @@ public class HttpBlobStoreTest { } private void expiredAuthTokensShouldBeRetried_get(ErrorType errorType) throws Exception { - ServerSocketChannel server = null; + ServerChannel server = null; try { - server = startServer(new NotAuthorizedHandler(errorType)); - int serverPort = server.localAddress().getPort(); + server = testServer.start(new NotAuthorizedHandler(errorType)); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials); + createHttpBlobStore(server, 30, 0, credentials); ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); getFromFuture(blobStore.get("key", out)); assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents"); @@ -149,7 +318,7 @@ public class HttpBlobStoreTest { verify(out, never()).close(); verifyNoMoreInteractions(credentials); } finally { - closeServerChannel(server); + testServer.stop(server); } } @@ -160,14 +329,13 @@ public class HttpBlobStoreTest { } private void expiredAuthTokensShouldBeRetried_put(ErrorType errorType) throws Exception { - ServerSocketChannel server = null; + ServerChannel server = null; try { - server = startServer(new NotAuthorizedHandler(errorType)); - int serverPort = server.localAddress().getPort(); + server = testServer.start(new NotAuthorizedHandler(errorType)); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials); + createHttpBlobStore(server, 30, 0, credentials); byte[] data = "File Contents".getBytes(Charsets.US_ASCII); ByteArrayInputStream in = new ByteArrayInputStream(data); blobStore.put("key", data.length, in); @@ -176,26 +344,24 @@ public class HttpBlobStoreTest { verify(credentials, times(2)).hasRequestMetadata(); verifyNoMoreInteractions(credentials); } finally { - closeServerChannel(server); + testServer.stop(server); } } @Test - public void errorCodesThatShouldNotBeRetried_get() throws InterruptedException { + public void errorCodesThatShouldNotBeRetried_get() { errorCodeThatShouldNotBeRetried_get(ErrorType.INSUFFICIENT_SCOPE); errorCodeThatShouldNotBeRetried_get(ErrorType.INVALID_REQUEST); } - private void errorCodeThatShouldNotBeRetried_get(ErrorType errorType) - throws InterruptedException { - ServerSocketChannel server = null; + private void errorCodeThatShouldNotBeRetried_get(ErrorType errorType) { + ServerChannel server = null; try { - server = startServer(new NotAuthorizedHandler(errorType)); - int serverPort = server.localAddress().getPort(); + server = testServer.start(new NotAuthorizedHandler(errorType)); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials); + createHttpBlobStore(server, 30, 0, credentials); getFromFuture(blobStore.get("key", new ByteArrayOutputStream())); fail("Exception expected."); } catch (Exception e) { @@ -203,34 +369,32 @@ public class HttpBlobStoreTest { assertThat(((HttpException) e).response().status()) .isEqualTo(HttpResponseStatus.UNAUTHORIZED); } finally { - closeServerChannel(server); + testServer.stop(server); } } @Test - public void errorCodesThatShouldNotBeRetried_put() throws InterruptedException { + public void errorCodesThatShouldNotBeRetried_put() { errorCodeThatShouldNotBeRetried_put(ErrorType.INSUFFICIENT_SCOPE); errorCodeThatShouldNotBeRetried_put(ErrorType.INVALID_REQUEST); } - private void errorCodeThatShouldNotBeRetried_put(ErrorType errorType) - throws InterruptedException { - ServerSocketChannel server = null; + private void errorCodeThatShouldNotBeRetried_put(ErrorType errorType) { + ServerChannel server = null; try { - server = startServer(new NotAuthorizedHandler(errorType)); - int serverPort = server.localAddress().getPort(); + server = testServer.start(new NotAuthorizedHandler(errorType)); Credentials credentials = newCredentials(); HttpBlobStore blobStore = - new HttpBlobStore(new URI("http://localhost:" + serverPort), 30, 0, credentials); - blobStore.put("key", 1, new ByteArrayInputStream(new byte[] {0})); + createHttpBlobStore(server, 30, 0, credentials); + blobStore.put("key", 1, new ByteArrayInputStream(new byte[]{0})); fail("Exception expected."); } catch (Exception e) { assertThat(e).isInstanceOf(HttpException.class); assertThat(((HttpException) e).response().status()) .isEqualTo(HttpResponseStatus.UNAUTHORIZED); } finally { - closeServerChannel(server); + testServer.stop(server); } } @@ -241,12 +405,12 @@ public class HttpBlobStoreTest { headers.put("Authorization", singletonList("Bearer invalidToken")); when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers); Mockito.doAnswer( - (mock) -> { - Map<String, List<String>> headers2 = new HashMap<>(); - headers2.put("Authorization", singletonList("Bearer validToken")); - when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers2); - return null; - }) + (mock) -> { + Map<String, List<String>> headers2 = new HashMap<>(); + headers2.put("Authorization", singletonList("Bearer validToken")); + when(credentials.getRequestMetadata(any(URI.class))).thenReturn(headers2); + return null; + }) .when(credentials) .refresh(); return credentials; @@ -278,8 +442,8 @@ public class HttpBlobStoreTest { if (messageCount == 0) { if (!"Bearer invalidToken".equals(request.headers().get(HttpHeaderNames.AUTHORIZATION))) { ctx.writeAndFlush( - new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) .addListener(ChannelFutureListener.CLOSE); return; } @@ -305,8 +469,8 @@ public class HttpBlobStoreTest { } else if (messageCount == 1) { if (!"Bearer validToken".equals(request.headers().get(HttpHeaderNames.AUTHORIZATION))) { ctx.writeAndFlush( - new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) .addListener(ChannelFutureListener.CLOSE); return; } @@ -321,18 +485,11 @@ public class HttpBlobStoreTest { } else { // No third message expected. ctx.writeAndFlush( - new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)) .addListener(ChannelFutureListener.CLOSE); } } } - private void closeServerChannel(ServerSocketChannel server) throws InterruptedException { - if (server != null) { - server.close(); - server.closeFuture().sync(); - server.eventLoop().shutdownGracefully().sync(); - } - } } |