aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-04-15 13:48:21 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-04-15 13:48:21 -0700
commit0afc3addf4b438d32ab9a7424c0ae588a1c8e212 (patch)
tree7249af8189e2435516f2580d2dff6a1420ac1b9a
parent00f9f4ff7215ab19d96e780874ec3a557efbd198 (diff)
parent4e01659555c0179aa46f758c5f4cfa412a31de78 (diff)
Merge pull request #1285 from vjpai/timers
Inserts latency profiling timers at various interesting points.
-rw-r--r--src/core/iomgr/tcp_posix.c11
-rw-r--r--src/core/profiling/timers.c8
-rw-r--r--src/core/profiling/timers.h9
-rw-r--r--src/cpp/client/channel.cc4
-rw-r--r--src/cpp/common/call.cc5
-rw-r--r--src/cpp/server/async_server_context.cc4
-rw-r--r--src/cpp/server/server.cc5
7 files changed, 38 insertions, 8 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 597a2a62d3..86721e9c95 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -46,6 +46,7 @@
#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
+#include "src/core/profiling/timers.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
@@ -326,6 +327,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
gpr_slice *final_slices;
size_t final_nslices;
+ GRPC_TIMER_MARK(HANDLE_READ_BEGIN, 0);
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
@@ -348,9 +350,11 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
+ GRPC_TIMER_MARK(RECVMSG_BEGIN, 0);
do {
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
+ GRPC_TIMER_MARK(RECVMSG_END, 0);
if (read_bytes < allocated_bytes) {
/* TODO(klempner): Consider a second read first, in hopes of getting a
@@ -402,6 +406,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
++iov_size;
}
}
+ GRPC_TIMER_MARK(HANDLE_READ_END, 0);
}
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
@@ -433,10 +438,12 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
+ GRPC_TIMER_MARK(SENDMSG_BEGIN, 0);
do {
/* TODO(klempner): Cork if this is a partial write */
sent_length = sendmsg(tcp->fd, &msg, 0);
} while (sent_length < 0 && errno == EINTR);
+ GRPC_TIMER_MARK(SENDMSG_END, 0);
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -472,6 +479,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
return;
}
+ GRPC_TIMER_MARK(CB_WRITE_BEGIN, 0);
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
@@ -487,6 +495,7 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
cb(tcp->write_user_data, cb_status);
grpc_tcp_unref(tcp);
}
+ GRPC_TIMER_MARK(CB_WRITE_END, 0);
}
static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
@@ -509,6 +518,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
}
}
+ GRPC_TIMER_MARK(WRITE_BEGIN, 0);
GPR_ASSERT(tcp->write_cb == NULL);
slice_state_init(&tcp->write_state, slices, nslices, nslices);
@@ -522,6 +532,7 @@ static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
}
+ GRPC_TIMER_MARK(WRITE_END, 0);
return status;
}
diff --git a/src/core/profiling/timers.c b/src/core/profiling/timers.c
index 478397d1bf..7cc79bd22b 100644
--- a/src/core/profiling/timers.c
+++ b/src/core/profiling/timers.c
@@ -45,7 +45,7 @@
typedef struct grpc_timer_entry {
grpc_precise_clock tm;
const char* tag;
- int seq;
+ void* id;
const char* file;
int line;
} grpc_timer_entry;
@@ -85,7 +85,7 @@ static void log_report_locked(grpc_timers_log* log) {
grpc_timer_entry* entry = &(log->log[i]);
fprintf(fp, "GRPC_LAT_PROF ");
grpc_precise_clock_print(&entry->tm, fp);
- fprintf(fp, " %s#%d,%s:%d\n", entry->tag, entry->seq, entry->file,
+ fprintf(fp, " %s %p %s %d\n", entry->tag, entry->id, entry->file,
entry->line);
}
@@ -104,7 +104,7 @@ void grpc_timers_log_destroy(grpc_timers_log* log) {
gpr_free(log);
}
-void grpc_timers_log_add(grpc_timers_log* log, const char* tag, int seq,
+void grpc_timers_log_add(grpc_timers_log* log, const char* tag, void* id,
const char* file, int line) {
grpc_timer_entry* entry;
@@ -118,7 +118,7 @@ void grpc_timers_log_add(grpc_timers_log* log, const char* tag, int seq,
grpc_precise_clock_now(&entry->tm);
entry->tag = tag;
- entry->seq = seq;
+ entry->id = id;
entry->file = file;
entry->line = line;
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index ef4cad112a..1a6b9ffac9 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -44,15 +44,16 @@ extern "C" {
typedef struct grpc_timers_log grpc_timers_log;
-grpc_timers_log *grpc_timers_log_create(int capacity_limit, FILE *dump);
-void grpc_timers_log_add(grpc_timers_log *, const char *tag, int seq,
- const char *file, int line);
+grpc_timers_log* grpc_timers_log_create(int capacity_limit, FILE* dump);
+void grpc_timers_log_add(grpc_timers_log*, const char* tag, void* id,
+ const char* file, int line);
void grpc_timers_log_destroy(grpc_timers_log *);
extern grpc_timers_log *grpc_timers_log_global;
#define GRPC_TIMER_MARK(x, s) \
- grpc_timers_log_add(grpc_timers_log_global, #x, s, __FILE__, __LINE__)
+ grpc_timers_log_add(grpc_timers_log_global, #x, ((void *)(gpr_intptr)(s)), \
+ __FILE__, __LINE__)
#else /* !GRPC_LATENCY_PROFILER */
#define GRPC_TIMER_MARK(x, s) \
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 5380d3a232..72123abbc8 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
+#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
@@ -65,6 +66,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
? target_.c_str()
: context->authority().c_str(),
context->RawDeadline());
+ GRPC_TIMER_MARK(CALL_CREATED,c_call);
context->set_call(c_call);
return Call(c_call, this, cq);
}
@@ -73,9 +75,11 @@ void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = MAX_OPS;
grpc_op ops[MAX_OPS];
+ GRPC_TIMER_MARK(PERFORM_OPS_BEGIN, call->call());
buf->FillOps(ops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), ops, nops, buf));
+ GRPC_TIMER_MARK(PERFORM_OPS_END, call->call());
}
} // namespace grpc
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index e75e77e0b5..9878133331 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -38,6 +38,7 @@
#include <grpc++/client_context.h>
#include <grpc++/channel_interface.h>
+#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -231,11 +232,13 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
}
if (send_message_ || send_message_buffer_) {
if (send_message_) {
+ GRPC_TIMER_MARK(SER_PROTO_BEGIN, 0);
bool success = SerializeProto(*send_message_, &send_buf_);
if (!success) {
abort();
// TODO handle parse failure
}
+ GRPC_TIMER_MARK(SER_PROTO_END, 0);
} else {
send_buf_ = send_message_buffer_->buffer();
}
@@ -307,8 +310,10 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
if (recv_buf_) {
got_message = *status;
if (recv_message_) {
+ GRPC_TIMER_MARK(DESER_PROTO_BEGIN, 0);
*status = *status && DeserializeProto(recv_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_buf_);
+ GRPC_TIMER_MARK(DESER_PROTO_END, 0);
} else {
recv_message_buffer_->set_buffer(recv_buf_);
}
diff --git a/src/cpp/server/async_server_context.cc b/src/cpp/server/async_server_context.cc
index 628822a338..e1f29452a4 100644
--- a/src/cpp/server/async_server_context.cc
+++ b/src/cpp/server/async_server_context.cc
@@ -68,9 +68,11 @@ bool AsyncServerContext::StartRead(grpc::protobuf::Message* request) {
bool AsyncServerContext::StartWrite(const grpc::protobuf::Message& response,
int flags) {
grpc_byte_buffer* buffer = nullptr;
+ GRPC_TIMER_MARK(SER_PROTO_BEGIN, call_->call());
if (!SerializeProto(response, &buffer)) {
return false;
}
+ GRPC_TIMER_MARK(SER_PROTO_END, call_->call());
grpc_call_error err = grpc_call_start_write_old(call_, buffer, this, flags);
grpc_byte_buffer_destroy(buffer);
return err == GRPC_CALL_OK;
@@ -87,7 +89,9 @@ bool AsyncServerContext::StartWriteStatus(const Status& status) {
bool AsyncServerContext::ParseRead(grpc_byte_buffer* read_buffer) {
GPR_ASSERT(request_);
+ GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_->call());
bool success = DeserializeProto(read_buffer, request_);
+ GRPC_TIMER_MARK(DESER_PROTO_END, call_->call());
request_ = nullptr;
return success;
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 046133c5eb..b3cd1fdd74 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -46,6 +46,7 @@
#include <grpc++/server_credentials.h>
#include <grpc++/thread_pool_interface.h>
+#include "src/core/profiling/timers.h"
#include "src/cpp/proto/proto_utils.h"
#include "src/cpp/util/time.h"
@@ -123,10 +124,12 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
std::unique_ptr<grpc::protobuf::Message> req;
std::unique_ptr<grpc::protobuf::Message> res;
if (has_request_payload_) {
+ GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_.call());
req.reset(method_->AllocateRequestProto());
if (!DeserializeProto(request_payload_, req.get())) {
abort(); // for now
}
+ GRPC_TIMER_MARK(DESER_PROTO_END, call_.call());
}
if (has_response_payload_) {
res.reset(method_->AllocateResponseProto());
@@ -340,7 +343,9 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
bool orig_status = *status;
if (*status && request_) {
if (payload_) {
+ GRPC_TIMER_MARK(DESER_PROTO_BEGIN, call_);
*status = DeserializeProto(payload_, request_);
+ GRPC_TIMER_MARK(DESER_PROTO_END, call_);
} else {
*status = false;
}