aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java82
1 files changed, 40 insertions, 42 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 3c5dfbc79a..4bc09148ee 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -14,10 +14,10 @@
package com.google.devtools.build.lib.buildeventservice.client;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -30,11 +30,11 @@ import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
+import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nullable;
@@ -45,14 +45,12 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;
- private final AtomicReference<StreamObserver<PublishBuildToolEventStreamRequest>> streamReference;
+ private volatile StreamObserver<PublishBuildToolEventStreamRequest> stream;
public BuildEventServiceGrpcClient(Channel channel, @Nullable CallCredentials callCredentials) {
- this.besAsync = withCallCredentials(
- PublishBuildEventGrpc.newStub(channel), callCredentials);
- this.besBlocking = withCallCredentials(
- PublishBuildEventGrpc.newBlockingStub(channel), callCredentials);
- this.streamReference = new AtomicReference<>(null);
+ this.besAsync = withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials);
+ this.besBlocking =
+ withCallCredentials(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials);
}
private static <T extends AbstractStub<T>> T withCallCredentials(
@@ -62,98 +60,98 @@ public abstract class BuildEventServiceGrpcClient implements BuildEventServiceCl
}
@Override
- public Status publish(PublishLifecycleEventRequest lifecycleEvent) throws Exception {
+ public void publish(PublishLifecycleEventRequest lifecycleEvent)
+ throws StatusException, InterruptedException {
try {
besBlocking
.withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
.publishLifecycleEvent(lifecycleEvent);
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
- return Status.OK;
}
@Override
public ListenableFuture<Status> openStream(
- Function<PublishBuildToolEventStreamResponse, Void> ack)
- throws Exception {
+ Function<PublishBuildToolEventStreamResponse, Void> ackCallback)
+ throws StatusException, InterruptedException {
+ Preconditions.checkState(
+ stream == null, "Starting a new stream without closing the previous one");
SettableFuture<Status> streamFinished = SettableFuture.create();
- checkState(
- streamReference.compareAndSet(null, createStream(ack, streamFinished)),
- "Starting a new stream without closing the previous one");
+ stream = createStream(ackCallback, streamFinished);
return streamFinished;
}
private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
- final Function<PublishBuildToolEventStreamResponse, Void> ack,
- final SettableFuture<Status> streamFinished) throws InterruptedException {
+ final Function<PublishBuildToolEventStreamResponse, Void> ackCallback,
+ final SettableFuture<Status> streamFinished)
+ throws StatusException, InterruptedException {
try {
return besAsync.publishBuildToolEventStream(
new StreamObserver<PublishBuildToolEventStreamResponse>() {
@Override
public void onNext(PublishBuildToolEventStreamResponse response) {
- ack.apply(response);
+ ackCallback.apply(response);
}
@Override
public void onError(Throwable t) {
- streamReference.set(null);
- streamFinished.setException(t);
+ stream = null;
+ streamFinished.set(Status.fromThrowable(t));
}
@Override
public void onCompleted() {
- streamReference.set(null);
+ stream = null;
streamFinished.set(Status.OK);
}
});
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
}
@Override
- public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception {
+ public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
+ throws StatusException, InterruptedException {
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ checkState(stream0 != null, "Attempting to send over a closed stream");
try {
- checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
- .onNext(buildEvent);
+ stream0.onNext(buildEvent);
} catch (StatusRuntimeException e) {
- Throwable rootCause = Throwables.getRootCause(e);
- Throwables.throwIfInstanceOf(rootCause, InterruptedException.class);
- throw e;
+ Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
+ throw e.getStatus().asException();
}
}
@Override
public void closeStream() {
- StreamObserver<PublishBuildToolEventStreamRequest> stream;
- if ((stream = streamReference.getAndSet(null)) != null) {
- stream.onCompleted();
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ if (stream0 != null) {
+ stream0.onCompleted();
}
}
@Override
public void abortStream(Status status) {
- StreamObserver<PublishBuildToolEventStreamRequest> stream;
- if ((stream = streamReference.getAndSet(null)) != null) {
- stream.onError(status.asException());
+ StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
+ if (stream0 != null) {
+ stream0.onError(status.asException());
}
}
@Override
public boolean isStreamActive() {
- return streamReference.get() != null;
+ return stream != null;
}
@Override
public String userReadableError(Throwable t) {
- if (t instanceof StatusRuntimeException) {
+ if (t instanceof StatusException) {
Throwable rootCause = Throwables.getRootCause(t);
- String message = ((StatusRuntimeException) t).getStatus().getCode().name();
+ String message = ((StatusException) t).getStatus().getCode().name();
message += ": " + rootCause.getMessage();
return message;
} else {