aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/cpp/util/numbers.cc26
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java17
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java91
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java65
-rw-r--r--src/main/java/com/google/devtools/build/lib/server/RPCServer.java5
-rw-r--r--src/main/protobuf/command_server.proto2
-rw-r--r--src/test/cpp/util/BUILD9
-rw-r--r--src/test/cpp/util/numbers_test.cc38
8 files changed, 189 insertions, 64 deletions
diff --git a/src/main/cpp/util/numbers.cc b/src/main/cpp/util/numbers.cc
index 2e3b1095e8..46feb81eda 100644
--- a/src/main/cpp/util/numbers.cc
+++ b/src/main/cpp/util/numbers.cc
@@ -52,10 +52,10 @@ static const int8_t kAsciiToInt[256] = {
36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36 };
// Parse the sign.
-inline bool safe_parse_sign(const string &text /*inout*/,
+inline bool safe_parse_sign(const char** rest, /*inout*/
bool* negative_ptr /*output*/) {
- const char* start = text.data();
- const char* end = start + text.size();
+ const char* start = *rest;
+ const char* end = start + strlen(start);
// Consume whitespace.
while (start < end && ascii_isspace(start[0])) {
@@ -77,6 +77,7 @@ inline bool safe_parse_sign(const string &text /*inout*/,
}
}
+ *rest = start;
return true;
}
@@ -104,13 +105,13 @@ inline bool safe_parse_sign(const string &text /*inout*/,
//
// Overflow checking becomes simple.
-inline bool safe_parse_positive_int(const string &text, int* value_p) {
+inline bool safe_parse_positive_int(const char *text, int* value_p) {
int value = 0;
const int vmax = std::numeric_limits<int>::max();
static_assert(vmax > 0, "");
const int vmax_over_base = vmax / 10;
- const char* start = text.data();
- const char* end = start + text.size();
+ const char* start = text;
+ const char* end = start + strlen(text);
// loop over digits
for (; start < end; ++start) {
unsigned char c = static_cast<unsigned char>(start[0]);
@@ -134,7 +135,7 @@ inline bool safe_parse_positive_int(const string &text, int* value_p) {
return true;
}
-inline bool safe_parse_negative_int(const string &text, int* value_p) {
+inline bool safe_parse_negative_int(const char *text, int* value_p) {
int value = 0;
const int vmin = std::numeric_limits<int>::min();
static_assert(vmin < 0, "");
@@ -146,8 +147,8 @@ inline bool safe_parse_negative_int(const string &text, int* value_p) {
if (vmin % 10 > 0) {
vmin_over_base += 1;
}
- const char* start = text.data();
- const char* end = start + text.size();
+ const char* start = text;
+ const char* end = start + strlen(text);
// loop over digits
for (; start < end; ++start) {
unsigned char c = static_cast<unsigned char>(start[0]);
@@ -173,14 +174,15 @@ inline bool safe_parse_negative_int(const string &text, int* value_p) {
bool safe_strto32(const string &text, int *value_p) {
*value_p = 0;
+ const char* rest = text.c_str();
bool negative;
- if (!safe_parse_sign(text, &negative)) {
+ if (!safe_parse_sign(&rest, &negative)) {
return false;
}
if (!negative) {
- return safe_parse_positive_int(text, value_p);
+ return safe_parse_positive_int(rest, value_p);
} else {
- return safe_parse_negative_int(text, value_p);
+ return safe_parse_negative_int(rest, value_p);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index ace94597c9..25231366f6 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -838,8 +838,19 @@ public final class BlazeRuntime {
* the program.
*/
private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) {
+ InterruptSignalHandler sigintHandler = null;
try {
- RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+ final RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+
+ // Register the signal handler.
+ sigintHandler = new InterruptSignalHandler() {
+ @Override
+ protected void onSignal() {
+ LOG.severe("User interrupt");
+ blazeServer.interrupt();
+ }
+ };
+
blazeServer.serve();
return ExitCode.SUCCESS.getNumericExitCode();
} catch (OptionsParsingException e) {
@@ -851,6 +862,10 @@ public final class BlazeRuntime {
} catch (AbruptExitException e) {
outErr.printErr(e.getMessage());
return e.getExitCode().getNumericExitCode();
+ } finally {
+ if (sigintHandler != null) {
+ sigintHandler.uninstall();
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
index 26473e2323..558b87569a 100644
--- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
@@ -19,7 +19,6 @@ import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
-import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
import com.google.devtools.build.lib.unix.LocalClientSocket;
import com.google.devtools.build.lib.unix.LocalServerSocket;
import com.google.devtools.build.lib.unix.LocalSocketAddress;
@@ -129,59 +128,54 @@ public final class AfUnixServer extends RPCServer {
serverDirectory, workspaceDir);
}
+
+ private final AtomicBoolean inAction = new AtomicBoolean(false);
+ private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+ private final AtomicLong cmdNum = new AtomicLong();
+ private final Thread mainThread = Thread.currentThread();
+ private final Object interruptLock = new Object();
+
+ @Override
+ public void interrupt() {
+ // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+ // at the end of a build and responding to it at the beginning of the subsequent build.
+ synchronized (interruptLock) {
+ if (allowingInterrupt.get()) {
+ mainThread.interrupt();
+ }
+ }
+
+ if (inAction.get()) {
+ Runnable interruptWatcher =
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long originalCmd = cmdNum.get();
+ Thread.sleep(10 * 1000);
+ if (inAction.get() && cmdNum.get() == originalCmd) {
+ // We're still operating on the same command.
+ // Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+ }
+
/**
* Wait on a socket for business (answer requests). Note that this
* method won't return until the server shuts down.
*/
@Override
public void serve() {
- // Register the signal handler.
- final AtomicBoolean inAction = new AtomicBoolean(false);
- final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
- final AtomicLong cmdNum = new AtomicLong();
- final Thread mainThread = Thread.currentThread();
- final Object interruptLock = new Object();
-
- InterruptSignalHandler sigintHandler =
- new InterruptSignalHandler() {
- @Override
- protected void onSignal() {
- LOG.severe("User interrupt");
-
- // Only interrupt during actions - otherwise we may end up setting the interrupt bit
- // at the end of a build and responding to it at the beginning of the subsequent build.
- synchronized (interruptLock) {
- if (allowingInterrupt.get()) {
- mainThread.interrupt();
- }
- }
-
- if (inAction.get()) {
- Runnable interruptWatcher =
- new Runnable() {
- @Override
- public void run() {
- try {
- long originalCmd = cmdNum.get();
- Thread.sleep(10 * 1000);
- if (inAction.get() && cmdNum.get() == originalCmd) {
- // We're still operating on the same command.
- // Interrupt took too long.
- ThreadUtils.warnAboutSlowInterrupt();
- }
- } catch (InterruptedException e) {
- // Ignore.
- }
- }
- };
- Thread interruptWatcherThread =
- new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
- interruptWatcherThread.setDaemon(true);
- interruptWatcherThread.start();
- }
- }
- };
-
try {
while (!lameDuck) {
try {
@@ -246,7 +240,6 @@ public final class AfUnixServer extends RPCServer {
} finally {
rpcService.shutdown();
LOG.info("Logging finished");
- sigintHandler.uninstall();
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 477f4b4850..a5dbbf0ac4 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,6 +14,8 @@
package com.google.devtools.build.lib.server;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
import com.google.devtools.build.lib.runtime.CommandExecutor;
import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
@@ -25,6 +27,7 @@ import com.google.devtools.build.lib.server.CommandProtos.RunResponse;
import com.google.devtools.build.lib.util.Clock;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.ThreadUtils;
import com.google.devtools.build.lib.util.io.OutErr;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
@@ -39,10 +42,15 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.Charset;
import java.security.SecureRandom;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
/**
* gRPC server class.
@@ -51,6 +59,11 @@ import java.util.UUID;
* bootstrapping.
*/
public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer {
+ // UTF-8 won't do because we want to be able to pass arbitrary binary strings.
+ // Not that the internals of Bazel handle that correctly, but why not make at least this little
+ // part correct?
+ private static final Charset CHARSET = Charset.forName("ISO-8859-1");
+
private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;
@@ -129,12 +142,14 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
private static final String REQUEST_COOKIE_FILE = "request_cookie";
private static final String RESPONSE_COOKIE_FILE = "response_cookie";
+ @GuardedBy("runningCommands")
private final Map<String, RunningCommand> runningCommands = new HashMap<>();
private final CommandExecutor commandExecutor;
private final Clock clock;
private final Path serverDirectory;
private final String requestCookie;
private final String responseCookie;
+ private final AtomicLong interruptCounter = new AtomicLong(0);
private Server server;
private int port; // mutable so that we can overwrite it if port 0 is passed in
@@ -165,6 +180,47 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return result.toString();
}
+ private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) {
+ if (commandIds.isEmpty()) {
+ return;
+ }
+
+ Runnable interruptWatcher = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean ok;
+ Thread.sleep(10 * 1000);
+ synchronized (runningCommands) {
+ ok = Collections.disjoint(commandIds, runningCommands.keySet());
+ }
+ if (!ok) {
+ // At least one command was not interrupted. Interrupt took too long.
+ ThreadUtils.warnAboutSlowInterrupt();
+ }
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ };
+
+ Thread interruptWatcherThread =
+ new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet());
+ interruptWatcherThread.setDaemon(true);
+ interruptWatcherThread.start();
+ }
+
+ @Override
+ public void interrupt() {
+ synchronized (runningCommands) {
+ for (RunningCommand command : runningCommands.values()) {
+ command.thread.interrupt();
+ }
+
+ startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet()));
+ }
+ }
+
@Override
public void serve() throws IOException {
Preconditions.checkState(!serving);
@@ -245,6 +301,11 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
return;
}
+ ImmutableList.Builder<String> args = ImmutableList.builder();
+ for (ByteString requestArg : request.getArgList()) {
+ args.add(requestArg.toString(CHARSET));
+ }
+
String commandId;
int exitCode;
try (RunningCommand command = new RunningCommand()) {
@@ -254,7 +315,7 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
new RpcOutputStream(observer, command.id, StreamType.STDERR));
exitCode = commandExecutor.exec(
- request.getArgList(), rpcOutErr,
+ args.build(), rpcOutErr,
request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
request.getClientDescription(), clock.currentTimeMillis());
} catch (InterruptedException e) {
@@ -308,6 +369,8 @@ public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.Comma
if (command != null) {
command.thread.interrupt();
}
+
+ startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId()));
}
streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index fc6b83ffc2..a9663eaf72 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -90,4 +90,9 @@ public abstract class RPCServer {
* Start serving and block until the a shutdown command is received.
*/
public abstract void serve() throws IOException;
+
+ /**
+ * Called when the server receives a SIGINT.
+ */
+ public abstract void interrupt();
}
diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto
index eef2422c5e..69fc62ee1c 100644
--- a/src/main/protobuf/command_server.proto
+++ b/src/main/protobuf/command_server.proto
@@ -25,7 +25,7 @@ option java_outer_classname = "CommandProtos";
message RunRequest {
// This must be the request cookie from the output base. A rudimentary form of authentication.
string cookie = 1;
- repeated string arg = 2;
+ repeated bytes arg = 2;
bool block_for_lock = 3; // If false, the client won't wait for another client to finish
string client_description = 4;
}
diff --git a/src/test/cpp/util/BUILD b/src/test/cpp/util/BUILD
index e47f35e77f..c9a472013e 100644
--- a/src/test/cpp/util/BUILD
+++ b/src/test/cpp/util/BUILD
@@ -22,6 +22,15 @@ cc_test(
)
cc_test(
+ name = "numbers_test",
+ srcs = ["numbers_test.cc"],
+ deps = [
+ "//src/main/cpp/util",
+ "//third_party:gtest",
+ ],
+)
+
+cc_test(
name = "strings_test",
srcs = ["strings_test.cc"],
shard_count = 2,
diff --git a/src/test/cpp/util/numbers_test.cc b/src/test/cpp/util/numbers_test.cc
new file mode 100644
index 0000000000..71e648798f
--- /dev/null
+++ b/src/test/cpp/util/numbers_test.cc
@@ -0,0 +1,38 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "src/main/cpp/util/numbers.h"
+#include "src/main/cpp/util/port.h"
+#include "gtest/gtest.h"
+
+namespace blaze_util {
+
+TEST(NumbersTest, TestSafeStrto32) {
+ int value;
+ ASSERT_TRUE(safe_strto32("0", &value));
+ ASSERT_EQ(0, value);
+ ASSERT_TRUE(safe_strto32("42", &value));
+ ASSERT_EQ(42, value);
+ ASSERT_TRUE(safe_strto32("007", &value));
+ ASSERT_EQ(7, value);
+ ASSERT_TRUE(safe_strto32("1234567", &value));
+ ASSERT_EQ(1234567, value);
+ ASSERT_TRUE(safe_strto32("-0", &value));
+ ASSERT_EQ(0, value);
+ ASSERT_TRUE(safe_strto32("-273", &value));
+ ASSERT_EQ(-273, value);
+ ASSERT_TRUE(safe_strto32("-0420", &value));
+ ASSERT_EQ(-420, value);
+}
+
+} // namespace blaze_util