aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2018-04-23 09:39:10 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-04-23 09:40:45 -0700
commit0a0f2112852b471318aec4a331e1411dad2ffef3 (patch)
tree436159b5b9cfdca4404ec2ad65d80c7d88c5ad7e /src
parent87621656da2377f5b59b0ee84ecaf54ea77b7bd8 (diff)
This change adds timestamps (call start and end time) to the remote grpc log.
PiperOrigin-RevId: 193937177
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/logging/BUILD2
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java20
-rw-r--r--src/main/protobuf/BUILD1
-rw-r--r--src/main/protobuf/remote_execution_log.proto7
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java69
6 files changed, 95 insertions, 6 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 2a34533bfc..d9ab87a9f1 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -133,7 +133,7 @@ public final class RemoteModule extends BlazeModule {
LoggingInterceptor logger = null;
if (!remoteOptions.experimentalRemoteGrpcLog.isEmpty()) {
rpcLogFile = new AsynchronousFileOutputStream(remoteOptions.experimentalRemoteGrpcLog);
- logger = new LoggingInterceptor(rpcLogFile);
+ logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock());
}
RemoteRetrier retrier =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
index dc1caa6266..0a1d806595 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
@@ -12,10 +12,12 @@ java_library(
tags = ["bazel"],
deps = [
"//src/main/java/com/google/devtools/build/lib:io",
+ "//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/protobuf:remote_execution_log_java_proto",
"//third_party:guava",
"//third_party/grpc:grpc-jar",
+ "//third_party/protobuf:protobuf_java",
"@googleapis//:google_bytestream_bytestream_java_grpc",
"@googleapis//:google_bytestream_bytestream_java_proto",
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
index a805eae83b..bf1b393ecb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
@@ -15,6 +15,7 @@
package com.google.devtools.build.lib.remote.logging;
import com.google.bytestream.ByteStreamGrpc;
+import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
@@ -22,6 +23,7 @@ import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
import com.google.devtools.remoteexecution.v1test.RequestMetadata;
+import com.google.protobuf.Timestamp;
import com.google.watcher.v1.WatcherGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
@@ -32,15 +34,18 @@ import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
+import java.time.Instant;
import javax.annotation.Nullable;
/** Client interceptor for logging details of certain gRPC calls. */
public class LoggingInterceptor implements ClientInterceptor {
- AsynchronousFileOutputStream rpcLogFile;
+ private final AsynchronousFileOutputStream rpcLogFile;
+ private final Clock clock;
/** Constructs a LoggingInterceptor which logs RPC calls to the given file. */
- public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile) {
+ public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock) {
this.rpcLogFile = rpcLogFile;
+ this.clock = clock;
}
/**
@@ -80,6 +85,15 @@ public class LoggingInterceptor implements ClientInterceptor {
}
}
+ /** Get current time as a Timestamp. */
+ private Timestamp getCurrentTimestamp() {
+ Instant time = Instant.ofEpochMilli(clock.currentTimeMillis());
+ return Timestamp.newBuilder()
+ .setSeconds(time.getEpochSecond())
+ .setNanos(time.getNano())
+ .build();
+ }
+
/**
* Wraps client call to log call details by building a {@link LogEntry} and writing it to the RPC
* log file.
@@ -100,6 +114,7 @@ public class LoggingInterceptor implements ClientInterceptor {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
+ entryBuilder.setStartTime(getCurrentTimestamp());
RequestMetadata metadata = TracingMetadataUtils.requestMetadataFromHeaders(headers);
if (metadata != null) {
entryBuilder.setMetadata(metadata);
@@ -115,6 +130,7 @@ public class LoggingInterceptor implements ClientInterceptor {
@Override
public void onClose(Status status, Metadata trailers) {
+ entryBuilder.setEndTime(getCurrentTimestamp());
entryBuilder.setStatus(makeStatusProto(status));
entryBuilder.setDetails(handler.getDetails());
rpcLogFile.write(entryBuilder.build());
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 684dcb2e73..85aaa24a67 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -148,6 +148,7 @@ proto_library(
name = "remote_execution_log_proto",
srcs = ["remote_execution_log.proto"],
deps = [
+ "@com_google_protobuf//:well_known_types_timestamp_proto",
"@googleapis//:google_bytestream_bytestream_proto",
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_proto",
"@googleapis//:google_longrunning_operations_proto",
diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto
index 1e4fa9f404..32d76946e8 100644
--- a/src/main/protobuf/remote_execution_log.proto
+++ b/src/main/protobuf/remote_execution_log.proto
@@ -16,6 +16,7 @@ syntax = "proto3";
package remote_logging;
+import "google/protobuf/timestamp.proto";
import "google/bytestream/bytestream.proto";
import "google/devtools/remoteexecution/v1test/remote_execution.proto";
import "google/longrunning/operations.proto";
@@ -39,6 +40,12 @@ message LogEntry {
// Method specific details for this call.
RpcCallDetails details = 4;
+
+ // Time the call started.
+ google.protobuf.Timestamp start_time = 5;
+
+ // Time the call closed.
+ google.protobuf.Timestamp end_time = 6;
}
// Details for a call to
diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
index 4c1c3980d9..c459978fd0 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
@@ -39,6 +39,7 @@ import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDe
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.testutil.ManualClock;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
import com.google.devtools.remoteexecution.v1test.Action;
import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
@@ -60,6 +61,7 @@ import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
import com.google.watcher.v1.Change;
import com.google.watcher.v1.ChangeBatch;
import com.google.watcher.v1.Request;
@@ -93,12 +95,13 @@ public class LoggingInterceptorTest {
private Channel loggedChannel;
private LoggingInterceptor interceptor;
private AsynchronousFileOutputStream logStream;
+ private ManualClock clock;
// This returns a logging interceptor where all calls are handled by the given handler.
@SuppressWarnings({"rawtypes", "unchecked"})
private LoggingInterceptor getInterceptorWithAlwaysThisHandler(
LoggingHandler handler, AsynchronousFileOutputStream outputFile) {
- return new LoggingInterceptor(outputFile) {
+ return new LoggingInterceptor(outputFile, clock) {
@Override
public <ReqT, RespT> LoggingHandler<ReqT, RespT> selectHandler(
MethodDescriptor<ReqT, RespT> method) {
@@ -117,7 +120,8 @@ public class LoggingInterceptorTest {
.build()
.start();
logStream = Mockito.mock(AsynchronousFileOutputStream.class);
- interceptor = new LoggingInterceptor(logStream);
+ clock = new ManualClock();
+ interceptor = new LoggingInterceptor(logStream, clock);
loggedChannel =
ClientInterceptors.intercept(
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
@@ -139,6 +143,7 @@ public class LoggingInterceptorTest {
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ clock.advanceMillis(1234);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -161,8 +166,11 @@ public class LoggingInterceptorTest {
.setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
.setDetails(details)
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(12).setNanos(300000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(13).setNanos(534000000))
.build();
+ clock.advanceMillis(12300);
stub.read(request).next();
verify(handler).handleReq(request);
verify(handler).handleResp(response);
@@ -181,7 +189,9 @@ public class LoggingInterceptorTest {
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ clock.advanceMillis(50);
responseObserver.onNext(response1);
+ clock.advanceMillis(1500);
responseObserver.onNext(response2);
responseObserver.onCompleted();
}
@@ -209,6 +219,8 @@ public class LoggingInterceptorTest {
.setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
.setDetails(details)
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.getDefaultInstance())
+ .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(550000000))
.build();
verify(handler).handleReq(request);
@@ -264,12 +276,14 @@ public class LoggingInterceptorTest {
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
ByteStreamStub stub = ByteStreamGrpc.newStub(channel);
+ clock.advanceMillis(1000);
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
// Write both responses.
StreamObserver<WriteRequest> requester = stub.write(responseObserver);
requester.onNext(request1);
requester.onNext(request2);
+ clock.advanceMillis(1000);
requester.onCompleted();
ArgumentCaptor<WriteRequest> resultCaptor = ArgumentCaptor.forClass(WriteRequest.class);
@@ -279,6 +293,8 @@ public class LoggingInterceptorTest {
.setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
.setDetails(details)
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(1))
+ .setEndTime(Timestamp.newBuilder().setSeconds(2))
.build();
verify(handler, times(2)).handleReq(resultCaptor.capture());
@@ -298,6 +314,7 @@ public class LoggingInterceptorTest {
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ clock.advanceMillis(100);
responseObserver.onError(error.asRuntimeException());
}
});
@@ -314,6 +331,7 @@ public class LoggingInterceptorTest {
InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(channel);
+ clock.advanceMillis(1500);
assertThrows(StatusRuntimeException.class, () -> stub.read(request).next());
LogEntry expectedEntry =
@@ -324,6 +342,8 @@ public class LoggingInterceptorTest {
com.google.rpc.Status.newBuilder()
.setCode(error.getCode().value())
.setMessage(error.getDescription()))
+ .setStartTime(Timestamp.newBuilder().setSeconds(1).setNanos(500000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(600000000))
.build();
verify(handler).handleReq(request);
@@ -345,11 +365,13 @@ public class LoggingInterceptorTest {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
responseObserver.onNext(response);
+ clock.advanceMillis(100);
responseObserver.onCompleted();
}
});
ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
+ clock.advanceMillis(15000);
stub.execute(request);
LogEntry expectedEntry =
LogEntry.newBuilder()
@@ -359,6 +381,8 @@ public class LoggingInterceptorTest {
.setExecute(
ExecuteDetails.newBuilder().setRequest(request).setResponse(response)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(15))
+ .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -375,11 +399,12 @@ public class LoggingInterceptorTest {
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+ clock.advanceMillis(1100);
responseObserver.onError(error.asRuntimeException());
}
});
ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
-
+ clock.advanceMillis(20000000000001L);
assertThrows(StatusRuntimeException.class, () -> stub.execute(request));
LogEntry expectedEntry =
LogEntry.newBuilder()
@@ -391,6 +416,8 @@ public class LoggingInterceptorTest {
com.google.rpc.Status.newBuilder()
.setCode(error.getCode().value())
.setMessage(error.getDescription()))
+ .setStartTime(Timestamp.newBuilder().setSeconds(20000000000L).setNanos(1000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(20000000001L).setNanos(101000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -411,6 +438,7 @@ public class LoggingInterceptorTest {
public void findMissingBlobs(
FindMissingBlobsRequest request,
StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ clock.advanceMillis(200);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@@ -419,6 +447,7 @@ public class LoggingInterceptorTest {
ContentAddressableStorageBlockingStub stub =
ContentAddressableStorageGrpc.newBlockingStub(loggedChannel);
+ clock.advanceMillis(14900);
stub.findMissingBlobs(request);
LogEntry expectedEntry =
LogEntry.newBuilder()
@@ -431,6 +460,8 @@ public class LoggingInterceptorTest {
.setRequest(request)
.setResponse(response)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(14).setNanos(900000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -454,12 +485,14 @@ public class LoggingInterceptorTest {
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ clock.advanceMillis(22222);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
});
ActionCacheBlockingStub stub = ActionCacheGrpc.newBlockingStub(loggedChannel);
+ clock.advanceMillis(11111);
stub.getActionResult(request);
LogEntry expectedEntry =
LogEntry.newBuilder()
@@ -471,6 +504,8 @@ public class LoggingInterceptorTest {
.setRequest(request)
.setResponse(response)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -493,11 +528,14 @@ public class LoggingInterceptorTest {
@Override
public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
responseObserver.onNext(response1);
+ clock.advanceMillis(2200);
responseObserver.onNext(response2);
+ clock.advanceMillis(1100);
responseObserver.onCompleted();
}
});
+ clock.advanceMillis(50000);
Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
// Read both responses.
@@ -516,6 +554,8 @@ public class LoggingInterceptorTest {
.addResponses(response1)
.addResponses(response2)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(50))
+ .setEndTime(Timestamp.newBuilder().setSeconds(53).setNanos(300000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -533,10 +573,14 @@ public class LoggingInterceptorTest {
new WatcherImplBase() {
@Override
public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
+ clock.advanceMillis(100);
responseObserver.onNext(response);
+ clock.advanceMillis(100);
responseObserver.onError(error.asRuntimeException());
}
});
+
+ clock.advanceMillis(2000);
Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
assertThrows(
StatusRuntimeException.class,
@@ -556,6 +600,8 @@ public class LoggingInterceptorTest {
com.google.rpc.Status.newBuilder()
.setCode(error.getCode().value())
.setMessage(error.getDescription()))
+ .setStartTime(Timestamp.newBuilder().setSeconds(2))
+ .setEndTime(Timestamp.newBuilder().setSeconds(2).setNanos(200000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -574,10 +620,12 @@ public class LoggingInterceptorTest {
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
responseObserver.onNext(response1);
responseObserver.onNext(response2);
+ clock.advanceMillis(2000);
responseObserver.onCompleted();
}
});
+ clock.advanceMillis(500000);
Iterator<ReadResponse> replies = ByteStreamGrpc.newBlockingStub(loggedChannel).read(request);
// Read both responses.
@@ -596,6 +644,8 @@ public class LoggingInterceptorTest {
.setNumReads(2)
.setBytesRead(6)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(500))
+ .setEndTime(Timestamp.newBuilder().setSeconds(502))
.build();
verify(logStream).write(expectedEntry);
}
@@ -612,6 +662,7 @@ public class LoggingInterceptorTest {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
responseObserver.onNext(response1);
+ clock.advanceMillis(100);
responseObserver.onError(error.asRuntimeException());
}
});
@@ -638,6 +689,8 @@ public class LoggingInterceptorTest {
com.google.rpc.Status.newBuilder()
.setCode(error.getCode().value())
.setMessage(error.getDescription()))
+ .setStartTime(Timestamp.getDefaultInstance())
+ .setEndTime(Timestamp.newBuilder().setNanos(100000000))
.build();
verify(logStream).write(expectedEntry);
}
@@ -679,11 +732,15 @@ public class LoggingInterceptorTest {
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+ clock.advanceMillis(10000);
// Request three writes, the first identical with the third.
StreamObserver<WriteRequest> requester = stub.write(responseObserver);
requester.onNext(request1);
+ clock.advanceMillis(100);
requester.onNext(request2);
+ clock.advanceMillis(200);
requester.onNext(request1);
+ clock.advanceMillis(100);
requester.onCompleted();
LogEntry expectedEntry =
@@ -699,6 +756,8 @@ public class LoggingInterceptorTest {
.setBytesSent(9)
.setNumWrites(3)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
+ .setStartTime(Timestamp.newBuilder().setSeconds(10))
+ .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
.build();
verify(logStream).write(expectedEntry);
@@ -724,10 +783,12 @@ public class LoggingInterceptorTest {
ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+ clock.advanceMillis(10000000000L);
// Write both responses.
StreamObserver<WriteRequest> requester = stub.write(responseObserver);
requester.onNext(request);
+ clock.advanceMillis(10000000000L);
requester.onError(error.asRuntimeException());
Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException());
@@ -746,6 +807,8 @@ public class LoggingInterceptorTest {
.addResourceNames("test")
.setNumWrites(1)
.setBytesSent(3)))
+ .setStartTime(Timestamp.newBuilder().setSeconds(10000000))
+ .setEndTime(Timestamp.newBuilder().setSeconds(20000000))
.build();
verify(logStream).write(expectedEntry);