aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar A. Unique TensorFlower <gardener@tensorflow.org>2017-06-23 09:28:35 -0700
committerGravatar TensorFlower Gardener <gardener@tensorflow.org>2017-06-23 09:35:13 -0700
commit3168a6241afa144943a025c8be55109479a1daa5 (patch)
treedf039f28927192f2ad154f852eee472e2d02aef7
parentdeea43d2b42ec1b14ccafdd296308229be7dbd99 (diff)
Add TraceMe instrumentation of RunStep in GRPC distributed runtime.
A unique ID is added to each RunStep call, which allows the client and server events to be correlated.
PiperOrigin-RevId: 159956950
-rw-r--r--tensorflow/core/distributed_runtime/rpc/BUILD2
-rw-r--r--tensorflow/core/distributed_runtime/rpc/grpc_call.h5
-rw-r--r--tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc19
-rw-r--r--tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc11
-rw-r--r--tensorflow/core/distributed_runtime/rpc/grpc_util.h2
5 files changed, 37 insertions, 2 deletions
diff --git a/tensorflow/core/distributed_runtime/rpc/BUILD b/tensorflow/core/distributed_runtime/rpc/BUILD
index c0918ef445..bd381dd10f 100644
--- a/tensorflow/core/distributed_runtime/rpc/BUILD
+++ b/tensorflow/core/distributed_runtime/rpc/BUILD
@@ -193,6 +193,7 @@ cc_library(
":grpc_master_service_impl",
":grpc_util",
"//tensorflow/core:lib",
+ "//tensorflow/core:lib_internal",
"//tensorflow/core:master_proto_cc",
"//tensorflow/core/distributed_runtime:call_options",
"//tensorflow/core/distributed_runtime:master_interface",
@@ -210,6 +211,7 @@ cc_library(
":grpc_master_service_impl",
":grpc_util",
"//tensorflow/core:lib",
+ "//tensorflow/core:lib_internal",
"//tensorflow/core:master_proto_cc",
"//tensorflow/core/distributed_runtime:master",
"@grpc//:grpc++_unsecure",
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_call.h b/tensorflow/core/distributed_runtime/rpc/grpc_call.h
index e85b8ccbd3..cb133737dd 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_call.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_call.h
@@ -233,6 +233,11 @@ class Call : public UntypedCall<Service> {
RequestMessage request;
ResponseMessage response;
+ const std::multimap<::grpc::string_ref, ::grpc::string_ref>& client_metadata()  // Read-only access to the metadata the client attached to this call, as reported by ctx_.
+ const {
+ return ctx_.client_metadata();  // Pure forwarder; presumably the reference is only valid while ctx_ (and so this Call) is alive -- confirm.
+ }
+
private:
// Creates a completion queue tag for handling cancellation by the client.
// NOTE: This method must be called before this call is enqueued on a
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc b/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc
index 07205bb2c2..41ee81c01d 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_master_service.cc
@@ -40,6 +40,7 @@ limitations under the License.
#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
+#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/protobuf/master.pb.h"
namespace tensorflow {
@@ -172,6 +173,7 @@ class GrpcMasterService : public AsyncServiceInterface {
// RPC handler for running one step in a session.
void RunStepHandler(MasterCall<RunStepRequest, RunStepResponse>* call) {
+ auto* trace = TraceRpc("RunStep/Server", call->client_metadata());
CallOptions* call_opts = new CallOptions;
if (call->request.options().timeout_in_ms() > 0) {
call_opts->SetTimeout(call->request.options().timeout_in_ms());
@@ -184,11 +186,12 @@ class GrpcMasterService : public AsyncServiceInterface {
new NonOwnedProtoRunStepResponse(&call->response);
call->SetCancelCallback([call_opts]() { call_opts->StartCancel(); });
master_impl_->RunStep(call_opts, wrapped_request, wrapped_response,
- [call, call_opts, wrapped_request,
- wrapped_response](const Status& status) {
+ [call, call_opts, wrapped_request, wrapped_response,
+ trace](const Status& status) {
call->ClearCancelCallback();
delete call_opts;
delete wrapped_request;
+ delete trace;
call->SendResponse(ToGrpcStatus(status));
});
ENQUEUE_REQUEST(RunStep, true);
@@ -224,6 +227,18 @@ class GrpcMasterService : public AsyncServiceInterface {
}
#undef ENQUEUE_REQUEST
+ // Starts a trace called `name`, tagged with the ID stored under GrpcIdKey() in `metadata` (a default-constructed id if absent); returns a heap-allocated TraceMe the caller must delete.
+ port::Tracing::TraceMe* TraceRpc(
+ StringPiece name,
+ const std::multimap<::grpc::string_ref, ::grpc::string_ref>& metadata) {
+ StringPiece id;  // Stays default-constructed when the request carries no ID entry.
+ auto it = metadata.find(GrpcIdKey());  // multimap::find: if the key occurs several times, any one of those entries may be returned.
+ if (it != metadata.end()) {
+ id = StringPiece(it->second.data(), it->second.size());  // Built from pointer + length of the metadata value; no string copy is made here.
+ }
+ return new port::Tracing::TraceMe(name, id);  // Ownership passes to the caller (RunStepHandler deletes it in its completion callback).
+ }
+
TF_DISALLOW_COPY_AND_ASSIGN(GrpcMasterService);
};
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc b/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc
index bf72d9a7fc..c04aa44941 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_remote_master.cc
@@ -23,6 +23,8 @@ limitations under the License.
#include "tensorflow/core/distributed_runtime/rpc/grpc_util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/lib/strings/strcat.h"
+#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/protobuf/master.pb.h"
namespace tensorflow {
@@ -66,6 +68,7 @@ class GrpcRemoteMaster : public MasterInterface {
Status RunStep(CallOptions* call_options, RunStepRequestWrapper* request,
MutableRunStepResponseWrapper* response) override {
::grpc::ClientContext ctx;
+ auto trace = TraceRpc("RunStep/Client", &ctx);
ctx.set_fail_fast(false);
SetDeadline(&ctx, call_options->GetTimeout());
return FromGrpcStatus(stub_->RunStep(&ctx, request->ToProto(),
@@ -99,6 +102,14 @@ class GrpcRemoteMaster : public MasterInterface {
}
private:
+ // Starts a trace called `name` and tags both the trace and the outgoing RPC (via `ctx` metadata under GrpcIdKey()) with the same newly generated ID, so the two sides can be matched up.
+ port::Tracing::TraceMe TraceRpc(StringPiece name,
+ ::grpc::ClientContext* ctx) {
+ string trace_id = strings::StrCat(port::Tracing::UniqueId());  // Converted to a string so it can be sent as a metadata value.
+ ctx->AddMetadata(GrpcIdKey(), trace_id);  // Same key the server-side handler looks up in the call's client metadata.
+ return port::Tracing::TraceMe(name, trace_id);  // NOTE(review): trace_id is a local; assumes TraceMe does not keep a view of it after construction -- confirm.
+ }
+
std::unique_ptr<grpc::MasterService::Stub> stub_;
void SetDeadline(::grpc::ClientContext* ctx, int64 time_in_ms) {
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_util.h b/tensorflow/core/distributed_runtime/rpc/grpc_util.h
index 44473d1150..5244cd1c13 100644
--- a/tensorflow/core/distributed_runtime/rpc/grpc_util.h
+++ b/tensorflow/core/distributed_runtime/rpc/grpc_util.h
@@ -43,6 +43,8 @@ inline ::grpc::Status ToGrpcStatus(const ::tensorflow::Status& s) {
typedef std::shared_ptr<::grpc::Channel> SharedGrpcChannelPtr;
+inline string GrpcIdKey() { return "tf-rpc"; }  // gRPC metadata key carrying the per-RPC trace ID: added by the client's TraceRpc, looked up by the server's TraceRpc.
+
} // namespace tensorflow
#endif // THIRD_PARTY_TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_RPC_GRPC_UTIL_H_