diff options
author | 2017-06-23 09:28:35 -0700 | |
---|---|---|
committer | 2017-06-23 09:35:13 -0700 | |
commit | 3168a6241afa144943a025c8be55109479a1daa5 (patch) | |
tree | df039f28927192f2ad154f852eee472e2d02aef7 | |
parent | deea43d2b42ec1b14ccafdd296308229be7dbd99 (diff) |
Add TraceMe instrumentation of RunStep in GRPC distributed runtime.
A unique ID is attached to each RunStep call, allowing the client and server
events to be correlated.
PiperOrigin-RevId: 159956950
5 files changed, 37 insertions, 2 deletions
diff --git a/tensorflow/core/distributed_runtime/rpc/BUILD b/tensorflow/core/distributed_runtime/rpc/BUILD index c0918ef445..bd381dd10f 100644 --- a/tensorflow/core/distributed_runtime/rpc/BUILD +++ b/tensorflow/core/distributed_runtime/rpc/BUILD @@ -193,6 +193,7 @@ cc_library( ":grpc_master_service_impl", ":grpc_util", "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", "//tensorflow/core:master_proto_cc", "//tensorflow/core/distributed_runtime:call_options", "//tensorflow/core/distributed_runtime:master_interface", @@ -210,6 +211,7 @@ cc_library( ":grpc_master_service_impl", ":grpc_util", "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", "//tensorflow/core:master_proto_cc", "//tensorflow/core/distributed_runtime:master", "@grpc//:grpc++_unsecure", diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_call.h b/tensorflow/core/distributed_runtime/rpc/grpc_call.h index e85b8ccbd3..cb133737dd 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_call.h +++ b/tensorflow/core/distributed_runtime/rpc/grpc_call.h @@ -233,6 +233,11 @@ class Call : public UntypedCall<Service> { RequestMessage request; ResponseMessage response; + const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata() + const { + return ctx_.client_metadata(); + } + private: // Creates a completion queue tag for handling cancellation by the client. // NOTE: This method must be called before this call is enqueued on a diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc b/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc index 07205bb2c2..41ee81c01d 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc +++ b/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc @@ -40,6 +40,7 @@ limitations under the License. 
#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h" #include "tensorflow/core/platform/logging.h" #include "tensorflow/core/platform/macros.h" +#include "tensorflow/core/platform/tracing.h" #include "tensorflow/core/protobuf/master.pb.h" namespace tensorflow { @@ -172,6 +173,7 @@ class GrpcMasterService : public AsyncServiceInterface { // RPC handler for running one step in a session. void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) { + auto* trace = TraceRpc("RunStep/Server", call->client_metadata()); CallOptions* call_opts = new CallOptions; if (call->request.options().timeout_in_ms() > 0) { call_opts->SetTimeout(call->request.options().timeout_in_ms()); @@ -184,11 +186,12 @@ class GrpcMasterService : public AsyncServiceInterface { new NonOwnedProtoRunStepResponse(&call->response); call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); }); master_impl_->RunStep(call_opts, wrapped_request, wrapped_response, - [call, call_opts, wrapped_request, - wrapped_response](const Status& status) { + [call, call_opts, wrapped_request, wrapped_response, + trace](const Status& status) { call->ClearCancelCallback(); delete call_opts; delete wrapped_request; + delete trace; call->SendResponse(ToGrpcStatus(status)); }); ENQUEUE_REQUEST(RunStep, true); @@ -224,6 +227,18 @@ class GrpcMasterService : public AsyncServiceInterface { } #undef ENQUEUE_REQUEST + // Start tracing, including the ID attached to the RPC. 
+ port::Tracing::TraceMe* TraceRpc( + StringPiece name, + const std::multimap<::grpc::string_ref, ::grpc::string_ref>& metadata) { + StringPiece id; + auto it = metadata.find(GrpcIdKey()); + if (it != metadata.end()) { + id = StringPiece(it->second.data(), it->second.size()); + } + return new port::Tracing::TraceMe(name, id); + } + TF_DISALLOW_COPY_AND_ASSIGN(GrpcMasterService); }; diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc b/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc index bf72d9a7fc..c04aa44941 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc +++ b/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc @@ -23,6 +23,8 @@ limitations under the License. #include "tensorflow/core/distributed_runtime/rpc/grpc_util.h" #include "tensorflow/core/lib/core/errors.h" #include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/tracing.h" #include "tensorflow/core/protobuf/master.pb.h" namespace tensorflow { @@ -66,6 +68,7 @@ class GrpcRemoteMaster : public MasterInterface { Status RunStep(CallOptions* call_options, RunStepRequestWrapper* request, MutableRunStepResponseWrapper* response) override { ::grpc::ClientContext ctx; + auto trace = TraceRpc("RunStep/Client", &ctx); ctx.set_fail_fast(false); SetDeadline(&ctx, call_options->GetTimeout()); return FromGrpcStatus(stub_->RunStep(&ctx, request->ToProto(), @@ -99,6 +102,14 @@ class GrpcRemoteMaster : public MasterInterface { } private: + // Start tracing, attaching a unique ID to both the trace and the RPC. 
+ port::Tracing::TraceMe TraceRpc(StringPiece name, + ::grpc::ClientContext* ctx) { + string trace_id = strings::StrCat(port::Tracing::UniqueId()); + ctx->AddMetadata(GrpcIdKey(), trace_id); + return port::Tracing::TraceMe(name, trace_id); + } + std::unique_ptr<grpc::MasterService::Stub> stub_; void SetDeadline(::grpc::ClientContext* ctx, int64 time_in_ms) { diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_util.h b/tensorflow/core/distributed_runtime/rpc/grpc_util.h index 44473d1150..5244cd1c13 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_util.h +++ b/tensorflow/core/distributed_runtime/rpc/grpc_util.h @@ -43,6 +43,8 @@ inline ::grpc::Status ToGrpcStatus(const ::tensorflow::Status& s) { typedef std::shared_ptr<::grpc::Channel> SharedGrpcChannelPtr; +inline string GrpcIdKey() { return "tf-rpc"; } + } // namespace tensorflow #endif // THIRD_PARTY_TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_RPC_GRPC_UTIL_H_ |