diff options
author | Philipp Wollermann <philwo@google.com> | 2018-07-25 06:44:35 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-07-25 06:46:01 -0700 |
commit | bbc94ebe5d4169b7f413f475dbfef6a4b679386a (patch) | |
tree | 84d897ce33621bdd26129193046cf44d9e627043 /src/tools/remote | |
parent | d7177ab65910cbf8428da86353cf431d232d7cae (diff) |
Remove Hazelcast dependency
The only remaining use was a testing REST backend in the LRE.
I wrote a replacement for that using netty, which we use for our network stuff in Bazel, which means we can now get rid of Hazelcast. :)
I'll remove the Hazelcast files in a separate change when this is merged.
PiperOrigin-RevId: 205985996
Diffstat (limited to 'src/tools/remote')
6 files changed, 193 insertions, 41 deletions
diff --git a/src/tools/remote/BUILD b/src/tools/remote/BUILD index 31ce3ecb15..02551671d1 100644 --- a/src/tools/remote/BUILD +++ b/src/tools/remote/BUILD @@ -6,10 +6,6 @@ filegroup( java_binary( name = "worker", - jvm_flags = [ - # Enables REST for Hazelcast server for testing. - "-Dhazelcast.rest.enabled=true", - ], main_class = "com.google.devtools.build.remote.worker.RemoteWorker", visibility = ["//visibility:public"], runtime_deps = ["//src/tools/remote/src/main/java/com/google/devtools/build/remote/worker"], diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD index 0c126c5b30..9abb6af8b5 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD @@ -27,7 +27,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//third_party:guava", - "//third_party:hazelcast", "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java new file mode 100644 index 0000000000..d51292e505 --- /dev/null +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java @@ -0,0 +1,108 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.remote.worker; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** A simple HTTP REST in-memory cache used during testing the LRE. */ +public class HttpCacheServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { + + final ConcurrentMap<String, byte[]> cache = new ConcurrentHashMap<>(); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + if (!request.decoderResult().isSuccess()) { + sendError(ctx, request, HttpResponseStatus.BAD_REQUEST); + return; + } + + if (request.method() == HttpMethod.GET) { + handleGet(ctx, request); + } else if (request.method() == HttpMethod.PUT) { + handlePut(ctx, request); + } else { + sendError(ctx, request, HttpResponseStatus.METHOD_NOT_ALLOWED); + } + } + + private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) { + byte[] contents = cache.get(request.uri()); + + if (contents == null) { + sendError(ctx, request, HttpResponseStatus.NOT_FOUND); + return; + } + + FullHttpResponse response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(contents)); + HttpUtil.setContentLength(response, contents.length); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream"); + ChannelFuture lastContentFuture = ctx.writeAndFlush(response); + + if (!HttpUtil.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } + + private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) { + if (!request.decoderResult().isSuccess()) { + sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR); + return; + } + + byte[] contentBytes = new byte[request.content().readableBytes()]; + request.content().readBytes(contentBytes); + cache.putIfAbsent(request.uri(), contentBytes); + + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); + ChannelFuture lastContentFuture = ctx.writeAndFlush(response); + + if (!HttpUtil.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } + + private static void sendError( + ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) { + FullHttpResponse response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + status, + Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); + ChannelFuture future = ctx.writeAndFlush(response); + + if (!HttpUtil.isKeepAlive(request)) { + future.addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java new file mode 100644 index 0000000000..8d9865541f --- /dev/null +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java @@ -0,0 +1,33 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.remote.worker; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +/** The initializer used by the HttpCacheServerHandler. */ +public class HttpCacheServerInitializer extends ChannelInitializer<SocketChannel> { + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(100 * 1024 * 1024)); + p.addLast(new HttpCacheServerHandler()); + } +} diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java index bf4d89123d..e8d8318224 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java @@ -57,13 +57,17 @@ import com.google.devtools.remoteexecution.v1test.ActionResult; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; import com.google.watcher.v1.WatcherGrpc.WatcherImplBase; -import com.hazelcast.config.Config; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; import io.grpc.Server; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.netty.NettyServerBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -80,6 +84,7 @@ import java.util.logging.Logger; * based on gRPC. */ public final class RemoteWorker { + // We need to keep references to the root and netty loggers to prevent them from being garbage // collected, which would cause us to loose their configuration. private static final Logger rootLogger = Logger.getLogger(""); @@ -195,23 +200,7 @@ public final class RemoteWorker { }); } - /** - * Construct a {@link SimpleBlobStore} using Hazelcast's version of {@link ConcurrentMap}. This - * will start a standalone Hazelcast server in the same JVM. There will also be a REST server - * started for accessing the maps. - */ - private static SimpleBlobStore createHazelcast(RemoteWorkerOptions options) { - Config config = new Config(); - config - .getNetworkConfig() - .setPort(options.hazelcastStandaloneListenPort) - .getJoin() - .getMulticastConfig() - .setEnabled(false); - HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); - return new ConcurrentMapBlobStore(instance.getMap("cache")); - } - + @SuppressWarnings("FutureReturnValueIgnored") public static void main(String[] args) throws Exception { OptionsParser parser = OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class); @@ -259,18 +248,14 @@ public final class RemoteWorker { // The instance of SimpleBlobStore used is based on these criteria in order: // 1. If remote cache or local disk cache is specified then use it first. - // 2. Otherwise start a standalone Hazelcast instance and use it as the blob store. This also - // creates a REST server for testing. - // 3. Finally use a ConcurrentMap to back the blob store. + // 2. Finally use a ConcurrentMap to back the blob store. final SimpleBlobStore blobStore; if (usingRemoteCache) { blobStore = SimpleBlobStoreFactory.create(remoteOptions, null, null); } else if (remoteWorkerOptions.casPath != null) { blobStore = new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath)); - } else if (remoteWorkerOptions.hazelcastStandaloneListenPort != 0) { - blobStore = createHazelcast(remoteWorkerOptions); } else { - blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>()); + blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<>()); } ListeningScheduledExecutorService retryService = @@ -292,9 +277,40 @@ public final class RemoteWorker { digestUtil); final Server server = worker.startServer(); + + EventLoopGroup bossGroup = null, workerGroup = null; + Channel ch = null; + if (remoteWorkerOptions.httpListenPort != 0) { + // Configure the server. + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new HttpCacheServerInitializer()); + ch = b.bind(remoteWorkerOptions.httpListenPort).sync().channel(); + logger.log( + INFO, + "Started HTTP cache server on port " + remoteWorkerOptions.httpListenPort); + } else { + logger.log(INFO, "Not starting HTTP cache server"); + } + worker.createPidFile(); + server.awaitTermination(); + if (ch != null) { + ch.closeFuture().sync().get(); + } + retryService.shutdownNow(); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } } private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) { diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java index ca6a431297..ac0b29c078 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java @@ -135,16 +135,16 @@ public class RemoteWorkerOptions extends OptionsBase { public int jobs; @Option( - name = "hazelcast_standalone_listen_port", - defaultValue = "0", - category = "build_worker", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.UNKNOWN}, - help = - "Runs an embedded hazelcast server that listens to this port. The server does not join" - + " any cluster. This is useful for testing." - ) - public int hazelcastStandaloneListenPort; + name = "http_listen_port", + defaultValue = "0", + category = "build_worker", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.UNKNOWN}, + help = + "Starts an embedded HTTP REST server on the given port. The server will simply store PUT " + + "requests in memory and return them again on GET requests. This is useful for " + + "testing only.") + public int httpListenPort; private static final int MAX_JOBS = 16384; |