aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2017-06-08 23:56:05 +0200
committerGravatar Jakob Buchgraber <buchgr@google.com>2017-06-09 10:23:19 +0200
commit2730bae6223d611fbe5a45463cd788c4f4cc076f (patch)
treeb01b1cd690e075bcc86e0da531d50b03ce839005 /src/main/java/com
parent55c5a60bfbf3429c773b899ab331bc41019ddca3 (diff)
BES: Open Source the build event service gRPC client implementation.
This change moves the BES code from blaze to bazel. RELNOTES: None. PiperOrigin-RevId: 158445754
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/google/devtools/build/lib/BUILD31
-rw-r--r--src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java40
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java225
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java112
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java168
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java527
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD31
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java90
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java234
10 files changed, 1460 insertions, 2 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index 2864670239..6c4c255d38 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -21,6 +21,7 @@ filegroup(
"//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader:srcs",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:srcs",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/transports:srcs",
+ "//src/main/java/com/google/devtools/build/lib/buildeventservice/client:srcs",
"//src/main/java/com/google/devtools/build/lib/causes:srcs",
"//src/main/java/com/google/devtools/build/lib/cmdline:srcs",
"//src/main/java/com/google/devtools/build/lib/exec/local:srcs",
@@ -367,6 +368,35 @@ java_library(
)
java_library(
+ name = "buildeventservice",
+ srcs = glob(["buildeventservice/*.java"]),
+ visibility = [
+ "//visibility:public",
+ ],
+ deps = [
+ "//src/main/java/com/google/devtools/build/lib:auth_and_tls_options",
+ "//src/main/java/com/google/devtools/build/lib:buildeventstream",
+ "//src/main/java/com/google/devtools/build/lib:events",
+ "//src/main/java/com/google/devtools/build/lib:io",
+ "//src/main/java/com/google/devtools/build/lib:runtime",
+ "//src/main/java/com/google/devtools/build/lib:util",
+ "//src/main/java/com/google/devtools/build/lib/buildeventservice/client",
+ "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
+ "//src/main/java/com/google/devtools/common/options",
+ "//third_party:guava",
+ "//third_party:joda_time",
+ "//third_party:jsr305",
+ "//third_party/grpc:grpc-jar",
+ "@com_google_protobuf//:protobuf_java",
+ "@com_google_protobuf//:protobuf_java_util",
+ "@com_google_protobuf//:well_known_types_any_proto",
+ "@googleapis//:google_devtools_build_v1_build_events_java_proto",
+ "@googleapis//:google_devtools_build_v1_build_status_java_proto",
+ "@googleapis//:google_devtools_build_v1_publish_build_event_java_proto",
+ ],
+)
+
+java_library(
name = "auth_and_tls_options",
srcs = ["authandtls/AuthAndTLSOptions.java"],
visibility = [
@@ -716,6 +746,7 @@ java_library(
":bazel-rules",
":build-base",
":build-info",
+ ":buildeventservice",
":clock",
":events",
":io",
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java b/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java
index 5914b765c3..62d070ee86 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java
@@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.runtime.BlazeModule;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
@@ -51,7 +50,8 @@ public final class BazelMain {
com.google.devtools.build.lib.runtime.BuildSummaryStatsModule.class,
com.google.devtools.build.lib.runtime.BuildEventStreamerModule.class,
com.google.devtools.build.lib.bazel.rules.BazelRulesModule.class,
- com.google.devtools.build.lib.bazel.rules.BazelStrategyModule.class);
+ com.google.devtools.build.lib.bazel.rules.BazelStrategyModule.class,
+ com.google.devtools.build.lib.buildeventservice.BazelBuildEventServiceModule.class);
public static void main(String[] args) {
BlazeVersionInfo.setBuildInfo(tryGetBuildInfo());
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java
new file mode 100644
index 0000000000..c80f77e58e
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java
@@ -0,0 +1,40 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice;
+
+import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
+import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceGrpcClient;
+
+/**
+ * Bazel's BES module.
+ */
+public class BazelBuildEventServiceModule
+ extends BuildEventServiceModule<BuildEventServiceOptions> {
+
+ @Override
+ protected Class<BuildEventServiceOptions> optionsClass() {
+ return BuildEventServiceOptions.class;
+ }
+
+ @Override
+ protected BuildEventServiceClient createBesClient(BuildEventServiceOptions besOptions,
+ AuthAndTLSOptions authAndTLSOptions) {
+ return new BuildEventServiceGrpcClient(
+ besOptions.besBackend, authAndTLSOptions.tlsEnabled, authAndTLSOptions.tlsCertificate,
+ authAndTLSOptions.tlsAuthorityOverride, authAndTLSOptions.authCredentials,
+ authAndTLSOptions.authScope);
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
new file mode 100644
index 0000000000..76c446bd95
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -0,0 +1,225 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.UPLOAD_FAILED_MESSAGE;
+import static java.lang.String.format;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.eventbus.Subscribe;
+import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.runtime.BlazeModule;
+import com.google.devtools.build.lib.runtime.BuildEventStreamer;
+import com.google.devtools.build.lib.runtime.Command;
+import com.google.devtools.build.lib.runtime.CommandEnvironment;
+import com.google.devtools.build.lib.runtime.SynchronizedOutputStream;
+import com.google.devtools.build.lib.util.AbruptExitException;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ExitCode;
+import com.google.devtools.build.lib.util.io.OutErr;
+import com.google.devtools.common.options.OptionsBase;
+import com.google.devtools.common.options.OptionsProvider;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+/**
+ * Module responsible for the {@link BuildEventTransport} and its {@link BuildEventStreamer}.
+ *
+ * Implementors of this class have to overwrite {@link #optionsClass()} and
+ * {@link #createBesClient(BuildEventServiceOptions)}.
+ */
+public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions>
+ extends BlazeModule {
+
+ private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
+
+ private CommandEnvironment commandEnvironment;
+ private SynchronizedOutputStream out;
+ private SynchronizedOutputStream err;
+
+ private static class BuildEventRecorder {
+ private final List<BuildEvent> events = new ArrayList<>();
+
+ @Subscribe
+ public void buildEvent(BuildEvent event) {
+ events.add(event);
+ }
+
+ List<BuildEvent> getEvents() {
+ return events;
+ }
+ }
+
+ private BuildEventRecorder buildEventRecorder;
+
+ @Override
+ public Iterable<Class<? extends OptionsBase>> getCommandOptions(Command command) {
+ return ImmutableList.of(optionsClass(), AuthAndTLSOptions.class);
+ }
+
+ @Override
+ public void beforeCommand(Command command, CommandEnvironment commandEnvironment)
+ throws AbruptExitException {
+ this.commandEnvironment = commandEnvironment;
+ this.buildEventRecorder = new BuildEventRecorder();
+ commandEnvironment.getEventBus().register(buildEventRecorder);
+ }
+
+ @Override
+ public void handleOptions(OptionsProvider optionsProvider) {
+ checkState(commandEnvironment != null, "Methods called out of order");
+ BuildEventStreamer streamer =
+ tryCreateStreamer(
+ optionsProvider,
+ commandEnvironment.getReporter(),
+ commandEnvironment.getBlazeModuleEnvironment(),
+ commandEnvironment.getRuntime().getClock(),
+ commandEnvironment.getClientEnv().get("BAZEL_INTERNAL_BUILD_REQUEST_ID"),
+ commandEnvironment.getCommandId().toString());
+ if (streamer != null) {
+ commandEnvironment.getReporter().addHandler(streamer);
+ commandEnvironment.getEventBus().register(streamer);
+
+ final SynchronizedOutputStream theOut = this.out;
+ final SynchronizedOutputStream theErr = this.err;
+ // out and err should be non-null at this point, as getOutputListener is supposed to
+ // be always called before handleOptions. But let's still prefer a stream with no
+ // stdout/stderr over an aborted build.
+ streamer.registerOutErrProvider(
+ new BuildEventStreamer.OutErrProvider() {
+ @Override
+ public String getOut() {
+ if (theOut == null) {
+ return null;
+ }
+ return theOut.readAndReset();
+ }
+
+ @Override
+ public String getErr() {
+ if (theErr == null) {
+ return null;
+ }
+ return theErr.readAndReset();
+ }
+ });
+ if (theErr != null) {
+ theErr.registerStreamer(streamer);
+ }
+ if (theOut != null) {
+ theOut.registerStreamer(streamer);
+ }
+ for (BuildEvent event : buildEventRecorder.getEvents()) {
+ streamer.buildEvent(event);
+ }
+ logger.fine("BuildEventStreamer created and registered successfully.");
+ } else {
+ // If there is no streamer to consume the output, we should not try to accumulate it.
+ this.out.setDiscardAll();
+ this.err.setDiscardAll();
+ }
+ commandEnvironment.getEventBus().unregister(buildEventRecorder);
+ this.buildEventRecorder = null;
+ }
+
+ @Override
+ public OutErr getOutputListener() {
+ this.out = new SynchronizedOutputStream();
+ this.err = new SynchronizedOutputStream();
+ return OutErr.create(this.out, this.err);
+ }
+
+ /**
+ * Returns {@code null} if no stream could be created.
+ *
+ * @param buildRequestId if {@code null} or {@code ""} a random UUID is used instead.
+ */
+ @Nullable
+ private BuildEventStreamer tryCreateStreamer(
+ OptionsProvider optionsProvider,
+ EventHandler commandLineReporter,
+ ModuleEnvironment moduleEnvironment,
+ Clock clock,
+ String buildRequestId,
+ String invocationId) {
+ T besOptions = null;
+ try {
+ besOptions =
+ checkNotNull(
+ optionsProvider.getOptions(optionsClass()),
+ "Could not get BuildEventServiceOptions.");
+ AuthAndTLSOptions authTlsOptions =
+ checkNotNull(optionsProvider.getOptions(AuthAndTLSOptions.class),
+ "Could not get AuthAndTLSOptions.");
+ if (isNullOrEmpty(besOptions.besBackend)) {
+ logger.fine("BuildEventServiceTransport is disabled.");
+ } else {
+ logger.fine(format("Will create BuildEventServiceTransport streaming to '%s'",
+ besOptions.besBackend));
+
+ buildRequestId = isNullOrEmpty(buildRequestId)
+ ? UUID.randomUUID().toString()
+ : buildRequestId;
+ commandLineReporter.handle(
+ Event.info(
+ format(
+ "Streaming Build Event Protocol to %s build_request_id: %s invocation_id: %s",
+ besOptions.besBackend, buildRequestId, invocationId)));
+
+ BuildEventTransport besTransport =
+ new BuildEventServiceTransport(
+ createBesClient(besOptions, authTlsOptions),
+ besOptions.besTimeout,
+ besOptions.besBestEffort,
+ besOptions.besLifecycleEvents,
+ buildRequestId,
+ invocationId,
+ moduleEnvironment,
+ clock,
+ commandEnvironment.getRuntime().getPathToUriConverter(),
+ commandLineReporter,
+ besOptions.projectId);
+ logger.fine("BuildEventServiceTransport was created successfully");
+ return new BuildEventStreamer(ImmutableSet.of(besTransport),
+ commandEnvironment.getReporter());
+ }
+ } catch (Exception e) {
+ if (besOptions != null && besOptions.besBestEffort) {
+ commandLineReporter.handle(Event.warn(format(UPLOAD_FAILED_MESSAGE, e.getMessage())));
+ } else {
+ commandLineReporter.handle(Event.error(format(UPLOAD_FAILED_MESSAGE, e.getMessage())));
+ moduleEnvironment.exit(new AbruptExitException(ExitCode.PUBLISH_ERROR));
+ }
+ }
+ return null;
+ }
+
+ protected abstract Class<T> optionsClass();
+
+ protected abstract BuildEventServiceClient createBesClient(T besOptions,
+ AuthAndTLSOptions authAndTLSOptions);
+} \ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
new file mode 100644
index 0000000000..338cd5b950
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java
@@ -0,0 +1,112 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice;
+
+import com.google.devtools.common.options.Converter;
+import com.google.devtools.common.options.Option;
+import com.google.devtools.common.options.OptionsBase;
+import com.google.devtools.common.options.OptionsParsingException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.joda.time.Duration;
+
+/** Options used by {@link BuildEventServiceModule}. */
+public class BuildEventServiceOptions extends OptionsBase {
+
+ @Option(
+ name = "bes_backend",
+ defaultValue = "",
+ help = "Specifies the build event service (BES) backend endpoint as HOST or HOST:PORT. "
+ + "Disabled by default."
+ )
+ public String besBackend;
+
+ @Option(
+ name = "bes_timeout",
+ defaultValue = "0s",
+ converter = DurationConverter.class,
+ help = "Specifies how long bazel should wait for the BES/BEP upload to complete after the "
+ + "build and tests have finished. A valid timeout is a natural number followed by a "
+ + "unit: Days (d), hours (h), minutes (m), seconds (s), and milliseconds (ms). The "
+ + "default value is '0' which means that there is no timeout and that the upload will "
+ + "continue in the background after a build has finished."
+ )
+ public Duration besTimeout;
+
+ @Option(
+ name = "bes_best_effort",
+ defaultValue = "true",
+ help = "Specifies whether a failure to upload the BES protocol should also result in a build "
+ + "failure. If 'true', bazel exits with ExitCode.PUBLISH_ERROR. (defaults to 'true')."
+ )
+ public boolean besBestEffort;
+
+ @Option(
+ name = "bes_lifecycle_events",
+ defaultValue = "true",
+ help = "Specifies whether to publish BES lifecycle events. (defaults to 'true')."
+ )
+ public boolean besLifecycleEvents;
+
+ @Option(
+ name = "project_id",
+ defaultValue = "null",
+ help = "Specifies the BES project identifier. Defaults to null."
+ )
+ public String projectId;
+
+ /**
+ * Simple String to Duration Converter.
+ */
+ public static class DurationConverter implements Converter<Duration> {
+
+ private final Pattern durationRegex = Pattern.compile("^([0-9]+)(d|h|m|s|ms)$");
+
+ @Override
+ public Duration convert(String input) throws OptionsParsingException {
+ // To be compatible with the previous parser, '0' doesn't need a unit.
+ if ("0".equals(input)) {
+ return Duration.ZERO;
+ }
+ Matcher m = durationRegex.matcher(input);
+ if (!m.matches()) {
+ throw new OptionsParsingException("Illegal duration '" + input + "'.");
+ }
+ long duration = Long.parseLong(m.group(1));
+ String unit = m.group(2);
+ switch(unit) {
+ case "d":
+ return Duration.standardDays(duration);
+ case "h":
+ return Duration.standardHours(duration);
+ case "m":
+ return Duration.standardMinutes(duration);
+ case "s":
+ return Duration.standardSeconds(duration);
+ case "ms":
+ return Duration.millis(duration);
+ default:
+ throw new IllegalStateException("This must not happen. Did you update the regex without "
+ + "the switch case?");
+ }
+ }
+
+ @Override
+ public String getTypeDescription() {
+ return "An immutable length of time.";
+ }
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
new file mode 100644
index 0000000000..7ecfcb020a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -0,0 +1,168 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice;
+
+import static com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished.FinishType.FINISHED;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.v1.BuildEvent;
+import com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished;
+import com.google.devtools.build.v1.BuildEvent.BuildEnqueued;
+import com.google.devtools.build.v1.BuildEvent.BuildFinished;
+import com.google.devtools.build.v1.BuildEvent.EventCase;
+import com.google.devtools.build.v1.BuildEvent.InvocationAttemptFinished;
+import com.google.devtools.build.v1.BuildEvent.InvocationAttemptStarted;
+import com.google.devtools.build.v1.BuildStatus;
+import com.google.devtools.build.v1.BuildStatus.Result;
+import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishLifecycleEventRequest;
+import com.google.devtools.build.v1.StreamId;
+import com.google.devtools.build.v1.StreamId.BuildComponent;
+import com.google.protobuf.Any;
+import com.google.protobuf.util.Timestamps;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+
+/** Utility class used to build protobuffs requests that are meant to be sent over BES. */
+public final class BuildEventServiceProtoUtil {
+
+ private final String buildRequestId;
+ private final String buildInvocationId;
+ private final String projectId;
+ private final AtomicInteger streamSequenceNumber;
+ private final Clock clock;
+
+ public BuildEventServiceProtoUtil(String buildRequestId, String buildInvocationId,
+ @Nullable String projectId, Clock clock) {
+ this.buildRequestId = buildRequestId;
+ this.buildInvocationId = buildInvocationId;
+ this.projectId = projectId;
+ this.clock = clock;
+ this.streamSequenceNumber = new AtomicInteger(1);
+ }
+
+ public PublishLifecycleEventRequest buildEnqueued() {
+ return lifecycleEvent(projectId, 1,
+ com.google.devtools.build.v1.BuildEvent.newBuilder()
+ .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+ .setBuildEnqueued(BuildEnqueued.newBuilder()))
+ .build();
+ }
+
+ public PublishLifecycleEventRequest buildFinished(Result result) {
+ return lifecycleEvent(projectId, 2,
+ com.google.devtools.build.v1.BuildEvent.newBuilder()
+ .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+ .setBuildFinished(
+ BuildFinished.newBuilder()
+ .setStatus(BuildStatus.newBuilder().setResult(result))))
+ .build();
+ }
+
+ public PublishLifecycleEventRequest invocationStarted() {
+ return lifecycleEvent(projectId, 1,
+ com.google.devtools.build.v1.BuildEvent.newBuilder()
+ .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+ .setInvocationAttemptStarted(
+ InvocationAttemptStarted.newBuilder().setAttemptNumber(1)))
+ .build();
+ }
+
+ public PublishLifecycleEventRequest invocationFinished(Result result) {
+ return lifecycleEvent(projectId, 2,
+ com.google.devtools.build.v1.BuildEvent.newBuilder()
+ .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+ .setInvocationAttemptFinished(
+ InvocationAttemptFinished.newBuilder()
+ .setInvocationStatus(BuildStatus.newBuilder().setResult(result))))
+ .build();
+ }
+
+ /** Utility method used to create a OrderedBuildEvent that delimits the end of the stream. */
+ public OrderedBuildEvent streamFinished() {
+ return streamFinished(streamSequenceNumber.getAndIncrement());
+ }
+
+ /** Utility method used to create a OrderedBuildEvent from an packed bazel event */
+ public OrderedBuildEvent bazelEvent(Any packedEvent) {
+ return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent);
+ }
+
+ @VisibleForTesting
+ public OrderedBuildEvent bazelEvent(int sequenceNumber, Any packedEvent) {
+ return orderedBuildEvent(
+ sequenceNumber,
+ com.google.devtools.build.v1.BuildEvent.newBuilder().setBazelEvent(packedEvent));
+ }
+
+ @VisibleForTesting
+ public OrderedBuildEvent streamFinished(int sequenceNumber) {
+ return orderedBuildEvent(
+ sequenceNumber,
+ BuildEvent.newBuilder()
+ .setComponentStreamFinished(
+ BuildComponentStreamFinished.newBuilder().setType(FINISHED)));
+ }
+
+ @VisibleForTesting
+ public OrderedBuildEvent orderedBuildEvent(int sequenceNumber, BuildEvent.Builder besEvent) {
+ return OrderedBuildEvent.newBuilder()
+ .setSequenceNumber(sequenceNumber)
+ .setEvent(besEvent.setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())))
+ .setStreamId(streamId(besEvent.getEventCase()))
+ .build();
+ }
+
+ @VisibleForTesting
+ public PublishLifecycleEventRequest.Builder lifecycleEvent(@Nullable String projectId,
+ int sequenceNumber, BuildEvent.Builder lifecycleEvent) {
+ PublishLifecycleEventRequest.Builder builder = PublishLifecycleEventRequest.newBuilder()
+ .setServiceLevel(PublishLifecycleEventRequest.ServiceLevel.INTERACTIVE)
+ .setBuildEvent(
+ OrderedBuildEvent.newBuilder()
+ .setSequenceNumber(sequenceNumber)
+ .setStreamId(streamId(lifecycleEvent.getEventCase()))
+ .setEvent(lifecycleEvent));
+ if (projectId != null) {
+ builder.setProjectId(projectId);
+ }
+ return builder;
+ }
+
+ @VisibleForTesting
+ public StreamId streamId(EventCase eventCase) {
+ StreamId.Builder streamId = StreamId.newBuilder().setBuildId(buildRequestId);
+ switch (eventCase) {
+ case BUILD_ENQUEUED:
+ case BUILD_FINISHED:
+ streamId.setComponent(BuildComponent.CONTROLLER);
+ break;
+ case INVOCATION_ATTEMPT_STARTED:
+ case INVOCATION_ATTEMPT_FINISHED:
+ streamId.setInvocationId(buildInvocationId);
+ streamId.setComponent(BuildComponent.CONTROLLER);
+ break;
+ case BAZEL_EVENT:
+ case COMPONENT_STREAM_FINISHED:
+ streamId.setInvocationId(buildInvocationId);
+ streamId.setComponent(BuildComponent.TOOL);
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal EventCase " + eventCase);
+ }
+ return streamId.build();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
new file mode 100644
index 0000000000..467ee8bb0c
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -0,0 +1,527 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static com.google.devtools.build.lib.events.EventKind.ERROR;
+import static com.google.devtools.build.lib.events.EventKind.INFO;
+import static com.google.devtools.build.lib.events.EventKind.WARNING;
+import static com.google.devtools.build.v1.BuildEvent.EventCase.COMPONENT_STREAM_FINISHED;
+import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
+import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
+import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.joda.time.Duration.standardSeconds;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
+import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent;
+import com.google.devtools.build.lib.buildeventstream.BuildEventConverters;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEvent.PayloadCase;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildFinished;
+import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.PathConverter;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.events.EventKind;
+import com.google.devtools.build.lib.runtime.BlazeModule.ModuleEnvironment;
+import com.google.devtools.build.lib.util.AbruptExitException;
+import com.google.devtools.build.lib.util.Clock;
+import com.google.devtools.build.lib.util.ExitCode;
+import com.google.devtools.build.v1.BuildStatus.Result;
+import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
+import com.google.devtools.build.v1.PublishLifecycleEventRequest;
+import com.google.protobuf.Any;
+import io.grpc.Status;
+import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import org.joda.time.Duration;
+
+/** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */
+public class BuildEventServiceTransport implements BuildEventTransport {
+
+ static final String UPLOAD_FAILED_MESSAGE = "Build Event Protocol upload failed: %s";
+ static final String UPLOAD_SUCCEEDED_MESSAGE =
+ "Build Event Protocol upload finished successfully.";
+
+ private static final Logger logger = Logger.getLogger(BuildEventServiceTransport.class.getName());
+
+ /** Max wait time until for the Streaming RPC to finish after all events were enqueued. */
+ private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = standardSeconds(120);
+
+ private final ListeningExecutorService uploaderExecutorService;
+ private final Duration uploadTimeout;
+ private final boolean publishLifecycleEvents;
+ private final boolean bestEffortUpload;
+ private final BuildEventServiceClient besClient;
+ private final BuildEventServiceProtoUtil besProtoUtil;
+ private final ModuleEnvironment moduleEnvironment;
+ private final EventHandler commandLineReporter;
+
+ private final PathConverter pathConverter;
+ /** Contains all pendingAck events that might be retried in case of failures. */
+ private ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck;
+ /** Contains all events should be sent ordered by sequence number. */
+ private final BlockingDeque<OrderedBuildEvent> pendingSend;
+ /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
+ private Result invocationResult;
+ /** Used to block until all events have been uploaded. */
+ private ListenableFuture<?> uploadComplete;
+ /** Used to ensure that the close logic is only invoked once. */
+ private SettableFuture<Void> shutdownFuture;
+ /**
+ * If the call before the current call threw an exception, this field points to it. If the
+ * previous call was successful, this field is null. This is useful for error reporting, when an
+ * upload times out due to having had to retry several times.
+ */
+ private volatile Exception lastKnownError;
+ /** Returns true if we already reported a warning or error to UI. */
+ private volatile boolean errorsReported;
+
+ public BuildEventServiceTransport(
+ BuildEventServiceClient besClient,
+ Duration uploadTimeout,
+ boolean bestEffortUpload,
+ boolean publishLifecycleEvents,
+ String buildRequestId,
+ String invocationId,
+ ModuleEnvironment moduleEnvironment,
+ Clock clock,
+ PathConverter pathConverter,
+ EventHandler commandLineReporter,
+ @Nullable String projectId) {
+ this(
+ besClient,
+ uploadTimeout,
+ bestEffortUpload,
+ publishLifecycleEvents,
+ moduleEnvironment,
+ new BuildEventServiceProtoUtil(buildRequestId, invocationId, projectId, clock),
+ pathConverter,
+ commandLineReporter);
+ }
+
+ @VisibleForTesting
+ BuildEventServiceTransport(
+ BuildEventServiceClient besClient,
+ Duration uploadTimeout,
+ boolean bestEffortUpload,
+ boolean publishLifecycleEvents,
+ ModuleEnvironment moduleEnvironment,
+ BuildEventServiceProtoUtil besProtoUtil,
+ PathConverter pathConverter,
+ EventHandler commandLineReporter) {
+ this.besClient = besClient;
+ this.besProtoUtil = besProtoUtil;
+ this.publishLifecycleEvents = publishLifecycleEvents;
+ this.moduleEnvironment = moduleEnvironment;
+ this.commandLineReporter = commandLineReporter;
+ this.pendingAck = new ConcurrentLinkedDeque<>();
+ this.pendingSend = new LinkedBlockingDeque<>();
+ // Setting the thread count to 2 instead of 1 is a hack, but necessary as publishEventStream
+ // blocks one thread permanently and thus we can't do any other work on the executor. A proper
+ // fix would be to remove the spinning loop from publishEventStream and instead implement the
+ // loop by publishEventStream re-submitting itself to the executor.
+ // TODO(buchgr): Fix it.
+ this.uploaderExecutorService = listeningDecorator(Executors.newFixedThreadPool(2));
+ this.pathConverter = pathConverter;
+ this.invocationResult = UNKNOWN_STATUS;
+ this.uploadTimeout = uploadTimeout;
+ this.bestEffortUpload = bestEffortUpload;
+ }
+
+ @Override
+ public synchronized ListenableFuture<Void> close() {
+ if (shutdownFuture != null) {
+ return shutdownFuture;
+ }
+
+ logger.log(Level.INFO, "Closing the build event service transport.");
+
+ // The future is completed once the close succeeded or failed.
+ shutdownFuture = SettableFuture.create();
+
+ uploaderExecutorService.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sendOrderedBuildEvent(besProtoUtil.streamFinished());
+
+ if (errorsReported) {
+ // If we encountered errors before and have already reported them, then we should
+ // not report them a second time.
+ return;
+ }
+
+ if (bestEffortUpload) {
+ // TODO(buchgr): The code structure currently doesn't allow to enforce a timeout for
+ // best effort upload.
+ if (!uploadComplete.isDone()) {
+ report(INFO, "Asynchronous Build Event Protocol upload.");
+ } else {
+ Throwable uploadError = fromFuture(uploadComplete);
+
+ if (uploadError != null) {
+ report(WARNING, UPLOAD_FAILED_MESSAGE, uploadError.getMessage());
+ } else {
+ report(INFO, UPLOAD_SUCCEEDED_MESSAGE);
+ }
+ }
+ } else {
+ report(INFO, "Waiting for Build Event Protocol upload to finish.");
+ try {
+ if (Duration.ZERO.equals(uploadTimeout)) {
+ uploadComplete.get();
+ } else {
+ uploadComplete.get(uploadTimeout.getMillis(), MILLISECONDS);
+ }
+ report(INFO, UPLOAD_SUCCEEDED_MESSAGE);
+ } catch (Exception e) {
+ uploadComplete.cancel(true);
+ reportErrorAndFailBuild(e);
+ }
+ }
+ } finally {
+ shutdownFuture.set(null);
+ uploaderExecutorService.shutdown();
+ }
+ }
+ });
+
+ return shutdownFuture;
+ }
+
+ @Override
+ public String name() {
+ // TODO(buchgr): Also display the hostname / IP.
+ return "Build Event Service";
+ }
+
+ @Override
+ public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
+ BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto(
+ new BuildEventConverters() {
+ @Override
+ public PathConverter pathConverter() {
+ return pathConverter;
+ }
+ @Override
+ public ArtifactGroupNamer artifactGroupNamer() {
+ return namer;
+ }
+ });
+ if (PayloadCase.FINISHED.equals(eventProto.getPayloadCase())) {
+ BuildFinished finished = eventProto.getFinished();
+ if (finished.hasExitCode() && finished.getExitCode().getCode() == 0) {
+ invocationResult = COMMAND_SUCCEEDED;
+ } else {
+ invocationResult = COMMAND_FAILED;
+ }
+ }
+
+ sendOrderedBuildEvent(besProtoUtil.bazelEvent(Any.pack(eventProto)));
+ }
+
+ private String errorMessageFromException(Throwable t) {
+ String message;
+ if (t instanceof TimeoutException) {
+ message = "Build Event Protocol upload timed out.";
+ Exception lastKnownError0 = lastKnownError;
+ if (lastKnownError0 != null) {
+ // We may at times get a timeout exception due to an underlying error that was retried
+ // several times. If such an error exists, report it.
+ message += " Transport errors caused the upload to be retried.";
+ message += " Last known reason for retry: ";
+ message += besClient.userReadableError(lastKnownError0);
+ return message;
+ }
+ return message;
+ } else if (t instanceof ExecutionException) {
+ message = format(UPLOAD_FAILED_MESSAGE,
+ t.getCause() != null
+ ? besClient.userReadableError(t.getCause())
+ : t.getMessage());
+ return message;
+ } else {
+ message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t));
+ return message;
+ }
+ }
+
+ private void reportErrorAndFailBuild(Throwable t) {
+ checkState(!bestEffortUpload);
+
+ String message = errorMessageFromException(t);
+
+ report(ERROR, message);
+ moduleEnvironment.exit(new AbruptExitException(ExitCode.PUBLISH_ERROR));
+ }
+
+ private void maybeReportUploadError() {
+ if (errorsReported) {
+ return;
+ }
+
+ Throwable uploadError = fromFuture(uploadComplete);
+ if (uploadError != null) {
+ errorsReported = true;
+ if (bestEffortUpload) {
+ report(WARNING, UPLOAD_FAILED_MESSAGE, uploadError.getMessage());
+ } else {
+ reportErrorAndFailBuild(uploadError);
+ }
+ }
+ }
+
+ private synchronized void sendOrderedBuildEvent(OrderedBuildEvent serialisedEvent) {
+ if (uploadComplete != null && uploadComplete.isDone()) {
+ maybeReportUploadError();
+ return;
+ }
+
+ pendingSend.add(serialisedEvent);
+ if (uploadComplete == null) {
+ uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload());
+ }
+ }
+
+ private synchronized Result getInvocationResult() {
+ return invocationResult;
+ }
+
+ /**
+ * Method responsible for sending all requests to BuildEventService.
+ */
+ private class BuildEventServiceUpload implements Callable<Void> {
+ @Override
+ public Void call() throws Exception {
+ try {
+ publishBuildEnqueuedEvent();
+ publishInvocationStartedEvent();
+ try {
+ publishEventStream0();
+ } finally {
+ Result result = getInvocationResult();
+ publishInvocationFinishedEvent(result);
+ publishBuildFinishedEvent(result);
+ }
+ } finally {
+ besClient.shutdown();
+ }
+ return null;
+ }
+
+ private void publishBuildEnqueuedEvent() throws Exception {
+ retryOnException(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishLifecycleEvent(besProtoUtil.buildEnqueued());
+ return null;
+ }
+ });
+ }
+
+ private void publishInvocationStartedEvent() throws Exception {
+ retryOnException(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishLifecycleEvent(besProtoUtil.invocationStarted());
+ return null;
+ }
+ });
+ }
+
+ private void publishEventStream0() throws Exception {
+ retryOnException(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishEventStream();
+ return null;
+ }
+ });
+ }
+
+ private void publishInvocationFinishedEvent(final Result result) throws Exception {
+ retryOnException(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishLifecycleEvent(besProtoUtil.invocationFinished(result));
+ return null;
+ }
+ });
+ }
+
+ private void publishBuildFinishedEvent(final Result result) throws Exception {
+ retryOnException(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishLifecycleEvent(besProtoUtil.buildFinished(result));
+ return null;
+ }
+ });
+ }
+ }
+
+ /** Responsible for publishing lifecycle evnts RPC. Safe to retry. */
+ private Status publishLifecycleEvent(PublishLifecycleEventRequest request) throws Exception {
+ if (publishLifecycleEvents) {
+ // Change the status based on BEP data
+ return besClient.publish(request);
+ }
+ return Status.OK;
+ }
+
+ /**
+ * Used as method reference, responsible for the entire Streaming RPC. Safe to retry. This method
+ * it carries states between consecutive calls (pendingAck messages will be added to the head of
+ * of the pendingSend queue), but that is intended behavior.
+ */
+ private Status publishEventStream() throws Exception {
+ // Reschedule unacked messages if required, keeping its original order.
+ OrderedBuildEvent unacked;
+ while ((unacked = pendingAck.pollLast()) != null) {
+ pendingSend.addFirst(unacked);
+ }
+ pendingAck = new ConcurrentLinkedDeque<>();
+
+ return publishEventStream(pendingAck, pendingSend, besClient)
+ .get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /** Method responsible for a single Streaming RPC. */
+ private static ListenableFuture<Status> publishEventStream(
+ final ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck,
+ final BlockingDeque<OrderedBuildEvent> pendingSend,
+ final BuildEventServiceClient besClient)
+ throws Exception {
+ OrderedBuildEvent event;
+ ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
+ try {
+ do {
+ event = pendingSend.takeFirst();
+ pendingAck.add(event);
+ besClient.sendOverStream(event);
+ } while (!isLastEvent(event));
+ besClient.closeStream();
+ logger.log(Level.INFO, "Closing the build event stream.");
+ } catch (Exception e) {
+ logger.log(Level.WARNING, "Aborting publishEventStream.", e);
+ besClient.abortStream(Status.INTERNAL.augmentDescription(e.getMessage()));
+ }
+ return streamDone;
+ }
+
+ private static boolean isLastEvent(OrderedBuildEvent event) {
+ return event != null && event.getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
+ }
+
+ private static Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
+ final Deque<OrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
+ return new Function<PublishBuildToolEventStreamResponse, Void>() {
+ @Override
+ public Void apply(PublishBuildToolEventStreamResponse ack) {
+ long pendingSeq =
+ pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
+ long ackSeq = ack.getSequenceNumber();
+ if (pendingSeq != ackSeq) {
+ besClient.abortStream(Status.INTERNAL
+ .augmentDescription(format("Expected ack %s but was %s.", pendingSeq, ackSeq)));
+ } else {
+ pendingAck.removeFirst();
+ }
+ return null;
+ }
+ };
+ }
+
+ private void retryOnException(Callable<?> c) throws Exception {
+ retryOnException(c, 3, 100);
+ }
+
+ /**
+ * Executes a {@link Callable} retrying on exception thrown.
+ */
+ // TODO(eduardocolaco): Implement transient/persistent failures
+ private void retryOnException(Callable<?> c, final int maxRetries, final long initalDelayMillis)
+ throws Exception {
+ int tries = 0;
+ while (tries <= maxRetries) {
+ try {
+ c.call();
+ lastKnownError = null;
+ return;
+ // TODO(buchgr): Narrow the exception to not catch InterruptedException and
+ // RuntimeException's.
+ } catch (Exception e) {
+ tries++;
+ lastKnownError = e;
+ /*
+ * Exponential backoff:
+ * Retry 1: initalDelayMillis * 2^0
+ * Retry 2: initalDelayMillis * 2^1
+ * Retry 3: initalDelayMillis * 2^2
+ * ...
+ */
+ long sleepMillis = initalDelayMillis << (tries - 1);
+ String message = String.format("Retrying RPC to BES. Attempt %s. Backoff %s ms.",
+ tries, sleepMillis);
+ logger.log(Level.INFO, message, lastKnownError);
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+ }
+ }
+ Preconditions.checkNotNull(lastKnownError);
+ throw lastKnownError;
+ }
+
+ private void report(EventKind eventKind, String msg, Object... parameters) {
+ commandLineReporter.handle(Event.of(eventKind, null, format(msg, parameters)));
+ }
+
+ @Nullable
+ private static Throwable fromFuture(Future<?> f) {
+ if (!f.isDone()) {
+ return null;
+ }
+ try {
+ f.get();
+ return null;
+ } catch (Throwable t) {
+ return t;
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD
new file mode 100644
index 0000000000..3a1032a630
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD
@@ -0,0 +1,31 @@
+filegroup(
+ name = "srcs",
+ srcs = glob(["**"]),
+ visibility = ["//src/main/java/com/google/devtools/build/lib:__pkg__"],
+)
+
+java_library(
+ name = "client",
+ srcs = glob(["*.java"]),
+ visibility = [
+ "//src/main/java/com/google/devtools/build/lib:__pkg__",
+ ],
+ runtime_deps = [
+ # This is required for client TLS.
+ "//third_party:netty_tcnative",
+ ],
+ deps = [
+ "//src/main/java/com/google/devtools/build/lib:util",
+ "//third_party:apache_httpclient",
+ "//third_party:apache_httpcore",
+ "//third_party:auth",
+ "//third_party:gson",
+ "//third_party:guava",
+ "//third_party:joda_time",
+ "//third_party:jsr305",
+ "//third_party:netty",
+ "//third_party/grpc:grpc-jar",
+ "@googleapis//:google_devtools_build_v1_publish_build_event_java_grpc",
+ "@googleapis//:google_devtools_build_v1_publish_build_event_java_proto",
+ ],
+)
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
new file mode 100644
index 0000000000..6feb53c808
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -0,0 +1,90 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice.client;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
+import com.google.devtools.build.v1.PublishLifecycleEventRequest;
+import io.grpc.Status;
+
+/** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */
+public interface BuildEventServiceClient {
+
+ /**
+ * Makes a synchronous RPC that publishes the specified lifecycle event.
+ *
+ * @param lifecycleEvent Event to be published.
+ * @return Status of the RPC.
+ */
+ Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception;
+
+ /**
+ * Starts a new stream with the given ack callback. Throws an {@link IllegalStateException} if the
+ * there is already opened stream. Callers should wait on the returned Future in order to
+ * guarantee that all callback calls have been received.
+ *
+ * @param ackCallback Consumer called every time a ack message is received.
+ * @return Listenable future that blocks until the the onDone callback is called.
+ * @throws Exception
+ */
+ ListenableFuture<Status> openStream(
+ Function<PublishBuildToolEventStreamResponse, Void> ackCallback) throws Exception;
+
+ /**
+ * Sends an event to the most recently opened stream. This method may block due to flow control.
+ *
+ * @param buildEvent Event that should be sent.
+ * @throws Exception
+ */
+ void sendOverStream(OrderedBuildEvent buildEvent) throws Exception;
+
+ /**
+ * Closes the currently opened opened stream. This method does not block. Callers should block on
+ * the Future returned by {@link #openStream(Function)} in order to make sure that all
+ * ackCallback calls have been received.
+ */
+ void closeStream();
+
+ /**
+ * Closes the currently opened stream with error. This method does not block. Callers should block
+ * on the Future returned by {@link #openStream(Function)} if in order to make sure that all
+ * ackCallback calls have been received.
+ */
+ void abortStream(Status status);
+
+ /**
+ * Checks if there is a currently active stream.
+ *
+ * @return True if the current stream is active, false otherwise.
+ */
+ boolean isStreamActive();
+
+ /**
+ * Called once to dispose resources that this client might be holding (such as thread pools). This
+ * should be the last method called on this object.
+ *
+ * @throws InterruptedException
+ */
+ void shutdown() throws InterruptedException;
+
+ /**
+ * If possible, returns a user readable error message for a given {@link Throwable}.
+ *
+ * <p>As a last resort, it's valid to return {@link Throwable#getMessage()}.
+ */
+ String userReadableError(Throwable t);
+}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
new file mode 100644
index 0000000000..86e33e09e8
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -0,0 +1,234 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.buildeventservice.client;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.devtools.build.lib.util.Preconditions.checkNotNull;
+import static com.google.devtools.build.lib.util.Preconditions.checkState;
+import static java.lang.System.getenv;
+import static java.nio.file.Files.newInputStream;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildEventGrpc;
+import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub;
+import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
+import com.google.devtools.build.v1.PublishLifecycleEventRequest;
+import io.grpc.CallCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.auth.MoreCallCredentials;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.AbstractStub;
+import io.grpc.stub.StreamObserver;
+import io.netty.handler.ssl.SslContext;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLException;
+import org.joda.time.Duration;
+
+/** Implementation of BuildEventServiceClient that uploads data using gRPC. */
+public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
+
+ private static final Logger logger =
+ Logger.getLogger(BuildEventServiceGrpcClient.class.getName());
+
+ /** Max wait time for a single non-streaming RPC to finish */
+ private static final Duration RPC_TIMEOUT = Duration.standardSeconds(15);
+ /** See https://developers.google.com/identity/protocols/application-default-credentials * */
+ private static final String DEFAULT_APP_CREDENTIALS_ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";
+ /** TODO(eduardocolaco): Scope documentation.* */
+ private static final String CREDENTIALS_SCOPE =
+ "https://www.googleapis.com/auth/cloud-build-service";
+
+ private final PublishBuildEventStub besAsync;
+ private final PublishBuildEventBlockingStub besBlocking;
+ private final ManagedChannel channel;
+ private final AtomicReference<StreamObserver<OrderedBuildEvent>> streamReference;
+
+ public BuildEventServiceGrpcClient(String serverSpec, boolean tlsEnabled,
+ @Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride,
+ @Nullable String credentialsFile, @Nullable String credentialsScope) {
+ this(getChannel(serverSpec, tlsEnabled, tlsCertificateFile, tlsAuthorityOverride),
+ getCallCredentials(credentialsFile, credentialsScope));
+ }
+
+ public BuildEventServiceGrpcClient(
+ ManagedChannel channel,
+ @Nullable CallCredentials callCredentials) {
+ this.channel = channel;
+ this.besAsync = withCallCredentials(
+ PublishBuildEventGrpc.newStub(channel), callCredentials);
+ this.besBlocking = withCallCredentials(
+ PublishBuildEventGrpc.newBlockingStub(channel), callCredentials);
+ this.streamReference = new AtomicReference<>(null);
+ }
+
+ private static <T extends AbstractStub<T>> T withCallCredentials(
+ T stub, @Nullable CallCredentials callCredentials) {
+ stub = callCredentials != null ? stub.withCallCredentials(callCredentials) : stub;
+ return stub;
+ }
+
+ @Override
+ public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception {
+ besBlocking
+ .withDeadlineAfter(RPC_TIMEOUT.getMillis(), MILLISECONDS)
+ .publishLifecycleEvent(lifecycleEvent);
+ return Status.OK;
+ }
+
+ @Override
+ public ListenableFuture<Status> openStream(
+ Function<PublishBuildToolEventStreamResponse, Void> ack)
+ throws Exception {
+ SettableFuture<Status> streamFinished = SettableFuture.create();
+ checkState(
+ streamReference.compareAndSet(null, createStream(ack, streamFinished)),
+ "Starting a new stream without closing the previous one");
+ return streamFinished;
+ }
+
+ private StreamObserver<OrderedBuildEvent> createStream(
+ final Function<PublishBuildToolEventStreamResponse, Void> ack,
+ final SettableFuture<Status> streamFinished) {
+ return besAsync.publishBuildToolEventStream(
+ new StreamObserver<PublishBuildToolEventStreamResponse>() {
+ @Override
+ public void onNext(PublishBuildToolEventStreamResponse response) {
+ ack.apply(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ streamReference.set(null);
+ streamFinished.setException(t);
+ }
+
+ @Override
+ public void onCompleted() {
+ streamReference.set(null);
+ streamFinished.set(Status.OK);
+ }
+ });
+ }
+
+ @Override
+ public void sendOverStream(OrderedBuildEvent buildEvent) throws Exception {
+ checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
+ .onNext(buildEvent);
+ }
+
+ @Override
+ public void closeStream() {
+ StreamObserver<OrderedBuildEvent> stream;
+ if ((stream = streamReference.getAndSet(null)) != null) {
+ stream.onCompleted();
+ }
+ }
+
+ @Override
+ public void abortStream(Status status) {
+ StreamObserver<OrderedBuildEvent> stream;
+ if ((stream = streamReference.getAndSet(null)) != null) {
+ stream.onError(status.asException());
+ }
+ }
+
+ @Override
+ public boolean isStreamActive() {
+ return streamReference.get() != null;
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ this.channel.shutdown();
+ }
+
+ @Override
+ public String userReadableError(Throwable t) {
+ if (t instanceof StatusRuntimeException) {
+ Throwable rootCause = Throwables.getRootCause(t);
+ String message = ((StatusRuntimeException) t).getStatus().getCode().name();
+ message += ": " + rootCause.getMessage();
+ return message;
+ } else {
+ return t.getMessage();
+ }
+ }
+
+ /**
+ * Returns call credentials read from the specified file (if non-empty) or from
+ * env(GOOGLE_APPLICATION_CREDENTIALS) otherwise.
+ */
+ @Nullable
+ private static CallCredentials getCallCredentials(@Nullable String credentialsFile,
+ @Nullable String credentialsScope) {
+ String effectiveScope = credentialsScope != null ? credentialsScope : CREDENTIALS_SCOPE;
+ try {
+ if (!isNullOrEmpty(credentialsFile)) {
+ return MoreCallCredentials.from(
+ GoogleCredentials.fromStream(newInputStream(Paths.get(credentialsFile)))
+ .createScoped(ImmutableList.of(effectiveScope)));
+
+ } else if (!isNullOrEmpty(getenv(DEFAULT_APP_CREDENTIALS_ENV_VAR))) {
+ return MoreCallCredentials.from(
+ GoogleCredentials.getApplicationDefault()
+ .createScoped(ImmutableList.of(effectiveScope)));
+ }
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "Failed to read credentials", e);
+ }
+ return null;
+ }
+
+ /**
+ * Returns a ManagedChannel to the specified server.
+ */
+ private static ManagedChannel getChannel(String serverSpec, boolean tlsEnabled,
+ @Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride) {
+ //TODO(buchgr): Use ManagedChannelBuilder once bazel uses a newer gRPC version.
+ NettyChannelBuilder builder = NettyChannelBuilder.forTarget(serverSpec);
+ builder.negotiationType(tlsEnabled ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
+ if (tlsCertificateFile != null) {
+ try {
+ SslContext sslContext =
+ GrpcSslContexts.forClient().trustManager(new File(tlsCertificateFile)).build();
+ builder.sslContext(sslContext);
+ } catch (SSLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (tlsAuthorityOverride != null) {
+ builder.overrideAuthority(tlsAuthorityOverride);
+ }
+ return builder.build();
+ }
+}