diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java | 82 |
1 files changed, 40 insertions, 42 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java index 3c5dfbc79a..4bc09148ee 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java @@ -14,10 +14,10 @@ package com.google.devtools.build.lib.buildeventservice.client; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -30,11 +30,11 @@ import com.google.devtools.build.v1.PublishLifecycleEventRequest; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.Status; +import io.grpc.StatusException; import io.grpc.StatusRuntimeException; import io.grpc.stub.AbstractStub; import io.grpc.stub.StreamObserver; import java.time.Duration; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.Nullable; @@ -45,14 +45,12 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl private final PublishBuildEventStub besAsync; private final PublishBuildEventBlockingStub besBlocking; - private final AtomicReference<StreamObserver<PublishBuildToolEventStreamRequest>> streamReference; + private volatile StreamObserver<PublishBuildToolEventStreamRequest> stream; public BuildEventServiceGrpcClient(Channel channel, @Nullable CallCredentials callCredentials) { - this.besAsync = withCallCredentials( - PublishBuildEventGrpc.newStub(channel), callCredentials); - this.besBlocking = withCallCredentials( - PublishBuildEventGrpc.newBlockingStub(channel), callCredentials); - this.streamReference = new AtomicReference<>(null); + this.besAsync = withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials); + this.besBlocking = + withCallCredentials(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials); } private static <T extends AbstractStub<T>> T withCallCredentials( @@ -62,98 +60,98 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl } @Override - public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception { + public void publish(PublishLifecycleEventRequest lifecycleEvent) + throws StatusException, InterruptedException { try { besBlocking .withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS) .publishLifecycleEvent(lifecycleEvent); } catch (StatusRuntimeException e) { - Throwable rootCause = Throwables.getRootCause(e); - Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); - throw e; + Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class); + throw e.getStatus().asException(); } - return Status.OK; } @Override public ListenableFuture<Status> openStream( - Function<PublishBuildToolEventStreamResponse, Void> ack) - throws Exception { + Function<PublishBuildToolEventStreamResponse, Void> ackCallback) + throws StatusException, InterruptedException { + Preconditions.checkState( + stream == null, "Starting a new stream without closing the previous one"); SettableFuture<Status> streamFinished = SettableFuture.create(); - checkState( - streamReference.compareAndSet(null, createStream(ack, streamFinished)), - "Starting a new stream without closing the previous one"); + stream = createStream(ackCallback, streamFinished); return streamFinished; } private StreamObserver<PublishBuildToolEventStreamRequest> createStream( - final Function<PublishBuildToolEventStreamResponse, Void> ack, - final SettableFuture<Status> streamFinished) throws InterruptedException { + final Function<PublishBuildToolEventStreamResponse, Void> ackCallback, + final SettableFuture<Status> streamFinished) + throws StatusException, InterruptedException { try { return besAsync.publishBuildToolEventStream( new StreamObserver<PublishBuildToolEventStreamResponse>() { @Override public void onNext(PublishBuildToolEventStreamResponse response) { - ack.apply(response); + ackCallback.apply(response); } @Override public void onError(Throwable t) { - streamReference.set(null); - streamFinished.setException(t); + stream = null; + streamFinished.set(Status.fromThrowable(t)); } @Override public void onCompleted() { - streamReference.set(null); + stream = null; streamFinished.set(Status.OK); } }); } catch (StatusRuntimeException e) { - Throwable rootCause = Throwables.getRootCause(e); - Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); - throw e; + Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class); + throw e.getStatus().asException(); } } @Override - public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception { + public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) + throws StatusException, InterruptedException { + StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream; + checkState(stream0 != null, "Attempting to send over a closed stream"); try { - checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream") - .onNext(buildEvent); + stream0.onNext(buildEvent); } catch (StatusRuntimeException e) { - Throwable rootCause = Throwables.getRootCause(e); - Throwables.throwIfInstanceOf(rootCause, InterruptedException.class); - throw e; + Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class); + throw e.getStatus().asException(); } } @Override public void closeStream() { - StreamObserver<PublishBuildToolEventStreamRequest> stream; - if ((stream = streamReference.getAndSet(null)) != null) { - stream.onCompleted(); + StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream; + if (stream0 != null) { + stream0.onCompleted(); } } @Override public void abortStream(Status status) { - StreamObserver<PublishBuildToolEventStreamRequest> stream; - if ((stream = streamReference.getAndSet(null)) != null) { - stream.onError(status.asException()); + StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream; + if (stream0 != null) { + stream0.onError(status.asException()); } } @Override public boolean isStreamActive() { - return streamReference.get() != null; + return stream != null; } @Override public String userReadableError(Throwable t) { - if (t instanceof StatusRuntimeException) { + if (t instanceof StatusException) { Throwable rootCause = Throwables.getRootCause(t); - String message = ((StatusRuntimeException) t).getStatus().getCode().name(); + String message = ((StatusException) t).getStatus().getCode().name(); message += ": " + rootCause.getMessage(); return message; } else { |