diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/cpp/util/numbers.cc | 26 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java | 17 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java | 91 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java | 65 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/server/RPCServer.java | 5 | ||||
-rw-r--r-- | src/main/protobuf/command_server.proto | 2 | ||||
-rw-r--r-- | src/test/cpp/util/BUILD | 9 | ||||
-rw-r--r-- | src/test/cpp/util/numbers_test.cc | 38 |
8 files changed, 189 insertions, 64 deletions
diff --git a/src/main/cpp/util/numbers.cc b/src/main/cpp/util/numbers.cc index 2e3b1095e8..46feb81eda 100644 --- a/src/main/cpp/util/numbers.cc +++ b/src/main/cpp/util/numbers.cc @@ -52,10 +52,10 @@ static const int8_t kAsciiToInt[256] = { 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36 }; // Parse the sign. -inline bool safe_parse_sign(const string &text /*inout*/, +inline bool safe_parse_sign(const char** rest, /*inout*/ bool* negative_ptr /*output*/) { - const char* start = text.data(); - const char* end = start + text.size(); + const char* start = *rest; + const char* end = start + strlen(start); // Consume whitespace. while (start < end && ascii_isspace(start[0])) { @@ -77,6 +77,7 @@ inline bool safe_parse_sign(const string &text /*inout*/, } } + *rest = start; return true; } @@ -104,13 +105,13 @@ inline bool safe_parse_sign(const string &text /*inout*/, // // Overflow checking becomes simple. -inline bool safe_parse_positive_int(const string &text, int* value_p) { +inline bool safe_parse_positive_int(const char *text, int* value_p) { int value = 0; const int vmax = std::numeric_limits<int>::max(); static_assert(vmax > 0, ""); const int vmax_over_base = vmax / 10; - const char* start = text.data(); - const char* end = start + text.size(); + const char* start = text; + const char* end = start + strlen(text); // loop over digits for (; start < end; ++start) { unsigned char c = static_cast<unsigned char>(start[0]); @@ -134,7 +135,7 @@ inline bool safe_parse_positive_int(const string &text, int* value_p) { return true; } -inline bool safe_parse_negative_int(const string &text, int* value_p) { +inline bool safe_parse_negative_int(const char *text, int* value_p) { int value = 0; const int vmin = std::numeric_limits<int>::min(); static_assert(vmin < 0, ""); @@ -146,8 +147,8 @@ inline bool safe_parse_negative_int(const string &text, int* value_p) { if (vmin % 10 > 0) { vmin_over_base += 1; } - const char* start = text.data(); - const char* end = start + text.size(); + const char* start = text; + const char* end = start + strlen(text); // loop over digits for (; start < end; ++start) { unsigned char c = static_cast<unsigned char>(start[0]); @@ -173,14 +174,15 @@ inline bool safe_parse_negative_int(const string &text, int* value_p) { bool safe_strto32(const string &text, int *value_p) { *value_p = 0; + const char* rest = text.c_str(); bool negative; - if (!safe_parse_sign(text, &negative)) { + if (!safe_parse_sign(&rest, &negative)) { return false; } if (!negative) { - return safe_parse_positive_int(text, value_p); + return safe_parse_positive_int(rest, value_p); } else { - return safe_parse_negative_int(text, value_p); + return safe_parse_negative_int(rest, value_p); } } diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java index ace94597c9..25231366f6 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java @@ -838,8 +838,19 @@ public final class BlazeRuntime { * the program. */ private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) { + InterruptSignalHandler sigintHandler = null; try { - RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + final RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args)); + + // Register the signal handler. + sigintHandler = new InterruptSignalHandler() { + @Override + protected void onSignal() { + LOG.severe("User interrupt"); + blazeServer.interrupt(); + } + }; + blazeServer.serve(); return ExitCode.SUCCESS.getNumericExitCode(); } catch (OptionsParsingException e) { @@ -851,6 +862,10 @@ public final class BlazeRuntime { } catch (AbruptExitException e) { outErr.printErr(e.getMessage()); return e.getExitCode().getNumericExitCode(); + } finally { + if (sigintHandler != null) { + sigintHandler.uninstall(); + } } } diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java index 26473e2323..558b87569a 100644 --- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java @@ -19,7 +19,6 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.server.RPCService.UnknownCommandException; -import com.google.devtools.build.lib.server.signal.InterruptSignalHandler; import com.google.devtools.build.lib.unix.LocalClientSocket; import com.google.devtools.build.lib.unix.LocalServerSocket; import com.google.devtools.build.lib.unix.LocalSocketAddress; @@ -129,59 +128,54 @@ public final class AfUnixServer extends RPCServer { serverDirectory, workspaceDir); } + + private final AtomicBoolean inAction = new AtomicBoolean(false); + private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); + private final AtomicLong cmdNum = new AtomicLong(); + private final Thread mainThread = Thread.currentThread(); + private final Object interruptLock = new Object(); + + @Override + public void interrupt() { + // Only interrupt during actions - otherwise we may end up setting the interrupt bit + // at the end of a build and responding to it at the beginning of the subsequent build. + synchronized (interruptLock) { + if (allowingInterrupt.get()) { + mainThread.interrupt(); + } + } + + if (inAction.get()) { + Runnable interruptWatcher = + new Runnable() { + @Override + public void run() { + try { + long originalCmd = cmdNum.get(); + Thread.sleep(10 * 1000); + if (inAction.get() && cmdNum.get() == originalCmd) { + // We're still operating on the same command. + // Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + } + /** * Wait on a socket for business (answer requests). Note that this * method won't return until the server shuts down. */ @Override public void serve() { - // Register the signal handler. - final AtomicBoolean inAction = new AtomicBoolean(false); - final AtomicBoolean allowingInterrupt = new AtomicBoolean(true); - final AtomicLong cmdNum = new AtomicLong(); - final Thread mainThread = Thread.currentThread(); - final Object interruptLock = new Object(); - - InterruptSignalHandler sigintHandler = - new InterruptSignalHandler() { - @Override - protected void onSignal() { - LOG.severe("User interrupt"); - - // Only interrupt during actions - otherwise we may end up setting the interrupt bit - // at the end of a build and responding to it at the beginning of the subsequent build. - synchronized (interruptLock) { - if (allowingInterrupt.get()) { - mainThread.interrupt(); - } - } - - if (inAction.get()) { - Runnable interruptWatcher = - new Runnable() { - @Override - public void run() { - try { - long originalCmd = cmdNum.get(); - Thread.sleep(10 * 1000); - if (inAction.get() && cmdNum.get() == originalCmd) { - // We're still operating on the same command. - // Interrupt took too long. - ThreadUtils.warnAboutSlowInterrupt(); - } - } catch (InterruptedException e) { - // Ignore. - } - } - }; - Thread interruptWatcherThread = - new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum); - interruptWatcherThread.setDaemon(true); - interruptWatcherThread.start(); - } - } - }; - try { while (!lameDuck) { try { @@ -246,7 +240,6 @@ public final class AfUnixServer extends RPCServer { } finally { rpcService.shutdown(); LOG.info("Logging finished"); - sigintHandler.uninstall(); } } diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java index 477f4b4850..a5dbbf0ac4 100644 --- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java +++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java @@ -14,6 +14,8 @@ package com.google.devtools.build.lib.server; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode; import com.google.devtools.build.lib.runtime.CommandExecutor; import com.google.devtools.build.lib.server.CommandProtos.CancelRequest; @@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse; import com.google.devtools.build.lib.util.Clock; import com.google.devtools.build.lib.util.ExitCode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.ThreadUtils; import com.google.devtools.build.lib.util.io.OutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; @@ -39,10 +42,15 @@ import java.io.OutputStream; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; +import java.nio.charset.Charset; import java.security.SecureRandom; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; /** * gRPC server class. @@ -51,6 +59,11 @@ import java.util.UUID; * bootstrapping. */ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer { + // UTF-8 won't do because we want to be able to pass arbitrary binary strings. + // Not that the internals of Bazel handle that correctly, but why not make at least this little + // part correct? + private static final Charset CHARSET = Charset.forName("ISO-8859-1"); + private class RunningCommand implements AutoCloseable { private final Thread thread; private final String id; @@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma private static final String REQUEST_COOKIE_FILE = "request_cookie"; private static final String RESPONSE_COOKIE_FILE = "response_cookie"; + @GuardedBy("runningCommands") private final Map<String, RunningCommand> runningCommands = new HashMap<>(); private final CommandExecutor commandExecutor; private final Clock clock; private final Path serverDirectory; private final String requestCookie; private final String responseCookie; + private final AtomicLong interruptCounter = new AtomicLong(0); private Server server; private int port; // mutable so that we can overwrite it if port 0 is passed in @@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return result.toString(); } + private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) { + if (commandIds.isEmpty()) { + return; + } + + Runnable interruptWatcher = new Runnable() { + @Override + public void run() { + try { + boolean ok; + Thread.sleep(10 * 1000); + synchronized (runningCommands) { + ok = Collections.disjoint(commandIds, runningCommands.keySet()); + } + if (!ok) { + // At least one command was not interrupted. Interrupt took too long. + ThreadUtils.warnAboutSlowInterrupt(); + } + } catch (InterruptedException e) { + // Ignore. + } + } + }; + + Thread interruptWatcherThread = + new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet()); + interruptWatcherThread.setDaemon(true); + interruptWatcherThread.start(); + } + + @Override + public void interrupt() { + synchronized (runningCommands) { + for (RunningCommand command : runningCommands.values()) { + command.thread.interrupt(); + } + + startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet())); + } + } + @Override public void serve() throws IOException { Preconditions.checkState(!serving); @@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma return; } + ImmutableList.Builder<String> args = ImmutableList.builder(); + for (ByteString requestArg : request.getArgList()) { + args.add(requestArg.toString(CHARSET)); + } + String commandId; int exitCode; try (RunningCommand command = new RunningCommand()) { @@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma new RpcOutputStream(observer, command.id, StreamType.STDERR)); exitCode = commandExecutor.exec( - request.getArgList(), rpcOutErr, + args.build(), rpcOutErr, request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT, request.getClientDescription(), clock.currentTimeMillis()); } catch (InterruptedException e) { @@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma if (command != null) { command.thread.interrupt(); } + + startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId())); } streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build()); diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java index fc6b83ffc2..a9663eaf72 100644 --- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java +++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java @@ -90,4 +90,9 @@ public abstract class RPCServer { * Start serving and block until the a shutdown command is received. */ public abstract void serve() throws IOException; + + /** + * Called when the server receives a SIGINT. + */ + public abstract void interrupt(); } diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto index eef2422c5e..69fc62ee1c 100644 --- a/src/main/protobuf/command_server.proto +++ b/src/main/protobuf/command_server.proto @@ -25,7 +25,7 @@ option java_outer_classname = "CommandProtos"; message RunRequest { // This must be the request cookie from the output base. A rudimentary form of authentication. string cookie = 1; - repeated string arg = 2; + repeated bytes arg = 2; bool block_for_lock = 3; // If false, the client won't wait for another client to finish string client_description = 4; } diff --git a/src/test/cpp/util/BUILD b/src/test/cpp/util/BUILD index e47f35e77f..c9a472013e 100644 --- a/src/test/cpp/util/BUILD +++ b/src/test/cpp/util/BUILD @@ -22,6 +22,15 @@ cc_test( ) cc_test( + name = "numbers_test", + srcs = ["numbers_test.cc"], + deps = [ + "//src/main/cpp/util", + "//third_party:gtest", + ], +) + +cc_test( name = "strings_test", srcs = ["strings_test.cc"], shard_count = 2, diff --git a/src/test/cpp/util/numbers_test.cc b/src/test/cpp/util/numbers_test.cc new file mode 100644 index 0000000000..71e648798f --- /dev/null +++ b/src/test/cpp/util/numbers_test.cc @@ -0,0 +1,38 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include "src/main/cpp/util/numbers.h" +#include "src/main/cpp/util/port.h" +#include "gtest/gtest.h" + +namespace blaze_util { + +TEST(NumbersTest, TestSafeStrto32) { + int value; + ASSERT_TRUE(safe_strto32("0", &value)); + ASSERT_EQ(0, value); + ASSERT_TRUE(safe_strto32("42", &value)); + ASSERT_EQ(42, value); + ASSERT_TRUE(safe_strto32("007", &value)); + ASSERT_EQ(7, value); + ASSERT_TRUE(safe_strto32("1234567", &value)); + ASSERT_EQ(1234567, value); + ASSERT_TRUE(safe_strto32("-0", &value)); + ASSERT_EQ(0, value); + ASSERT_TRUE(safe_strto32("-273", &value)); + ASSERT_EQ(-273, value); + ASSERT_TRUE(safe_strto32("-0420", &value)); + ASSERT_EQ(-420, value); +} + +} // namespace blaze_util |