aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2018-07-22 03:00:25 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-07-22 03:01:56 -0700
commit8dc77037cf73e4e0d1300aaa4143145e61c7d267 (patch)
treedee1d12a5914f2ffba3e135dea43d554d3c6fbb0 /src/main/java/com/google/devtools/build/lib/buildeventservice
parentfc8b9bfe2fc81c0e264ba93a3f7f74e28f7c1643 (diff)
bes: make error handling sane
- refactor the BuildEventServiceClient interface to report errors via StatusException and InterruptException. - do the groundwork necessary to do retries based on rpc status codes. - improve the execution speed of the BuildEventServiceStubbyClientTest from 1m5s to 5s. RELNOTES: None PiperOrigin-RevId: 205563431
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java179
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java46
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java82
3 files changed, 139 insertions, 168 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 36a4aa1b60..fa2fb67b24 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.buildeventservice;
-import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.devtools.build.lib.events.EventKind.INFO;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
@@ -58,6 +57,7 @@ import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import io.grpc.Status;
+import io.grpc.StatusException;
import java.time.Duration;
import java.util.Collection;
import java.util.Deque;
@@ -119,7 +119,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
* previous call was successful, this field is null. This is useful for error reporting, when an
* upload times out due to having had to retry several times.
*/
- private volatile Exception lastRetryError;
+ private volatile StatusException lastRetryError;
/** Returns true if we already reported a warning or error to UI. */
private volatile boolean errorsReported;
/**
@@ -195,16 +195,19 @@ public class BuildEventServiceTransport implements BuildEventTransport {
// fix would be to remove the spinning loop from publishEventStream and instead implement the
// loop by publishEventStream re-submitting itself to the executor.
// TODO(buchgr): Fix it.
- this.uploaderExecutorService = listeningDecorator(Executors.newFixedThreadPool(2,
- new ThreadFactory() {
-
- private final AtomicInteger count = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "bes-uploader-" + count.incrementAndGet());
- }
- }));
+ this.uploaderExecutorService =
+ listeningDecorator(
+ Executors.newFixedThreadPool(
+ 2,
+ new ThreadFactory() {
+
+ private final AtomicInteger count = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "bes-uploader-" + count.incrementAndGet());
+ }
+ }));
this.protocolOptions = protocolOptions;
this.invocationResult = UNKNOWN_STATUS;
this.uploadTimeout = uploadTimeout;
@@ -335,8 +338,9 @@ public class BuildEventServiceTransport implements BuildEventTransport {
localFileMap.put(localFile.path, localFile);
}
ListenableFuture<PathConverter> upload = artifactUploader.upload(localFileMap.build());
- InternalOrderedBuildEvent buildEvent = new DefaultInternalOrderedBuildEvent(event, namer,
- upload, besProtoUtil.nextSequenceNumber());
+ InternalOrderedBuildEvent buildEvent =
+ new DefaultInternalOrderedBuildEvent(
+ event, namer, upload, besProtoUtil.nextSequenceNumber());
sendOrderedBuildEvent(buildEvent);
}
@@ -344,7 +348,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
String message;
if (t instanceof TimeoutException) {
message = "Build Event Protocol upload timed out.";
- Exception lastRetryError0 = lastRetryError;
+ StatusException lastRetryError0 = lastRetryError;
if (lastRetryError0 != null) {
// We may at times get a timeout exception due to an underlying error that was retried
// several times. If such an error exists, report it.
@@ -406,19 +410,19 @@ public class BuildEventServiceTransport implements BuildEventTransport {
return invocationResult;
}
- /** Method responsible for sending all requests to BuildEventService. */
+ /** Class responsible for sending lifecycle and build events. */
private class BuildEventServiceUpload implements Callable<Void> {
@Override
public Void call() throws Exception {
try {
- publishBuildEnqueuedEvent();
- publishInvocationStartedEvent();
+ publishLifecycleEvent(besProtoUtil.buildEnqueued());
+ publishLifecycleEvent(besProtoUtil.invocationStarted());
try {
- publishEventStream0();
+ retryOnException(BuildEventServiceTransport.this::publishEventStream);
} finally {
Result result = getInvocationResult();
- publishInvocationFinishedEvent(result);
- publishBuildFinishedEvent(result);
+ publishLifecycleEvent(besProtoUtil.invocationFinished(result));
+ publishLifecycleEvent(besProtoUtil.buildFinished(result));
}
} finally {
try {
@@ -430,54 +434,11 @@ public class BuildEventServiceTransport implements BuildEventTransport {
return null;
}
- private void publishBuildEnqueuedEvent() throws Exception {
- retryOnException(
- () -> {
- publishLifecycleEvent(besProtoUtil.buildEnqueued());
- return null;
- });
- }
-
- private void publishInvocationStartedEvent() throws Exception {
- retryOnException(
- () -> {
- publishLifecycleEvent(besProtoUtil.invocationStarted());
- return null;
- });
- }
-
- private void publishEventStream0() throws Exception {
- retryOnException(
- () -> {
- publishEventStream();
- return null;
- });
- }
-
- private void publishInvocationFinishedEvent(final Result result) throws Exception {
- retryOnException(
- () -> {
- publishLifecycleEvent(besProtoUtil.invocationFinished(result));
- return null;
- });
- }
-
- private void publishBuildFinishedEvent(final Result result) throws Exception {
- retryOnException(
- () -> {
- publishLifecycleEvent(besProtoUtil.buildFinished(result));
- return null;
- });
- }
- }
-
- /** Responsible for publishing lifecycle evnts RPC. Safe to retry. */
- private Status publishLifecycleEvent(PublishLifecycleEventRequest request) throws Exception {
- if (publishLifecycleEvents) {
- // Change the status based on BEP data
- return besClient.publish(request);
+ private void publishLifecycleEvent(PublishLifecycleEventRequest request) throws Exception {
+ if (publishLifecycleEvents) {
+ retryOnException(() -> besClient.publish(request));
+ }
}
- return Status.OK;
}
/**
@@ -485,7 +446,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
* carries over the state between consecutive calls (pendingAck messages will be added to the head
* of the pendingSend queue), but that is intended behavior.
*/
- private void publishEventStream() throws Exception {
+ private void publishEventStream()
+ throws StatusException, LocalFileUploadException, InterruptedException {
// Reschedule unacked messages if required, keeping its original order.
InternalOrderedBuildEvent unacked;
while ((unacked = pendingAck.pollLast()) != null) {
@@ -500,16 +462,12 @@ public class BuildEventServiceTransport implements BuildEventTransport {
final ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck,
final BlockingDeque<InternalOrderedBuildEvent> pendingSend,
final BuildEventServiceClient besClient)
- throws Exception {
+ throws StatusException, LocalFileUploadException, InterruptedException {
+ ListenableFuture<Status> stream = besClient.openStream(ackCallback(pendingAck, besClient));
logger.log(
Level.INFO,
String.format(
- "Starting PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
- ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
- logger.log(
- Level.INFO,
- String.format(
- "Started PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
+ "Started PublishBuildToolEventStream RPC (pendingSendCount=%s)", pendingSend.size()));
try {
@Nullable InternalOrderedBuildEvent orderedBuildEvent;
do {
@@ -519,7 +477,10 @@ public class BuildEventServiceTransport implements BuildEventTransport {
PathConverter pathConverter = waitForLocalFileUploads(orderedBuildEvent);
besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
}
- checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
+ Status streamStatus = getFromStreamFuture(stream);
+ if (streamStatus != null) {
+ throw streamStatus.augmentDescription("Stream closed prematurely").asException();
+ }
} while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
logger.log(
Level.INFO,
@@ -530,25 +491,24 @@ public class BuildEventServiceTransport implements BuildEventTransport {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
Thread.interrupted();
- String additionalDetails = "Sending build events.";
- besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails));
+ besClient.abortStream(
+ Status.CANCELLED.augmentDescription("The build event upload was interrupted."));
throw e;
- } catch (Exception e) {
- Status status = streamDone.isDone() ? streamDone.get() : null;
- String additionalDetail = e.getMessage();
- logger.log(
- Level.WARNING,
- String.format(
- "Aborting publishBuildToolEventStream RPC (status=%s): %s", status, additionalDetail),
- e);
- besClient.abortStream(Status.INTERNAL.augmentDescription(additionalDetail));
+ } catch (StatusException e) {
+ besClient.abortStream(e.getStatus());
+ throw e;
+ } catch (LocalFileUploadException e) {
+ besClient.abortStream(Status.INTERNAL.augmentDescription("Local file upload failed."));
throw e;
}
try {
Status status =
- streamDone.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+ stream.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status);
+ if (!status.isOk()) {
+ throw status.asException();
+ }
} catch (InterruptedException e) {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
@@ -560,10 +520,26 @@ public class BuildEventServiceTransport implements BuildEventTransport {
String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
- throw e;
+ throw Status.DEADLINE_EXCEEDED.augmentDescription(additionalDetail).asException();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "The stream future is expected to never fail per API contract", e);
}
}
+ @Nullable
+ private Status getFromStreamFuture(ListenableFuture<Status> stream) throws InterruptedException {
+ if (stream.isDone()) {
+ try {
+ return stream.get();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "The stream future is expected to never fail per API contract", e);
+ }
+ }
+ return null;
+ }
+
private PathConverter waitForLocalFileUploads(InternalOrderedBuildEvent orderedBuildEvent)
throws LocalFileUploadException, InterruptedException {
try {
@@ -573,7 +549,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
logger.log(
Level.WARNING,
String.format(
- "Failed to upload local files referenced by build event: %s", e.getMessage()), e);
+ "Failed to upload local files referenced by build event: %s", e.getMessage()),
+ e);
throw new LocalFileUploadException(e.getCause());
}
}
@@ -608,7 +585,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
/** Executes a {@link Callable} retrying on exception thrown. */
- private void retryOnException(Callable<?> c) throws Exception {
+ private void retryOnException(EventUploadCallable c) throws Exception {
final int maxRetries = 5;
final long initialDelayMillis = 0;
final long delayMillis = 1000;
@@ -624,7 +601,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
throw e;
} catch (LocalFileUploadException e) {
throw (Exception) e.getCause();
- } catch (Exception e) {
+ } catch (StatusException e) {
if (acksReceivedSinceLastRetry.get() > 0) {
logger.fine(
String.format(
@@ -680,8 +657,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
ListenableFuture<PathConverter> localFileUploadProgress();
- PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
- throws ExecutionException, InterruptedException;
+ PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
}
private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
@@ -691,8 +667,10 @@ public class BuildEventServiceTransport implements BuildEventTransport {
private final int sequenceNumber;
DefaultInternalOrderedBuildEvent(
- BuildEvent event, ArtifactGroupNamer artifactGroupNamer,
- ListenableFuture<PathConverter> artifactUpload, int sequenceNumber) {
+ BuildEvent event,
+ ArtifactGroupNamer artifactGroupNamer,
+ ListenableFuture<PathConverter> artifactUpload,
+ int sequenceNumber) {
this.event = Preconditions.checkNotNull(event);
this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
this.artifactUpload = artifactUpload;
@@ -715,8 +693,7 @@ public class BuildEventServiceTransport implements BuildEventTransport {
}
@Override
- public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter)
- throws ExecutionException, InterruptedException {
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
BuildEventStreamProtos.BuildEvent eventProto =
event.asStreamProto(
new BuildEventContext() {
@@ -766,4 +743,8 @@ public class BuildEventServiceTransport implements BuildEventTransport {
return besProtoUtil.streamFinished(sequenceNumber);
}
}
+
+ private interface EventUploadCallable {
+ void call() throws StatusException, LocalFileUploadException, InterruptedException;
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index bdaf11490f..2f571dd6af 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -19,57 +19,49 @@ import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.Status;
+import io.grpc.StatusException;
import java.util.function.Function;
-/** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */
+/** Interface used to abstract the Stubby and gRPC client implementations. */
public interface BuildEventServiceClient {
- /**
- * Makes a synchronous RPC that publishes the specified lifecycle event.
- *
- * @param lifecycleEvent Event to be published.
- * @return Status of the RPC.
- */
- Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception;
+ /** Makes a blocking RPC call that publishes a {@code lifecycleEvent}. */
+ void publish(PublishLifecycleEventRequest lifecycleEvent)
+ throws StatusException, InterruptedException;
/**
- * Starts a new stream with the given ack callback. Throws an {@link IllegalStateException} if the
- * there is already opened stream. Callers should wait on the returned Future in order to
- * guarantee that all callback calls have been received.
- *
- * @param ackCallback Consumer called every time a ack message is received.
- * @return Listenable future that blocks until the onDone callback is called.
- * @throws Exception
+ * Starts a new stream with the given {@code ackCallback}. Callers must wait on the returned
+ * future in order to guarantee that all callback calls have been received. The returned future
+ * will never fail, but in case of error will contain a corresponding status.
*/
ListenableFuture<Status> openStream(
- Function<PublishBuildToolEventStreamResponse, Void> ackCallback) throws Exception;
+ Function<PublishBuildToolEventStreamResponse, Void> ackCallback)
+ throws StatusException, InterruptedException;
/**
- * Sends an event to the most recently opened stream. This method may block due to flow control.
- *
- * @param buildEvent Event that should be sent.
- * @throws Exception
+ * Sends an event over the currently open stream. This method may block due to flow control.
*/
- void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception;
+ void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
+ throws StatusException, InterruptedException;
/**
- * Closes the currently opened opened stream. This method does not block. Callers should block on
- * the Future returned by {@link #openStream(Function)} in order to make sure that all
- * ackCallback calls have been received.
+ * Closes the currently opened stream. This method does not block. Callers should block on
+ * the future returned by {@link #openStream(Function)} in order to make sure that all
+ * {@code ackCallback} calls have been received.
*/
void closeStream();
/**
* Closes the currently opened stream with error. This method does not block. Callers should block
- * on the Future returned by {@link #openStream(Function)} if in order to make sure that all
+ * on the future returned by {@link #openStream(Function)} if in order to make sure that all
* ackCallback calls have been received. This method is NOOP if the stream was already finished.
*/
void abortStream(Status status);
/**
- * Checks if there is a currently active stream.
+ * Checks if there is a currently an active stream.
*
- * @return True if the current stream is active, false otherwise.
+ * @return {@code true} if the current stream is active, false otherwise.
*/
boolean isStreamActive();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 3c5dfbc79a..4bc09148ee 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -14,10 +14,10 @@
package com.google.devtools.build.lib.buildeventservice.client;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -30,11 +30,11 @@ import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
+import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
@@ -45,14 +45,12 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;
- private final AtomicReference<StreamObserver<PublishBuildToolEventStreamRequest>> streamReference;
+ private volatile StreamObserver<PublishBuildToolEventStreamRequest> stream;
public BuildEventServiceGrpcClient(Channel channel, @Nullable CallCredentials callCredentials) {
- this.besAsync = withCallCredentials(
- PublishBuildEventGrpc.newStub(channel), callCredentials);
- this.besBlocking = withCallCredentials(
- PublishBuildEventGrpc.newBlockingStub(channel), callCredentials);
- this.streamReference = new AtomicReference<>(null);
+ this.besAsync = withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials);
+ this.besBlocking =
+ withCallCredentials(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials);
}
private static <T extends AbstractStub<T>> T withCallCredentials(
@@ -62,98 +60,98 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl
}
@Override
- public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception {
+ public void publish(PublishLifecycleEventRequest lifecycleEvent)
+ throws StatusException, InterruptedException {
try {
besBlocking
.withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
.publishLifecycleEvent(lifecycleEvent);
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
- return Status.OK;
}
@Override
public ListenableFuture<Status> openStream(
- Function<PublishBuildToolEventStreamResponse, Void> ack)
- throws Exception {
+ Function<PublishBuildToolEventStreamResponse, Void> ackCallback)
+ throws StatusException, InterruptedException {
+ Preconditions.checkState(
+ stream == null, "Starting a new stream without closing the previous one");
SettableFuture<Status> streamFinished = SettableFuture.create();
- checkState(
- streamReference.compareAndSet(null, createStream(ack, streamFinished)),
- "Starting a new stream without closing the previous one");
+ stream = createStream(ackCallback, streamFinished);
return streamFinished;
}
private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
- final Function<PublishBuildToolEventStreamResponse, Void> ack,
- final SettableFuture<Status> streamFinished) throws InterruptedException {
+ final Function<PublishBuildToolEventStreamResponse, Void> ackCallback,
+ final SettableFuture<Status> streamFinished)
+ throws StatusException, InterruptedException {
try {
return besAsync.publishBuildToolEventStream(
new StreamObserver<PublishBuildToolEventStreamResponse>() {
@Override
public void onNext(PublishBuildToolEventStreamResponse response) {
- ack.apply(response);
+ ackCallback.apply(response);
}
@Override
public void onError(Throwable t) {
- streamReference.set(null);
- streamFinished.setException(t);
+ stream = null;
+ streamFinished.set(Status.fromThrowable(t));
}
@Override
public void onCompleted() {
- streamReference.set(null);
+ stream = null;
streamFinished.set(Status.OK);
}
});
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
}
@Override
- public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception {
+ public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
+ throws StatusException, InterruptedException {
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ checkState(stream0 != null, "Attempting to send over a closed stream");
try {
- checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
- .onNext(buildEvent);
+ stream0.onNext(buildEvent);
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
}
@Override
public void closeStream() {
- StreamObserver<PublishBuildToolEventStreamRequest> stream;
- if ((stream = streamReference.getAndSet(null)) != null) {
- stream.onCompleted();
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ if (stream0 != null) {
+ stream0.onCompleted();
}
}
@Override
public void abortStream(Status status) {
- StreamObserver<PublishBuildToolEventStreamRequest> stream;
- if ((stream = streamReference.getAndSet(null)) != null) {
- stream.onError(status.asException());
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ if (stream0 != null) {
+ stream0.onError(status.asException());
}
}
@Override
public boolean isStreamActive() {
- return streamReference.get() != null;
+ return stream != null;
}
@Override
public String userReadableError(Throwable t) {
- if (t instanceof StatusRuntimeException) {
+ if (t instanceof StatusException) {
Throwable rootCause = Throwables.getRootCause(t);
- String message = ((StatusRuntimeException) t).getStatus().getCode().name();
+ String message = ((StatusException) t).getStatus().getCode().name();
message += ": " + rootCause.getMessage();
return message;
} else {