diff options
7 files changed, 206 insertions, 54 deletions
diff --git a/src/test/shell/bazel/remote/remote_execution_http_test.sh b/src/test/shell/bazel/remote/remote_execution_http_test.sh
index 1fe3b1e577..86350e68ff 100755
--- a/src/test/shell/bazel/remote/remote_execution_http_test.sh
+++ b/src/test/shell/bazel/remote/remote_execution_http_test.sh
@@ -29,12 +29,12 @@ function set_up() {
while [ $attempts -le 5 ]; do
(( attempts++ ))
worker_port=$(pick_random_unused_tcp_port) || fail "no port found"
- hazelcast_port=$(pick_random_unused_tcp_port) || fail "no port found"
+ http_port=$(pick_random_unused_tcp_port) || fail "no port found"
"${BAZEL_RUNFILES}/src/tools/remote/worker" \
--work_path="${work_path}" \
--listen_port=${worker_port} \
- --hazelcast_standalone_listen_port=${hazelcast_port} \
- --pid_file="${pid_file}" >& $TEST_log &
+ --http_listen_port=${http_port} \
+ --pid_file="${pid_file}" &
local wait_seconds=0
until [ -s "${pid_file}" ] || [ "$wait_seconds" -eq 15 ]; do
sleep 1
@@ -72,21 +72,21 @@ EOF
#include <iostream>
int main() { std::cout << "Hello world!" << std::endl; return 0; }
- bazel build //a:test >& $TEST_log \
+ bazel build //a:test \
|| fail "Failed to build //a:test without remote cache"
cp -f bazel-bin/a/test ${TEST_TMPDIR}/test_expected
- bazel clean --expunge >& $TEST_log
+ bazel clean --expunge
bazel build \
- --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
- //a:test >& $TEST_log \
+ --remote_http_cache=http://localhost:${http_port} \
+ //a:test \
|| fail "Failed to build //a:test with remote REST cache service"
diff bazel-bin/a/test ${TEST_TMPDIR}/test_expected \
|| fail "Remote cache generated different result"
# Check that persistent connections are closed after the build. Is there a good cross-platform way
# to check this?
if [[ "$PLATFORM" = "linux" ]]; then
- if netstat -tn | grep -qE ":${hazelcast_port}\\s+ESTABLISHED$"; then
+ if netstat -tn | grep -qE ":${http_port}\\s+ESTABLISHED$"; then
fail "connections to to cache not closed"
@@ -119,7 +119,7 @@ EOF
# Check that persistent connections are closed after the build. Is there a good cross-platform way
# to check this?
if [[ "$PLATFORM" = "linux" ]]; then
- if netstat -tn | grep -qE ":${hazelcast_port}\\s+ESTABLISHED$"; then
+ if netstat -tn | grep -qE ":${http_port}\\s+ESTABLISHED$"; then
fail "connections to to cache not closed"
@@ -135,7 +135,7 @@ genrule(
bazel build \
--noremote_allow_symlink_upload \
- --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+ --remote_http_cache=http://localhost:${http_port} \
//:make-link &> $TEST_log \
&& fail "should have failed" || true
expect_log "/l is a symbolic link"
@@ -151,7 +151,7 @@ genrule(
bazel build \
--noremote_allow_symlink_upload \
- --remote_http_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+ --remote_http_cache=http://localhost:${http_port} \
//:make-link &> $TEST_log \
&& fail "should have failed" || true
expect_log "dir/l is a symbolic link"
@@ -265,7 +265,7 @@ function test_directory_artifact_skylark_rest_cache() {
bazel build \
- --remote_rest_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+ --remote_rest_cache=http://localhost:${http_port} \
//a:test >& $TEST_log \
|| fail "Failed to build //a:test with remote REST cache"
diff bazel-genfiles/a/qux/out.txt a/test_expected \
@@ -276,7 +276,7 @@ function test_directory_artifact_in_runfiles_skylark_rest_cache() {
bazel build \
- --remote_rest_cache=http://localhost:${hazelcast_port}/hazelcast/rest/maps \
+ --remote_rest_cache=http://localhost:${http_port} \
//a:test2 >& $TEST_log \
|| fail "Failed to build //a:test2 with remote REST cache"
diff bazel-genfiles/a/test2-out.txt a/test_expected \
diff --git a/src/tools/remote/BUILD b/src/tools/remote/BUILD
index 31ce3ecb15..02551671d1 100644
--- a/src/tools/remote/BUILD
+++ b/src/tools/remote/BUILD
@@ -6,10 +6,6 @@ filegroup(
name = "worker",
- jvm_flags = [
- # Enables REST for Hazelcast server for testing.
- "-Dhazelcast.rest.enabled=true",
- ],
main_class = "com.google.devtools.build.remote.worker.RemoteWorker",
visibility = ["//visibility:public"],
runtime_deps = ["//src/tools/remote/src/main/java/com/google/devtools/build/remote/worker"],
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index 0c126c5b30..9abb6af8b5 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -27,7 +27,6 @@ java_library(
- "//third_party:hazelcast",
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java
new file mode 100644
index 0000000000..d51292e505
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerHandler.java
@@ -0,0 +1,108 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.remote.worker;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+/** A simple HTTP REST in-memory cache used during testing the LRE. */
+public class HttpCacheServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+ final ConcurrentMap<String, byte[]> cache = new ConcurrentHashMap<>();
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+ if (!request.decoderResult().isSuccess()) {
+ sendError(ctx, request, HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ if (request.method() == HttpMethod.GET) {
+ handleGet(ctx, request);
+ } else if (request.method() == HttpMethod.PUT) {
+ handlePut(ctx, request);
+ } else {
+ sendError(ctx, request, HttpResponseStatus.METHOD_NOT_ALLOWED);
+ }
+ }
+ private void handleGet(ChannelHandlerContext ctx, FullHttpRequest request) {
+ byte[] contents = cache.get(request.uri());
+ if (contents == null) {
+ sendError(ctx, request, HttpResponseStatus.NOT_FOUND);
+ return;
+ }
+ FullHttpResponse response =
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(contents));
+ HttpUtil.setContentLength(response, contents.length);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+ ChannelFuture lastContentFuture = ctx.writeAndFlush(response);
+ if (!HttpUtil.isKeepAlive(request)) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ private void handlePut(ChannelHandlerContext ctx, FullHttpRequest request) {
+ if (!request.decoderResult().isSuccess()) {
+ sendError(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ return;
+ }
+ byte[] contentBytes = new byte[request.content().readableBytes()];
+ request.content().readBytes(contentBytes);
+ cache.putIfAbsent(request.uri(), contentBytes);
+ FullHttpResponse response =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ ChannelFuture lastContentFuture = ctx.writeAndFlush(response);
+ if (!HttpUtil.isKeepAlive(request)) {
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ private static void sendError(
+ ChannelHandlerContext ctx, FullHttpRequest request, HttpResponseStatus status) {
+ FullHttpResponse response =
+ new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1,
+ status,
+ Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+ ChannelFuture future = ctx.writeAndFlush(response);
+ if (!HttpUtil.isKeepAlive(request)) {
+ future.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java
new file mode 100644
index 0000000000..8d9865541f
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/HttpCacheServerInitializer.java
@@ -0,0 +1,33 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.remote.worker;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+/** The initializer used by the HttpCacheServerHandler. */
+public class HttpCacheServerInitializer extends ChannelInitializer<SocketChannel> {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new HttpServerCodec());
+ p.addLast(new HttpObjectAggregator(100 * 1024 * 1024));
+ p.addLast(new HttpCacheServerHandler());
+ }
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index bf4d89123d..e8d8318224 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -57,13 +57,17 @@ import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
-import com.hazelcast.config.Config;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -80,6 +84,7 @@ import java.util.logging.Logger;
* based on gRPC.
public final class RemoteWorker {
// We need to keep references to the root and netty loggers to prevent them from being garbage
// collected, which would cause us to loose their configuration.
private static final Logger rootLogger = Logger.getLogger("");
@@ -195,23 +200,7 @@ public final class RemoteWorker {
- /**
- * Construct a {@link SimpleBlobStore} using Hazelcast's version of {@link ConcurrentMap}. This
- * will start a standalone Hazelcast server in the same JVM. There will also be a REST server
- * started for accessing the maps.
- */
- private static SimpleBlobStore createHazelcast(RemoteWorkerOptions options) {
- Config config = new Config();
- config
- .getNetworkConfig()
- .setPort(options.hazelcastStandaloneListenPort)
- .getJoin()
- .getMulticastConfig()
- .setEnabled(false);
- HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
- return new ConcurrentMapBlobStore(instance.getMap("cache"));
- }
+ @SuppressWarnings("FutureReturnValueIgnored")
public static void main(String[] args) throws Exception {
OptionsParser parser =
OptionsParser.newOptionsParser(RemoteOptions.class, RemoteWorkerOptions.class);
@@ -259,18 +248,14 @@ public final class RemoteWorker {
// The instance of SimpleBlobStore used is based on these criteria in order:
// 1. If remote cache or local disk cache is specified then use it first.
- // 2. Otherwise start a standalone Hazelcast instance and use it as the blob store. This also
- // creates a REST server for testing.
- // 3. Finally use a ConcurrentMap to back the blob store.
+ // 2. Finally use a ConcurrentMap to back the blob store.
final SimpleBlobStore blobStore;
if (usingRemoteCache) {
blobStore = SimpleBlobStoreFactory.create(remoteOptions, null, null);
} else if (remoteWorkerOptions.casPath != null) {
blobStore = new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath));
- } else if (remoteWorkerOptions.hazelcastStandaloneListenPort != 0) {
- blobStore = createHazelcast(remoteWorkerOptions);
} else {
- blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
+ blobStore = new ConcurrentMapBlobStore(new ConcurrentHashMap<>());
ListeningScheduledExecutorService retryService =
@@ -292,9 +277,40 @@ public final class RemoteWorker {
final Server server = worker.startServer();
+ EventLoopGroup bossGroup = null, workerGroup = null;
+ Channel ch = null;
+ if (remoteWorkerOptions.httpListenPort != 0) {
+ // Configure the server.
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new HttpCacheServerInitializer());
+ ch = b.bind(remoteWorkerOptions.httpListenPort).sync().channel();
+ logger.log(
+ "Started HTTP cache server on port " + remoteWorkerOptions.httpListenPort);
+ } else {
+ logger.log(INFO, "Not starting HTTP cache server");
+ }
+ if (ch != null) {
+ ch.closeFuture().sync().get();
+ }
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
index ca6a431297..ac0b29c078 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorkerOptions.java
@@ -135,16 +135,16 @@ public class RemoteWorkerOptions extends OptionsBase {
public int jobs;
- name = "hazelcast_standalone_listen_port",
- defaultValue = "0",
- category = "build_worker",
- documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
- effectTags = {OptionEffectTag.UNKNOWN},
- help =
- "Runs an embedded hazelcast server that listens to this port. The server does not join"
- + " any cluster. This is useful for testing."
- )
- public int hazelcastStandaloneListenPort;
+ name = "http_listen_port",
+ defaultValue = "0",
+ category = "build_worker",
+ documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+ effectTags = {OptionEffectTag.UNKNOWN},
+ help =
+ "Starts an embedded HTTP REST server on the given port. The server will simply store PUT "
+ + "requests in memory and return them again on GET requests. This is useful for "
+ + "testing only.")
+ public int httpListenPort;
private static final int MAX_JOBS = 16384;