diff options
Diffstat (limited to 'src')
6 files changed, 95 insertions, 6 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 2a34533bfc..d9ab87a9f1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -133,7 +133,7 @@ public final class RemoteModule extends BlazeModule { LoggingInterceptor logger = null; if (!remoteOptions.experimentalRemoteGrpcLog.isEmpty()) { rpcLogFile = new AsynchronousFileOutputStream(remoteOptions.experimentalRemoteGrpcLog); - logger = new LoggingInterceptor(rpcLogFile); + logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock()); } RemoteRetrier retrier = diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD index dc1caa6266..0a1d806595 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -12,10 +12,12 @@ java_library( tags = ["bazel"], deps = [ "//src/main/java/com/google/devtools/build/lib:io", + "//src/main/java/com/google/devtools/build/lib/clock", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/protobuf:remote_execution_log_java_proto", "//third_party:guava", "//third_party/grpc:grpc-jar", + "//third_party/protobuf:protobuf_java", "@googleapis//:google_bytestream_bytestream_java_grpc", "@googleapis//:google_bytestream_bytestream_java_proto", "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc", diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java index a805eae83b..bf1b393ecb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.remote.logging; import com.google.bytestream.ByteStreamGrpc; +import com.google.devtools.build.lib.clock.Clock; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream; @@ -22,6 +23,7 @@ import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc; import com.google.devtools.remoteexecution.v1test.ExecutionGrpc; import com.google.devtools.remoteexecution.v1test.RequestMetadata; +import com.google.protobuf.Timestamp; import com.google.watcher.v1.WatcherGrpc; import io.grpc.CallOptions; import io.grpc.Channel; @@ -32,15 +34,18 @@ import io.grpc.ForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; +import java.time.Instant; import javax.annotation.Nullable; /** Client interceptor for logging details of certain gRPC calls. */ public class LoggingInterceptor implements ClientInterceptor { - AsynchronousFileOutputStream rpcLogFile; + private final AsynchronousFileOutputStream rpcLogFile; + private final Clock clock; /** Constructs a LoggingInterceptor which logs RPC calls to the given file. */ - public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile) { + public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock) { this.rpcLogFile = rpcLogFile; + this.clock = clock; } /** @@ -80,6 +85,15 @@ public class LoggingInterceptor implements ClientInterceptor { } } + /** Get current time as a Timestamp. */ + private Timestamp getCurrentTimestamp() { + Instant time = Instant.ofEpochMilli(clock.currentTimeMillis()); + return Timestamp.newBuilder() + .setSeconds(time.getEpochSecond()) + .setNanos(time.getNano()) + .build(); + } + /** * Wraps client call to log call details by building a {@link LogEntry} and writing it to the RPC * log file. @@ -100,6 +114,7 @@ public class LoggingInterceptor implements ClientInterceptor { @Override public void start(Listener<RespT> responseListener, Metadata headers) { + entryBuilder.setStartTime(getCurrentTimestamp()); RequestMetadata metadata = TracingMetadataUtils.requestMetadataFromHeaders(headers); if (metadata != null) { entryBuilder.setMetadata(metadata); @@ -115,6 +130,7 @@ public class LoggingInterceptor implements ClientInterceptor { @Override public void onClose(Status status, Metadata trailers) { + entryBuilder.setEndTime(getCurrentTimestamp()); entryBuilder.setStatus(makeStatusProto(status)); entryBuilder.setDetails(handler.getDetails()); rpcLogFile.write(entryBuilder.build()); diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD index 684dcb2e73..85aaa24a67 100644 --- a/src/main/protobuf/BUILD +++ b/src/main/protobuf/BUILD @@ -148,6 +148,7 @@ proto_library( name = "remote_execution_log_proto", srcs = ["remote_execution_log.proto"], deps = [ + "@com_google_protobuf//:well_known_types_timestamp_proto", "@googleapis//:google_bytestream_bytestream_proto", "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_proto", "@googleapis//:google_longrunning_operations_proto", diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto index 1e4fa9f404..32d76946e8 100644 --- a/src/main/protobuf/remote_execution_log.proto +++ b/src/main/protobuf/remote_execution_log.proto @@ -16,6 +16,7 @@ syntax = "proto3"; package remote_logging; +import "google/protobuf/timestamp.proto"; import "google/bytestream/bytestream.proto"; import "google/devtools/remoteexecution/v1test/remote_execution.proto"; import "google/longrunning/operations.proto"; @@ -39,6 +40,12 @@ message LogEntry { // Method specific details for this call. RpcCallDetails details = 4; + + // Time the call started. + google.protobuf.Timestamp start_time = 5; + + // Time the call closed. + google.protobuf.Timestamp end_time = 6; } // Details for a call to diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java index 4c1c3980d9..c459978fd0 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java @@ -39,6 +39,7 @@ import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDe import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails; import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.testutil.ManualClock; import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream; import com.google.devtools.remoteexecution.v1test.Action; import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc; @@ -60,6 +61,7 @@ import com.google.devtools.remoteexecution.v1test.OutputFile; import com.google.longrunning.Operation; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import com.google.watcher.v1.Change; import com.google.watcher.v1.ChangeBatch; import com.google.watcher.v1.Request; @@ -93,12 +95,13 @@ public class LoggingInterceptorTest { private Channel loggedChannel; private LoggingInterceptor interceptor; private AsynchronousFileOutputStream logStream; + private ManualClock clock; // This returns a logging interceptor where all calls are handled by the given handler. @SuppressWarnings({"rawtypes", "unchecked"}) private LoggingInterceptor getInterceptorWithAlwaysThisHandler( LoggingHandler handler, AsynchronousFileOutputStream outputFile) { - return new LoggingInterceptor(outputFile) { + return new LoggingInterceptor(outputFile, clock) { @Override public <ReqT, RespT> LoggingHandler<ReqT, RespT> selectHandler( MethodDescriptor<ReqT, RespT> method) { @@ -117,7 +120,8 @@ public class LoggingInterceptorTest { .build() .start(); logStream = Mockito.mock(AsynchronousFileOutputStream.class); - interceptor = new LoggingInterceptor(logStream); + clock = new ManualClock(); + interceptor = new LoggingInterceptor(logStream, clock); loggedChannel = ClientInterceptors.intercept( InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor); @@ -139,6 +143,7 @@ public class LoggingInterceptorTest { new ByteStreamImplBase() { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + clock.advanceMillis(1234); responseObserver.onNext(response); responseObserver.onCompleted(); } @@ -161,8 +166,11 @@ public class LoggingInterceptorTest { .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName()) .setDetails(details) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(12).setNanos(300000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(13).setNanos(534000000)) .build(); + clock.advanceMillis(12300); stub.read(request).next(); verify(handler).handleReq(request); verify(handler).handleResp(response); @@ -181,7 +189,9 @@ public class LoggingInterceptorTest { new ByteStreamImplBase() { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + clock.advanceMillis(50); responseObserver.onNext(response1); + clock.advanceMillis(1500); responseObserver.onNext(response2); responseObserver.onCompleted(); } @@ -209,6 +219,8 @@ public class LoggingInterceptorTest { .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName()) .setDetails(details) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.getDefaultInstance()) + .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(550000000)) .build(); verify(handler).handleReq(request); @@ -264,12 +276,14 @@ public class LoggingInterceptorTest { InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor); ByteStreamStub stub = ByteStreamGrpc.newStub(channel); + clock.advanceMillis(1000); @SuppressWarnings("unchecked") StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class); // Write both responses. StreamObserver<WriteRequest> requester = stub.write(responseObserver); requester.onNext(request1); requester.onNext(request2); + clock.advanceMillis(1000); requester.onCompleted(); ArgumentCaptor<WriteRequest> resultCaptor = ArgumentCaptor.forClass(WriteRequest.class); @@ -279,6 +293,8 @@ public class LoggingInterceptorTest { .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName()) .setDetails(details) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(1)) + .setEndTime(Timestamp.newBuilder().setSeconds(2)) .build(); verify(handler, times(2)).handleReq(resultCaptor.capture()); @@ -298,6 +314,7 @@ public class LoggingInterceptorTest { new ByteStreamImplBase() { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + clock.advanceMillis(100); responseObserver.onError(error.asRuntimeException()); } }); @@ -314,6 +331,7 @@ public class LoggingInterceptorTest { InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor); ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(channel); + clock.advanceMillis(1500); assertThrows(StatusRuntimeException.class, () -> stub.read(request).next()); LogEntry expectedEntry = @@ -324,6 +342,8 @@ public class LoggingInterceptorTest { com.google.rpc.Status.newBuilder() .setCode(error.getCode().value()) .setMessage(error.getDescription())) + .setStartTime(Timestamp.newBuilder().setSeconds(1).setNanos(500000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(600000000)) .build(); verify(handler).handleReq(request); @@ -345,11 +365,13 @@ public class LoggingInterceptorTest { @Override public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { responseObserver.onNext(response); + clock.advanceMillis(100); responseObserver.onCompleted(); } }); ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel); + clock.advanceMillis(15000); stub.execute(request); LogEntry expectedEntry = LogEntry.newBuilder() @@ -359,6 +381,8 @@ public class LoggingInterceptorTest { .setExecute( ExecuteDetails.newBuilder().setRequest(request).setResponse(response))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(15)) + .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000)) .build(); verify(logStream).write(expectedEntry); } @@ -375,11 +399,12 @@ public class LoggingInterceptorTest { new ExecutionImplBase() { @Override public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { + clock.advanceMillis(1100); responseObserver.onError(error.asRuntimeException()); } }); ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel); - + clock.advanceMillis(20000000000001L); assertThrows(StatusRuntimeException.class, () -> stub.execute(request)); LogEntry expectedEntry = LogEntry.newBuilder() @@ -391,6 +416,8 @@ public class LoggingInterceptorTest { com.google.rpc.Status.newBuilder() .setCode(error.getCode().value()) .setMessage(error.getDescription())) + .setStartTime(Timestamp.newBuilder().setSeconds(20000000000L).setNanos(1000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(20000000001L).setNanos(101000000)) .build(); verify(logStream).write(expectedEntry); } @@ -411,6 +438,7 @@ public class LoggingInterceptorTest { public void findMissingBlobs( FindMissingBlobsRequest request, StreamObserver<FindMissingBlobsResponse> responseObserver) { + clock.advanceMillis(200); responseObserver.onNext(response); responseObserver.onCompleted(); } @@ -419,6 +447,7 @@ public class LoggingInterceptorTest { ContentAddressableStorageBlockingStub stub = ContentAddressableStorageGrpc.newBlockingStub(loggedChannel); + clock.advanceMillis(14900); stub.findMissingBlobs(request); LogEntry expectedEntry = LogEntry.newBuilder() @@ -431,6 +460,8 @@ public class LoggingInterceptorTest { .setRequest(request) .setResponse(response))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(14).setNanos(900000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000)) .build(); verify(logStream).write(expectedEntry); } @@ -454,12 +485,14 @@ public class LoggingInterceptorTest { @Override public void getActionResult( GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + clock.advanceMillis(22222); responseObserver.onNext(response); responseObserver.onCompleted(); } }); ActionCacheBlockingStub stub = ActionCacheGrpc.newBlockingStub(loggedChannel); + clock.advanceMillis(11111); stub.getActionResult(request); LogEntry expectedEntry = LogEntry.newBuilder() @@ -471,6 +504,8 @@ public class LoggingInterceptorTest { .setRequest(request) .setResponse(response))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000)) .build(); verify(logStream).write(expectedEntry); } @@ -493,11 +528,14 @@ public class LoggingInterceptorTest { @Override public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) { responseObserver.onNext(response1); + clock.advanceMillis(2200); responseObserver.onNext(response2); + clock.advanceMillis(1100); responseObserver.onCompleted(); } }); + clock.advanceMillis(50000); Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request); // Read both responses. @@ -516,6 +554,8 @@ public class LoggingInterceptorTest { .addResponses(response1) .addResponses(response2))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(50)) + .setEndTime(Timestamp.newBuilder().setSeconds(53).setNanos(300000000)) .build(); verify(logStream).write(expectedEntry); } @@ -533,10 +573,14 @@ public class LoggingInterceptorTest { new WatcherImplBase() { @Override public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) { + clock.advanceMillis(100); responseObserver.onNext(response); + clock.advanceMillis(100); responseObserver.onError(error.asRuntimeException()); } }); + + clock.advanceMillis(2000); Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request); assertThrows( StatusRuntimeException.class, @@ -556,6 +600,8 @@ public class LoggingInterceptorTest { com.google.rpc.Status.newBuilder() .setCode(error.getCode().value()) .setMessage(error.getDescription())) + .setStartTime(Timestamp.newBuilder().setSeconds(2)) + .setEndTime(Timestamp.newBuilder().setSeconds(2).setNanos(200000000)) .build(); verify(logStream).write(expectedEntry); } @@ -574,10 +620,12 @@ public class LoggingInterceptorTest { public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { responseObserver.onNext(response1); responseObserver.onNext(response2); + clock.advanceMillis(2000); responseObserver.onCompleted(); } }); + clock.advanceMillis(500000); Iterator<ReadResponse> replies = ByteStreamGrpc.newBlockingStub(loggedChannel).read(request); // Read both responses. @@ -596,6 +644,8 @@ public class LoggingInterceptorTest { .setNumReads(2) .setBytesRead(6))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(500)) + .setEndTime(Timestamp.newBuilder().setSeconds(502)) .build(); verify(logStream).write(expectedEntry); } @@ -612,6 +662,7 @@ public class LoggingInterceptorTest { @Override public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { responseObserver.onNext(response1); + clock.advanceMillis(100); responseObserver.onError(error.asRuntimeException()); } }); @@ -638,6 +689,8 @@ public class LoggingInterceptorTest { com.google.rpc.Status.newBuilder() .setCode(error.getCode().value()) .setMessage(error.getDescription())) + .setStartTime(Timestamp.getDefaultInstance()) + .setEndTime(Timestamp.newBuilder().setNanos(100000000)) .build(); verify(logStream).write(expectedEntry); } @@ -679,11 +732,15 @@ public class LoggingInterceptorTest { @SuppressWarnings("unchecked") StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class); + clock.advanceMillis(10000); // Request three writes, the first identical with the third. StreamObserver<WriteRequest> requester = stub.write(responseObserver); requester.onNext(request1); + clock.advanceMillis(100); requester.onNext(request2); + clock.advanceMillis(200); requester.onNext(request1); + clock.advanceMillis(100); requester.onCompleted(); LogEntry expectedEntry = @@ -699,6 +756,8 @@ public class LoggingInterceptorTest { .setBytesSent(9) .setNumWrites(3))) .setStatus(com.google.rpc.Status.getDefaultInstance()) + .setStartTime(Timestamp.newBuilder().setSeconds(10)) + .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000)) .build(); verify(logStream).write(expectedEntry); @@ -724,10 +783,12 @@ public class LoggingInterceptorTest { ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel); @SuppressWarnings("unchecked") StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class); + clock.advanceMillis(10000000000L); // Write both responses. StreamObserver<WriteRequest> requester = stub.write(responseObserver); requester.onNext(request); + clock.advanceMillis(10000000000L); requester.onError(error.asRuntimeException()); Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException()); @@ -746,6 +807,8 @@ public class LoggingInterceptorTest { .addResourceNames("test") .setNumWrites(1) .setBytesSent(3))) + .setStartTime(Timestamp.newBuilder().setSeconds(10000000)) + .setEndTime(Timestamp.newBuilder().setSeconds(20000000)) .build(); verify(logStream).write(expectedEntry); |