diff options
author | 2017-06-08 23:56:05 +0200 | |
---|---|---|
committer | 2017-06-09 10:23:19 +0200 | |
commit | 2730bae6223d611fbe5a45463cd788c4f4cc076f (patch) | |
tree | b01b1cd690e075bcc86e0da531d50b03ce839005 /src/main/java/com | |
parent | 55c5a60bfbf3429c773b899ab331bc41019ddca3 (diff) |
BES: Open Source the build event service gRPC client implementation.
This change moves the BES code from blaze to bazel.
RELNOTES: None.
PiperOrigin-RevId: 158445754
Diffstat (limited to 'src/main/java/com')
10 files changed, 1460 insertions, 2 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD index 2864670239..6c4c255d38 100644 --- a/src/main/java/com/google/devtools/build/lib/BUILD +++ b/src/main/java/com/google/devtools/build/lib/BUILD @@ -21,6 +21,7 @@ filegroup( "//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader:srcs", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:srcs", "//src/main/java/com/google/devtools/build/lib/buildeventstream/transports:srcs", + "//src/main/java/com/google/devtools/build/lib/buildeventservice/client:srcs", "//src/main/java/com/google/devtools/build/lib/causes:srcs", "//src/main/java/com/google/devtools/build/lib/cmdline:srcs", "//src/main/java/com/google/devtools/build/lib/exec/local:srcs", @@ -367,6 +368,35 @@ java_library( ) java_library( + name = "buildeventservice", + srcs = glob(["buildeventservice/*.java"]), + visibility = [ + "//visibility:public", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib:auth_and_tls_options", + "//src/main/java/com/google/devtools/build/lib:buildeventstream", + "//src/main/java/com/google/devtools/build/lib:events", + "//src/main/java/com/google/devtools/build/lib:io", + "//src/main/java/com/google/devtools/build/lib:runtime", + "//src/main/java/com/google/devtools/build/lib:util", + "//src/main/java/com/google/devtools/build/lib/buildeventservice/client", + "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", + "//src/main/java/com/google/devtools/common/options", + "//third_party:guava", + "//third_party:joda_time", + "//third_party:jsr305", + "//third_party/grpc:grpc-jar", + "@com_google_protobuf//:protobuf_java", + "@com_google_protobuf//:protobuf_java_util", + "@com_google_protobuf//:well_known_types_any_proto", + "@googleapis//:google_devtools_build_v1_build_events_java_proto", + "@googleapis//:google_devtools_build_v1_build_status_java_proto", + "@googleapis//:google_devtools_build_v1_publish_build_event_java_proto", + ], +) + +java_library( name = "auth_and_tls_options", srcs = ["authandtls/AuthAndTLSOptions.java"], visibility = [ @@ -716,6 +746,7 @@ java_library( ":bazel-rules", ":build-base", ":build-info", + ":buildeventservice", ":clock", ":events", ":io", diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java b/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java index 5914b765c3..62d070ee86 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/BazelMain.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableMap; import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.runtime.BlazeModule; import com.google.devtools.build.lib.runtime.BlazeRuntime; - import java.io.IOException; import java.io.InputStream; import java.util.Properties; @@ -51,7 +50,8 @@ public final class BazelMain { com.google.devtools.build.lib.runtime.BuildSummaryStatsModule.class, com.google.devtools.build.lib.runtime.BuildEventStreamerModule.class, com.google.devtools.build.lib.bazel.rules.BazelRulesModule.class, - com.google.devtools.build.lib.bazel.rules.BazelStrategyModule.class); + com.google.devtools.build.lib.bazel.rules.BazelStrategyModule.class, + com.google.devtools.build.lib.buildeventservice.BazelBuildEventServiceModule.class); public static void main(String[] args) { BlazeVersionInfo.setBuildInfo(tryGetBuildInfo()); diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java new file mode 100644 index 0000000000..c80f77e58e --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModule.java @@ -0,0 +1,40 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice; + +import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; +import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceGrpcClient; + +/** + * Bazel's BES module. + */ +public class BazelBuildEventServiceModule + extends BuildEventServiceModule<BuildEventServiceOptions> { + + @Override + protected Class<BuildEventServiceOptions> optionsClass() { + return BuildEventServiceOptions.class; + } + + @Override + protected BuildEventServiceClient createBesClient(BuildEventServiceOptions besOptions, + AuthAndTLSOptions authAndTLSOptions) { + return new BuildEventServiceGrpcClient( + besOptions.besBackend, authAndTLSOptions.tlsEnabled, authAndTLSOptions.tlsCertificate, + authAndTLSOptions.tlsAuthorityOverride, authAndTLSOptions.authCredentials, + authAndTLSOptions.authScope); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java new file mode 100644 index 0000000000..76c446bd95 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java @@ -0,0 +1,225 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.UPLOAD_FAILED_MESSAGE; +import static java.lang.String.format; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.eventbus.Subscribe; +import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; +import com.google.devtools.build.lib.buildeventstream.BuildEvent; +import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; +import com.google.devtools.build.lib.events.Event; +import com.google.devtools.build.lib.events.EventHandler; +import com.google.devtools.build.lib.runtime.BlazeModule; +import com.google.devtools.build.lib.runtime.BuildEventStreamer; +import com.google.devtools.build.lib.runtime.Command; +import com.google.devtools.build.lib.runtime.CommandEnvironment; +import com.google.devtools.build.lib.runtime.SynchronizedOutputStream; +import com.google.devtools.build.lib.util.AbruptExitException; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ExitCode; +import com.google.devtools.build.lib.util.io.OutErr; +import com.google.devtools.common.options.OptionsBase; +import com.google.devtools.common.options.OptionsProvider; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Module responsible for the {@link BuildEventTransport} and its {@link BuildEventStreamer}. + * + * Implementors of this class have to overwrite {@link #optionsClass()} and + * {@link #createBesClient(BuildEventServiceOptions)}. + */ +public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions> + extends BlazeModule { + + private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName()); + + private CommandEnvironment commandEnvironment; + private SynchronizedOutputStream out; + private SynchronizedOutputStream err; + + private static class BuildEventRecorder { + private final List<BuildEvent> events = new ArrayList<>(); + + @Subscribe + public void buildEvent(BuildEvent event) { + events.add(event); + } + + List<BuildEvent> getEvents() { + return events; + } + } + + private BuildEventRecorder buildEventRecorder; + + @Override + public Iterable<Class<? extends OptionsBase>> getCommandOptions(Command command) { + return ImmutableList.of(optionsClass(), AuthAndTLSOptions.class); + } + + @Override + public void beforeCommand(Command command, CommandEnvironment commandEnvironment) + throws AbruptExitException { + this.commandEnvironment = commandEnvironment; + this.buildEventRecorder = new BuildEventRecorder(); + commandEnvironment.getEventBus().register(buildEventRecorder); + } + + @Override + public void handleOptions(OptionsProvider optionsProvider) { + checkState(commandEnvironment != null, "Methods called out of order"); + BuildEventStreamer streamer = + tryCreateStreamer( + optionsProvider, + commandEnvironment.getReporter(), + commandEnvironment.getBlazeModuleEnvironment(), + commandEnvironment.getRuntime().getClock(), + commandEnvironment.getClientEnv().get("BAZEL_INTERNAL_BUILD_REQUEST_ID"), + commandEnvironment.getCommandId().toString()); + if (streamer != null) { + commandEnvironment.getReporter().addHandler(streamer); + commandEnvironment.getEventBus().register(streamer); + + final SynchronizedOutputStream theOut = this.out; + final SynchronizedOutputStream theErr = this.err; + // out and err should be non-null at this point, as getOutputListener is supposed to + // be always called before handleOptions. But let's still prefer a stream with no + // stdout/stderr over an aborted build. + streamer.registerOutErrProvider( + new BuildEventStreamer.OutErrProvider() { + @Override + public String getOut() { + if (theOut == null) { + return null; + } + return theOut.readAndReset(); + } + + @Override + public String getErr() { + if (theErr == null) { + return null; + } + return theErr.readAndReset(); + } + }); + if (theErr != null) { + theErr.registerStreamer(streamer); + } + if (theOut != null) { + theOut.registerStreamer(streamer); + } + for (BuildEvent event : buildEventRecorder.getEvents()) { + streamer.buildEvent(event); + } + logger.fine("BuildEventStreamer created and registered successfully."); + } else { + // If there is no streamer to consume the output, we should not try to accumulate it. + this.out.setDiscardAll(); + this.err.setDiscardAll(); + } + commandEnvironment.getEventBus().unregister(buildEventRecorder); + this.buildEventRecorder = null; + } + + @Override + public OutErr getOutputListener() { + this.out = new SynchronizedOutputStream(); + this.err = new SynchronizedOutputStream(); + return OutErr.create(this.out, this.err); + } + + /** + * Returns {@code null} if no stream could be created. + * + * @param buildRequestId if {@code null} or {@code ""} a random UUID is used instead. + */ + @Nullable + private BuildEventStreamer tryCreateStreamer( + OptionsProvider optionsProvider, + EventHandler commandLineReporter, + ModuleEnvironment moduleEnvironment, + Clock clock, + String buildRequestId, + String invocationId) { + T besOptions = null; + try { + besOptions = + checkNotNull( + optionsProvider.getOptions(optionsClass()), + "Could not get BuildEventServiceOptions."); + AuthAndTLSOptions authTlsOptions = + checkNotNull(optionsProvider.getOptions(AuthAndTLSOptions.class), + "Could not get AuthAndTLSOptions."); + if (isNullOrEmpty(besOptions.besBackend)) { + logger.fine("BuildEventServiceTransport is disabled."); + } else { + logger.fine(format("Will create BuildEventServiceTransport streaming to '%s'", + besOptions.besBackend)); + + buildRequestId = isNullOrEmpty(buildRequestId) + ? UUID.randomUUID().toString() + : buildRequestId; + commandLineReporter.handle( + Event.info( + format( + "Streaming Build Event Protocol to %s build_request_id: %s invocation_id: %s", + besOptions.besBackend, buildRequestId, invocationId))); + + BuildEventTransport besTransport = + new BuildEventServiceTransport( + createBesClient(besOptions, authTlsOptions), + besOptions.besTimeout, + besOptions.besBestEffort, + besOptions.besLifecycleEvents, + buildRequestId, + invocationId, + moduleEnvironment, + clock, + commandEnvironment.getRuntime().getPathToUriConverter(), + commandLineReporter, + besOptions.projectId); + logger.fine("BuildEventServiceTransport was created successfully"); + return new BuildEventStreamer(ImmutableSet.of(besTransport), + commandEnvironment.getReporter()); + } + } catch (Exception e) { + if (besOptions != null && besOptions.besBestEffort) { + commandLineReporter.handle(Event.warn(format(UPLOAD_FAILED_MESSAGE, e.getMessage()))); + } else { + commandLineReporter.handle(Event.error(format(UPLOAD_FAILED_MESSAGE, e.getMessage()))); + moduleEnvironment.exit(new AbruptExitException(ExitCode.PUBLISH_ERROR)); + } + } + return null; + } + + protected abstract Class<T> optionsClass(); + + protected abstract BuildEventServiceClient createBesClient(T besOptions, + AuthAndTLSOptions authAndTLSOptions); +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java new file mode 100644 index 0000000000..338cd5b950 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceOptions.java @@ -0,0 +1,112 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice; + +import com.google.devtools.common.options.Converter; +import com.google.devtools.common.options.Option; +import com.google.devtools.common.options.OptionsBase; +import com.google.devtools.common.options.OptionsParsingException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.joda.time.Duration; + +/** Options used by {@link BuildEventServiceModule}. */ +public class BuildEventServiceOptions extends OptionsBase { + + @Option( + name = "bes_backend", + defaultValue = "", + help = "Specifies the build event service (BES) backend endpoint as HOST or HOST:PORT. " + + "Disabled by default." + ) + public String besBackend; + + @Option( + name = "bes_timeout", + defaultValue = "0s", + converter = DurationConverter.class, + help = "Specifies how long bazel should wait for the BES/BEP upload to complete after the " + + "build and tests have finished. A valid timeout is a natural number followed by a " + + "unit: Days (d), hours (h), minutes (m), seconds (s), and milliseconds (ms). The " + + "default value is '0' which means that there is no timeout and that the upload will " + + "continue in the background after a build has finished." + ) + public Duration besTimeout; + + @Option( + name = "bes_best_effort", + defaultValue = "true", + help = "Specifies whether a failure to upload the BES protocol should also result in a build " + + "failure. If 'true', bazel exits with ExitCode.PUBLISH_ERROR. (defaults to 'true')." + ) + public boolean besBestEffort; + + @Option( + name = "bes_lifecycle_events", + defaultValue = "true", + help = "Specifies whether to publish BES lifecycle events. (defaults to 'true')." + ) + public boolean besLifecycleEvents; + + @Option( + name = "project_id", + defaultValue = "null", + help = "Specifies the BES project identifier. Defaults to null." + ) + public String projectId; + + /** + * Simple String to Duration Converter. + */ + public static class DurationConverter implements Converter<Duration> { + + private final Pattern durationRegex = Pattern.compile("^([0-9]+)(d|h|m|s|ms)$"); + + @Override + public Duration convert(String input) throws OptionsParsingException { + // To be compatible with the previous parser, '0' doesn't need a unit. + if ("0".equals(input)) { + return Duration.ZERO; + } + Matcher m = durationRegex.matcher(input); + if (!m.matches()) { + throw new OptionsParsingException("Illegal duration '" + input + "'."); + } + long duration = Long.parseLong(m.group(1)); + String unit = m.group(2); + switch(unit) { + case "d": + return Duration.standardDays(duration); + case "h": + return Duration.standardHours(duration); + case "m": + return Duration.standardMinutes(duration); + case "s": + return Duration.standardSeconds(duration); + case "ms": + return Duration.millis(duration); + default: + throw new IllegalStateException("This must not happen. Did you update the regex without " + + "the switch case?"); + } + } + + @Override + public String getTypeDescription() { + return "An immutable length of time."; + } + } + +} diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java new file mode 100644 index 0000000000..7ecfcb020a --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java @@ -0,0 +1,168 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice; + +import static com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished.FinishType.FINISHED; + +import com.google.common.annotations.VisibleForTesting; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.v1.BuildEvent; +import com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished; +import com.google.devtools.build.v1.BuildEvent.BuildEnqueued; +import com.google.devtools.build.v1.BuildEvent.BuildFinished; +import com.google.devtools.build.v1.BuildEvent.EventCase; +import com.google.devtools.build.v1.BuildEvent.InvocationAttemptFinished; +import com.google.devtools.build.v1.BuildEvent.InvocationAttemptStarted; +import com.google.devtools.build.v1.BuildStatus; +import com.google.devtools.build.v1.BuildStatus.Result; +import com.google.devtools.build.v1.OrderedBuildEvent; +import com.google.devtools.build.v1.PublishLifecycleEventRequest; +import com.google.devtools.build.v1.StreamId; +import com.google.devtools.build.v1.StreamId.BuildComponent; +import com.google.protobuf.Any; +import com.google.protobuf.util.Timestamps; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + +/** Utility class used to build protobuffs requests that are meant to be sent over BES. */ +public final class BuildEventServiceProtoUtil { + + private final String buildRequestId; + private final String buildInvocationId; + private final String projectId; + private final AtomicInteger streamSequenceNumber; + private final Clock clock; + + public BuildEventServiceProtoUtil(String buildRequestId, String buildInvocationId, + @Nullable String projectId, Clock clock) { + this.buildRequestId = buildRequestId; + this.buildInvocationId = buildInvocationId; + this.projectId = projectId; + this.clock = clock; + this.streamSequenceNumber = new AtomicInteger(1); + } + + public PublishLifecycleEventRequest buildEnqueued() { + return lifecycleEvent(projectId, 1, + com.google.devtools.build.v1.BuildEvent.newBuilder() + .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())) + .setBuildEnqueued(BuildEnqueued.newBuilder())) + .build(); + } + + public PublishLifecycleEventRequest buildFinished(Result result) { + return lifecycleEvent(projectId, 2, + com.google.devtools.build.v1.BuildEvent.newBuilder() + .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())) + .setBuildFinished( + BuildFinished.newBuilder() + .setStatus(BuildStatus.newBuilder().setResult(result)))) + .build(); + } + + public PublishLifecycleEventRequest invocationStarted() { + return lifecycleEvent(projectId, 1, + com.google.devtools.build.v1.BuildEvent.newBuilder() + .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())) + .setInvocationAttemptStarted( + InvocationAttemptStarted.newBuilder().setAttemptNumber(1))) + .build(); + } + + public PublishLifecycleEventRequest invocationFinished(Result result) { + return lifecycleEvent(projectId, 2, + com.google.devtools.build.v1.BuildEvent.newBuilder() + .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())) + .setInvocationAttemptFinished( + InvocationAttemptFinished.newBuilder() + .setInvocationStatus(BuildStatus.newBuilder().setResult(result)))) + .build(); + } + + /** Utility method used to create a OrderedBuildEvent that delimits the end of the stream. */ + public OrderedBuildEvent streamFinished() { + return streamFinished(streamSequenceNumber.getAndIncrement()); + } + + /** Utility method used to create a OrderedBuildEvent from an packed bazel event */ + public OrderedBuildEvent bazelEvent(Any packedEvent) { + return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent); + } + + @VisibleForTesting + public OrderedBuildEvent bazelEvent(int sequenceNumber, Any packedEvent) { + return orderedBuildEvent( + sequenceNumber, + com.google.devtools.build.v1.BuildEvent.newBuilder().setBazelEvent(packedEvent)); + } + + @VisibleForTesting + public OrderedBuildEvent streamFinished(int sequenceNumber) { + return orderedBuildEvent( + sequenceNumber, + BuildEvent.newBuilder() + .setComponentStreamFinished( + BuildComponentStreamFinished.newBuilder().setType(FINISHED))); + } + + @VisibleForTesting + public OrderedBuildEvent orderedBuildEvent(int sequenceNumber, BuildEvent.Builder besEvent) { + return OrderedBuildEvent.newBuilder() + .setSequenceNumber(sequenceNumber) + .setEvent(besEvent.setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))) + .setStreamId(streamId(besEvent.getEventCase())) + .build(); + } + + @VisibleForTesting + public PublishLifecycleEventRequest.Builder lifecycleEvent(@Nullable String projectId, + int sequenceNumber, BuildEvent.Builder lifecycleEvent) { + PublishLifecycleEventRequest.Builder builder = PublishLifecycleEventRequest.newBuilder() + .setServiceLevel(PublishLifecycleEventRequest.ServiceLevel.INTERACTIVE) + .setBuildEvent( + OrderedBuildEvent.newBuilder() + .setSequenceNumber(sequenceNumber) + .setStreamId(streamId(lifecycleEvent.getEventCase())) + .setEvent(lifecycleEvent)); + if (projectId != null) { + builder.setProjectId(projectId); + } + return builder; + } + + @VisibleForTesting + public StreamId streamId(EventCase eventCase) { + StreamId.Builder streamId = StreamId.newBuilder().setBuildId(buildRequestId); + switch (eventCase) { + case BUILD_ENQUEUED: + case BUILD_FINISHED: + streamId.setComponent(BuildComponent.CONTROLLER); + break; + case INVOCATION_ATTEMPT_STARTED: + case INVOCATION_ATTEMPT_FINISHED: + streamId.setInvocationId(buildInvocationId); + streamId.setComponent(BuildComponent.CONTROLLER); + break; + case BAZEL_EVENT: + case COMPONENT_STREAM_FINISHED: + streamId.setInvocationId(buildInvocationId); + streamId.setComponent(BuildComponent.TOOL); + break; + default: + throw new IllegalArgumentException("Illegal EventCase " + eventCase); + } + return streamId.build(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java new file mode 100644 index 0000000000..467ee8bb0c --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -0,0 +1,527 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static com.google.devtools.build.lib.events.EventKind.ERROR; +import static com.google.devtools.build.lib.events.EventKind.INFO; +import static com.google.devtools.build.lib.events.EventKind.WARNING; +import static com.google.devtools.build.v1.BuildEvent.EventCase.COMPONENT_STREAM_FINISHED; +import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED; +import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED; +import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.joda.time.Duration.standardSeconds; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; +import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; +import com.google.devtools.build.lib.buildeventstream.BuildEvent; +import com.google.devtools.build.lib.buildeventstream.BuildEventConverters; +import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; +import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEvent.PayloadCase; +import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildFinished; +import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; +import com.google.devtools.build.lib.buildeventstream.PathConverter; +import com.google.devtools.build.lib.events.Event; +import com.google.devtools.build.lib.events.EventHandler; +import com.google.devtools.build.lib.events.EventKind; +import com.google.devtools.build.lib.runtime.BlazeModule.ModuleEnvironment; +import com.google.devtools.build.lib.util.AbruptExitException; +import com.google.devtools.build.lib.util.Clock; +import com.google.devtools.build.lib.util.ExitCode; +import com.google.devtools.build.v1.BuildStatus.Result; +import com.google.devtools.build.v1.OrderedBuildEvent; +import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; +import com.google.devtools.build.v1.PublishLifecycleEventRequest; +import com.google.protobuf.Any; +import io.grpc.Status; +import java.util.Deque; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.joda.time.Duration; + +/** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */ +public class BuildEventServiceTransport implements BuildEventTransport { + + static final String UPLOAD_FAILED_MESSAGE = "Build Event Protocol upload failed: %s"; + static final String UPLOAD_SUCCEEDED_MESSAGE = + "Build Event Protocol upload finished successfully."; + + private static final Logger logger = Logger.getLogger(BuildEventServiceTransport.class.getName()); + + /** Max wait time until for the Streaming RPC to finish after all events were enqueued. */ + private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = standardSeconds(120); + + private final ListeningExecutorService uploaderExecutorService; + private final Duration uploadTimeout; + private final boolean publishLifecycleEvents; + private final boolean bestEffortUpload; + private final BuildEventServiceClient besClient; + private final BuildEventServiceProtoUtil besProtoUtil; + private final ModuleEnvironment moduleEnvironment; + private final EventHandler commandLineReporter; + + private final PathConverter pathConverter; + /** Contains all pendingAck events that might be retried in case of failures. */ + private ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck; + /** Contains all events should be sent ordered by sequence number. */ + private final BlockingDeque<OrderedBuildEvent> pendingSend; + /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */ + private Result invocationResult; + /** Used to block until all events have been uploaded. */ + private ListenableFuture<?> uploadComplete; + /** Used to ensure that the close logic is only invoked once. */ + private SettableFuture<Void> shutdownFuture; + /** + * If the call before the current call threw an exception, this field points to it. If the + * previous call was successful, this field is null. This is useful for error reporting, when an + * upload times out due to having had to retry several times. + */ + private volatile Exception lastKnownError; + /** Returns true if we already reported a warning or error to UI. */ + private volatile boolean errorsReported; + + public BuildEventServiceTransport( + BuildEventServiceClient besClient, + Duration uploadTimeout, + boolean bestEffortUpload, + boolean publishLifecycleEvents, + String buildRequestId, + String invocationId, + ModuleEnvironment moduleEnvironment, + Clock clock, + PathConverter pathConverter, + EventHandler commandLineReporter, + @Nullable String projectId) { + this( + besClient, + uploadTimeout, + bestEffortUpload, + publishLifecycleEvents, + moduleEnvironment, + new BuildEventServiceProtoUtil(buildRequestId, invocationId, projectId, clock), + pathConverter, + commandLineReporter); + } + + @VisibleForTesting + BuildEventServiceTransport( + BuildEventServiceClient besClient, + Duration uploadTimeout, + boolean bestEffortUpload, + boolean publishLifecycleEvents, + ModuleEnvironment moduleEnvironment, + BuildEventServiceProtoUtil besProtoUtil, + PathConverter pathConverter, + EventHandler commandLineReporter) { + this.besClient = besClient; + this.besProtoUtil = besProtoUtil; + this.publishLifecycleEvents = publishLifecycleEvents; + this.moduleEnvironment = moduleEnvironment; + this.commandLineReporter = commandLineReporter; + this.pendingAck = new ConcurrentLinkedDeque<>(); + this.pendingSend = new LinkedBlockingDeque<>(); + // Setting the thread count to 2 instead of 1 is a hack, but necessary as publishEventStream + // blocks one thread permanently and thus we can't do any other work on the executor. A proper + // fix would be to remove the spinning loop from publishEventStream and instead implement the + // loop by publishEventStream re-submitting itself to the executor. + // TODO(buchgr): Fix it. + this.uploaderExecutorService = listeningDecorator(Executors.newFixedThreadPool(2)); + this.pathConverter = pathConverter; + this.invocationResult = UNKNOWN_STATUS; + this.uploadTimeout = uploadTimeout; + this.bestEffortUpload = bestEffortUpload; + } + + @Override + public synchronized ListenableFuture<Void> close() { + if (shutdownFuture != null) { + return shutdownFuture; + } + + logger.log(Level.INFO, "Closing the build event service transport."); + + // The future is completed once the close succeeded or failed. + shutdownFuture = SettableFuture.create(); + + uploaderExecutorService.execute( + new Runnable() { + @Override + public void run() { + try { + sendOrderedBuildEvent(besProtoUtil.streamFinished()); + + if (errorsReported) { + // If we encountered errors before and have already reported them, then we should + // not report them a second time. + return; + } + + if (bestEffortUpload) { + // TODO(buchgr): The code structure currently doesn't allow to enforce a timeout for + // best effort upload. + if (!uploadComplete.isDone()) { + report(INFO, "Asynchronous Build Event Protocol upload."); + } else { + Throwable uploadError = fromFuture(uploadComplete); + + if (uploadError != null) { + report(WARNING, UPLOAD_FAILED_MESSAGE, uploadError.getMessage()); + } else { + report(INFO, UPLOAD_SUCCEEDED_MESSAGE); + } + } + } else { + report(INFO, "Waiting for Build Event Protocol upload to finish."); + try { + if (Duration.ZERO.equals(uploadTimeout)) { + uploadComplete.get(); + } else { + uploadComplete.get(uploadTimeout.getMillis(), MILLISECONDS); + } + report(INFO, UPLOAD_SUCCEEDED_MESSAGE); + } catch (Exception e) { + uploadComplete.cancel(true); + reportErrorAndFailBuild(e); + } + } + } finally { + shutdownFuture.set(null); + uploaderExecutorService.shutdown(); + } + } + }); + + return shutdownFuture; + } + + @Override + public String name() { + // TODO(buchgr): Also display the hostname / IP. + return "Build Event Service"; + } + + @Override + public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { + BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto( + new BuildEventConverters() { + @Override + public PathConverter pathConverter() { + return pathConverter; + } + @Override + public ArtifactGroupNamer artifactGroupNamer() { + return namer; + } + }); + if (PayloadCase.FINISHED.equals(eventProto.getPayloadCase())) { + BuildFinished finished = eventProto.getFinished(); + if (finished.hasExitCode() && finished.getExitCode().getCode() == 0) { + invocationResult = COMMAND_SUCCEEDED; + } else { + invocationResult = COMMAND_FAILED; + } + } + + sendOrderedBuildEvent(besProtoUtil.bazelEvent(Any.pack(eventProto))); + } + + private String errorMessageFromException(Throwable t) { + String message; + if (t instanceof TimeoutException) { + message = "Build Event Protocol upload timed out."; + Exception lastKnownError0 = lastKnownError; + if (lastKnownError0 != null) { + // We may at times get a timeout exception due to an underlying error that was retried + // several times. If such an error exists, report it. + message += " Transport errors caused the upload to be retried."; + message += " Last known reason for retry: "; + message += besClient.userReadableError(lastKnownError0); + return message; + } + return message; + } else if (t instanceof ExecutionException) { + message = format(UPLOAD_FAILED_MESSAGE, + t.getCause() != null + ? besClient.userReadableError(t.getCause()) + : t.getMessage()); + return message; + } else { + message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t)); + return message; + } + } + + private void reportErrorAndFailBuild(Throwable t) { + checkState(!bestEffortUpload); + + String message = errorMessageFromException(t); + + report(ERROR, message); + moduleEnvironment.exit(new AbruptExitException(ExitCode.PUBLISH_ERROR)); + } + + private void maybeReportUploadError() { + if (errorsReported) { + return; + } + + Throwable uploadError = fromFuture(uploadComplete); + if (uploadError != null) { + errorsReported = true; + if (bestEffortUpload) { + report(WARNING, UPLOAD_FAILED_MESSAGE, uploadError.getMessage()); + } else { + reportErrorAndFailBuild(uploadError); + } + } + } + + private synchronized void sendOrderedBuildEvent(OrderedBuildEvent serialisedEvent) { + if (uploadComplete != null && uploadComplete.isDone()) { + maybeReportUploadError(); + return; + } + + pendingSend.add(serialisedEvent); + if (uploadComplete == null) { + uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload()); + } + } + + private synchronized Result getInvocationResult() { + return invocationResult; + } + + /** + * Method responsible for sending all requests to BuildEventService. + */ + private class BuildEventServiceUpload implements Callable<Void> { + @Override + public Void call() throws Exception { + try { + publishBuildEnqueuedEvent(); + publishInvocationStartedEvent(); + try { + publishEventStream0(); + } finally { + Result result = getInvocationResult(); + publishInvocationFinishedEvent(result); + publishBuildFinishedEvent(result); + } + } finally { + besClient.shutdown(); + } + return null; + } + + private void publishBuildEnqueuedEvent() throws Exception { + retryOnException(new Callable<Void>() { + @Override + public Void call() throws Exception { + publishLifecycleEvent(besProtoUtil.buildEnqueued()); + return null; + } + }); + } + + private void publishInvocationStartedEvent() throws Exception { + retryOnException(new Callable<Void>() { + @Override + public Void call() throws Exception { + publishLifecycleEvent(besProtoUtil.invocationStarted()); + return null; + } + }); + } + + private void publishEventStream0() throws Exception { + retryOnException(new Callable<Void>() { + @Override + public Void call() throws Exception { + publishEventStream(); + return null; + } + }); + } + + private void publishInvocationFinishedEvent(final Result result) throws Exception { + retryOnException(new Callable<Void>() { + @Override + public Void call() throws Exception { + publishLifecycleEvent(besProtoUtil.invocationFinished(result)); + return null; + } + }); + } + + private void publishBuildFinishedEvent(final Result result) throws Exception { + retryOnException(new Callable<Void>() { + @Override + public Void call() throws Exception { + publishLifecycleEvent(besProtoUtil.buildFinished(result)); + return null; + } + }); + } + } + + /** Responsible for publishing lifecycle evnts RPC. Safe to retry. */ + private Status publishLifecycleEvent(PublishLifecycleEventRequest request) throws Exception { + if (publishLifecycleEvents) { + // Change the status based on BEP data + return besClient.publish(request); + } + return Status.OK; + } + + /** + * Used as method reference, responsible for the entire Streaming RPC. Safe to retry. This method + * it carries states between consecutive calls (pendingAck messages will be added to the head of + * of the pendingSend queue), but that is intended behavior. + */ + private Status publishEventStream() throws Exception { + // Reschedule unacked messages if required, keeping its original order. + OrderedBuildEvent unacked; + while ((unacked = pendingAck.pollLast()) != null) { + pendingSend.addFirst(unacked); + } + pendingAck = new ConcurrentLinkedDeque<>(); + + return publishEventStream(pendingAck, pendingSend, besClient) + .get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS); + } + + /** Method responsible for a single Streaming RPC. */ + private static ListenableFuture<Status> publishEventStream( + final ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck, + final BlockingDeque<OrderedBuildEvent> pendingSend, + final BuildEventServiceClient besClient) + throws Exception { + OrderedBuildEvent event; + ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient)); + try { + do { + event = pendingSend.takeFirst(); + pendingAck.add(event); + besClient.sendOverStream(event); + } while (!isLastEvent(event)); + besClient.closeStream(); + logger.log(Level.INFO, "Closing the build event stream."); + } catch (Exception e) { + logger.log(Level.WARNING, "Aborting publishEventStream.", e); + besClient.abortStream(Status.INTERNAL.augmentDescription(e.getMessage())); + } + return streamDone; + } + + private static boolean isLastEvent(OrderedBuildEvent event) { + return event != null && event.getEvent().getEventCase() == COMPONENT_STREAM_FINISHED; + } + + private static Function<PublishBuildToolEventStreamResponse, Void> ackCallback( + final Deque<OrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) { + return new Function<PublishBuildToolEventStreamResponse, Void>() { + @Override + public Void apply(PublishBuildToolEventStreamResponse ack) { + long pendingSeq = + pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber(); + long ackSeq = ack.getSequenceNumber(); + if (pendingSeq != ackSeq) { + besClient.abortStream(Status.INTERNAL + .augmentDescription(format("Expected ack %s but was %s.", pendingSeq, ackSeq))); + } else { + pendingAck.removeFirst(); + } + return null; + } + }; + } + + private void retryOnException(Callable<?> c) throws Exception { + retryOnException(c, 3, 100); + } + + /** + * Executes a {@link Callable} retrying on exception thrown. + */ + // TODO(eduardocolaco): Implement transient/persistent failures + private void retryOnException(Callable<?> c, final int maxRetries, final long initalDelayMillis) + throws Exception { + int tries = 0; + while (tries <= maxRetries) { + try { + c.call(); + lastKnownError = null; + return; + // TODO(buchgr): Narrow the exception to not catch InterruptedException and + // RuntimeException's. + } catch (Exception e) { + tries++; + lastKnownError = e; + /* + * Exponential backoff: + * Retry 1: initalDelayMillis * 2^0 + * Retry 2: initalDelayMillis * 2^1 + * Retry 3: initalDelayMillis * 2^2 + * ... + */ + long sleepMillis = initalDelayMillis << (tries - 1); + String message = String.format("Retrying RPC to BES. Attempt %s. Backoff %s ms.", + tries, sleepMillis); + logger.log(Level.INFO, message, lastKnownError); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(sleepMillis)); + } + } + Preconditions.checkNotNull(lastKnownError); + throw lastKnownError; + } + + private void report(EventKind eventKind, String msg, Object... parameters) { + commandLineReporter.handle(Event.of(eventKind, null, format(msg, parameters))); + } + + @Nullable + private static Throwable fromFuture(Future<?> f) { + if (!f.isDone()) { + return null; + } + try { + f.get(); + return null; + } catch (Throwable t) { + return t; + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD new file mode 100644 index 0000000000..3a1032a630 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BUILD @@ -0,0 +1,31 @@ +filegroup( + name = "srcs", + srcs = glob(["**"]), + visibility = ["//src/main/java/com/google/devtools/build/lib:__pkg__"], +) + +java_library( + name = "client", + srcs = glob(["*.java"]), + visibility = [ + "//src/main/java/com/google/devtools/build/lib:__pkg__", + ], + runtime_deps = [ + # This is required for client TLS. + "//third_party:netty_tcnative", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib:util", + "//third_party:apache_httpclient", + "//third_party:apache_httpcore", + "//third_party:auth", + "//third_party:gson", + "//third_party:guava", + "//third_party:joda_time", + "//third_party:jsr305", + "//third_party:netty", + "//third_party/grpc:grpc-jar", + "@googleapis//:google_devtools_build_v1_publish_build_event_java_grpc", + "@googleapis//:google_devtools_build_v1_publish_build_event_java_proto", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java new file mode 100644 index 0000000000..6feb53c808 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java @@ -0,0 +1,90 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice.client; + +import com.google.common.base.Function; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.v1.OrderedBuildEvent; +import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; +import com.google.devtools.build.v1.PublishLifecycleEventRequest; +import io.grpc.Status; + +/** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */ +public interface BuildEventServiceClient { + + /** + * Makes a synchronous RPC that publishes the specified lifecycle event. + * + * @param lifecycleEvent Event to be published. + * @return Status of the RPC. + */ + Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception; + + /** + * Starts a new stream with the given ack callback. Throws an {@link IllegalStateException} if the + * there is already opened stream. Callers should wait on the returned Future in order to + * guarantee that all callback calls have been received. + * + * @param ackCallback Consumer called every time a ack message is received. + * @return Listenable future that blocks until the the onDone callback is called. + * @throws Exception + */ + ListenableFuture<Status> openStream( + Function<PublishBuildToolEventStreamResponse, Void> ackCallback) throws Exception; + + /** + * Sends an event to the most recently opened stream. This method may block due to flow control. + * + * @param buildEvent Event that should be sent. + * @throws Exception + */ + void sendOverStream(OrderedBuildEvent buildEvent) throws Exception; + + /** + * Closes the currently opened opened stream. This method does not block. Callers should block on + * the Future returned by {@link #openStream(Function)} in order to make sure that all + * ackCallback calls have been received. + */ + void closeStream(); + + /** + * Closes the currently opened stream with error. This method does not block. Callers should block + * on the Future returned by {@link #openStream(Function)} if in order to make sure that all + * ackCallback calls have been received. + */ + void abortStream(Status status); + + /** + * Checks if there is a currently active stream. + * + * @return True if the current stream is active, false otherwise. + */ + boolean isStreamActive(); + + /** + * Called once to dispose resources that this client might be holding (such as thread pools). This + * should be the last method called on this object. + * + * @throws InterruptedException + */ + void shutdown() throws InterruptedException; + + /** + * If possible, returns a user readable error message for a given {@link Throwable}. + * + * <p>As a last resort, it's valid to return {@link Throwable#getMessage()}. + */ + String userReadableError(Throwable t); +} diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java new file mode 100644 index 0000000000..86e33e09e8 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java @@ -0,0 +1,234 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.buildeventservice.client; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.devtools.build.lib.util.Preconditions.checkNotNull; +import static com.google.devtools.build.lib.util.Preconditions.checkState; +import static java.lang.System.getenv; +import static java.nio.file.Files.newInputStream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.v1.OrderedBuildEvent; +import com.google.devtools.build.v1.PublishBuildEventGrpc; +import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub; +import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub; +import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; +import com.google.devtools.build.v1.PublishLifecycleEventRequest; +import io.grpc.CallCredentials; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.auth.MoreCallCredentials; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.AbstractStub; +import io.grpc.stub.StreamObserver; +import io.netty.handler.ssl.SslContext; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import javax.net.ssl.SSLException; +import org.joda.time.Duration; + +/** Implementation of BuildEventServiceClient that uploads data using gRPC. */ +public class BuildEventServiceGrpcClient implements BuildEventServiceClient { + + private static final Logger logger = + Logger.getLogger(BuildEventServiceGrpcClient.class.getName()); + + /** Max wait time for a single non-streaming RPC to finish */ + private static final Duration RPC_TIMEOUT = Duration.standardSeconds(15); + /** See https://developers.google.com/identity/protocols/application-default-credentials * */ + private static final String DEFAULT_APP_CREDENTIALS_ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"; + /** TODO(eduardocolaco): Scope documentation.* */ + private static final String CREDENTIALS_SCOPE = + "https://www.googleapis.com/auth/cloud-build-service"; + + private final PublishBuildEventStub besAsync; + private final PublishBuildEventBlockingStub besBlocking; + private final ManagedChannel channel; + private final AtomicReference<StreamObserver<OrderedBuildEvent>> streamReference; + + public BuildEventServiceGrpcClient(String serverSpec, boolean tlsEnabled, + @Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride, + @Nullable String credentialsFile, @Nullable String credentialsScope) { + this(getChannel(serverSpec, tlsEnabled, tlsCertificateFile, tlsAuthorityOverride), + getCallCredentials(credentialsFile, credentialsScope)); + } + + public BuildEventServiceGrpcClient( + ManagedChannel channel, + @Nullable CallCredentials callCredentials) { + this.channel = channel; + this.besAsync = withCallCredentials( + PublishBuildEventGrpc.newStub(channel), callCredentials); + this.besBlocking = withCallCredentials( + PublishBuildEventGrpc.newBlockingStub(channel), callCredentials); + this.streamReference = new AtomicReference<>(null); + } + + private static <T extends AbstractStub<T>> T withCallCredentials( + T stub, @Nullable CallCredentials callCredentials) { + stub = callCredentials != null ? stub.withCallCredentials(callCredentials) : stub; + return stub; + } + + @Override + public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception { + besBlocking + .withDeadlineAfter(RPC_TIMEOUT.getMillis(), MILLISECONDS) + .publishLifecycleEvent(lifecycleEvent); + return Status.OK; + } + + @Override + public ListenableFuture<Status> openStream( + Function<PublishBuildToolEventStreamResponse, Void> ack) + throws Exception { + SettableFuture<Status> streamFinished = SettableFuture.create(); + checkState( + streamReference.compareAndSet(null, createStream(ack, streamFinished)), + "Starting a new stream without closing the previous one"); + return streamFinished; + } + + private StreamObserver<OrderedBuildEvent> createStream( + final Function<PublishBuildToolEventStreamResponse, Void> ack, + final SettableFuture<Status> streamFinished) { + return besAsync.publishBuildToolEventStream( + new StreamObserver<PublishBuildToolEventStreamResponse>() { + @Override + public void onNext(PublishBuildToolEventStreamResponse response) { + ack.apply(response); + } + + @Override + public void onError(Throwable t) { + streamReference.set(null); + streamFinished.setException(t); + } + + @Override + public void onCompleted() { + streamReference.set(null); + streamFinished.set(Status.OK); + } + }); + } + + @Override + public void sendOverStream(OrderedBuildEvent buildEvent) throws Exception { + checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream") + .onNext(buildEvent); + } + + @Override + public void closeStream() { + StreamObserver<OrderedBuildEvent> stream; + if ((stream = streamReference.getAndSet(null)) != null) { + stream.onCompleted(); + } + } + + @Override + public void abortStream(Status status) { + StreamObserver<OrderedBuildEvent> stream; + if ((stream = streamReference.getAndSet(null)) != null) { + stream.onError(status.asException()); + } + } + + @Override + public boolean isStreamActive() { + return streamReference.get() != null; + } + + @Override + public void shutdown() throws InterruptedException { + this.channel.shutdown(); + } + + @Override + public String userReadableError(Throwable t) { + if (t instanceof StatusRuntimeException) { + Throwable rootCause = Throwables.getRootCause(t); + String message = ((StatusRuntimeException) t).getStatus().getCode().name(); + message += ": " + rootCause.getMessage(); + return message; + } else { + return t.getMessage(); + } + } + + /** + * Returns call credentials read from the specified file (if non-empty) or from + * env(GOOGLE_APPLICATION_CREDENTIALS) otherwise. + */ + @Nullable + private static CallCredentials getCallCredentials(@Nullable String credentialsFile, + @Nullable String credentialsScope) { + String effectiveScope = credentialsScope != null ? credentialsScope : CREDENTIALS_SCOPE; + try { + if (!isNullOrEmpty(credentialsFile)) { + return MoreCallCredentials.from( + GoogleCredentials.fromStream(newInputStream(Paths.get(credentialsFile))) + .createScoped(ImmutableList.of(effectiveScope))); + + } else if (!isNullOrEmpty(getenv(DEFAULT_APP_CREDENTIALS_ENV_VAR))) { + return MoreCallCredentials.from( + GoogleCredentials.getApplicationDefault() + .createScoped(ImmutableList.of(effectiveScope))); + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed to read credentials", e); + } + return null; + } + + /** + * Returns a ManagedChannel to the specified server. + */ + private static ManagedChannel getChannel(String serverSpec, boolean tlsEnabled, + @Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride) { + //TODO(buchgr): Use ManagedChannelBuilder once bazel uses a newer gRPC version. + NettyChannelBuilder builder = NettyChannelBuilder.forTarget(serverSpec); + builder.negotiationType(tlsEnabled ? NegotiationType.TLS : NegotiationType.PLAINTEXT); + if (tlsCertificateFile != null) { + try { + SslContext sslContext = + GrpcSslContexts.forClient().trustManager(new File(tlsCertificateFile)).build(); + builder.sslContext(sslContext); + } catch (SSLException e) { + throw new RuntimeException(e); + } + } + if (tlsAuthorityOverride != null) { + builder.overrideAuthority(tlsAuthorityOverride); + } + return builder.build(); + } +} |