aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/channel_stack.h3
-rw-r--r--src/core/channel/subchannel_call_holder.c1
-rw-r--r--src/core/channel/subchannel_call_holder.h19
-rw-r--r--src/core/client_config/subchannel.c8
-rw-r--r--src/core/client_config/subchannel.h11
-rw-r--r--src/core/iomgr/pollset_windows.c10
-rw-r--r--src/core/support/histogram.c2
-rw-r--r--src/core/surface/byte_buffer_reader.c19
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/transport/chttp2/frame_data.c13
-rw-r--r--src/core/transport/chttp2/frame_data.h3
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2_transport.c88
-rw-r--r--src/cpp/util/string_ref.cc2
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs14
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj1
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs19
-rw-r--r--src/csharp/Grpc.Core.Tests/PerformanceTest.cs99
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj7
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs106
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs42
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/Enums.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs13
-rw-r--r--src/csharp/Grpc.Core/Profiling/IProfiler.cs47
-rw-r--r--src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs87
-rw-r--r--src/csharp/Grpc.Core/Profiling/ProfilerScope.cs60
-rw-r--r--src/csharp/Grpc.Core/Profiling/Profilers.cs131
31 files changed, 713 insertions, 143 deletions
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 1279fec080..5d33ab5b42 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -214,7 +214,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
-/* Ignore set pollset */
+/* Ignore set pollset - used by filters to implement the set_pollset method
+ if they don't care about pollsets at all. Does nothing. */
void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_pollset *pollset);
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index d39a099234..7251714519 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -123,6 +123,7 @@ retry:
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
+ fail_locked(exec_ctx, holder);
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
index 3dd43c9c3f..bda051c566 100644
--- a/src/core/channel/subchannel_call_holder.h
+++ b/src/core/channel/subchannel_call_holder.h
@@ -36,6 +36,10 @@
#include "src/core/client_config/subchannel.h"
+/** Pick a subchannel for grpc_subchannel_call_holder;
+ Return 1 if subchannel is available immediately (in which case on_ready
+ should not be called), or 0 otherwise (in which case on_ready should be
+ called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
grpc_subchannel **subchannel, grpc_closure *on_ready);
@@ -46,10 +50,21 @@ typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
} grpc_subchannel_call_holder_creation_phase;
+/** Wrapper for holding a pointer to grpc_subchannel_call, and the
+ associated machinery to create such a pointer.
+ Handles queueing of stream ops until a call object is ready, waiting
+ for initial metadata before trying to create a call object,
+ and handling cancellation gracefully.
+
+ Both the channel and uchannel filter use this as their call_data. */
typedef struct grpc_subchannel_call_holder {
- /* either 0 for no call, 1 for cancelled, or a pointer to a
- grpc_subchannel_call */
+ /** either 0 for no call, 1 for cancelled, or a pointer to a
+ grpc_subchannel_call */
gpr_atm subchannel_call;
+ /** Helper function to choose the subchannel on which to create
+ the call object. Channel filter delegates to the load
+ balancing policy (once it's ready); uchannel returns
+ immediately */
grpc_subchannel_call_holder_pick_subchannel pick_subchannel;
void *pick_subchannel_arg;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index e911a46faf..49c2cf9a19 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -22,7 +22,7 @@
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
@@ -395,12 +395,12 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
- int call_creation_status;
+ int call_creation_finished_ok;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- call_creation_status = grpc_subchannel_create_call(
+ call_creation_finished_ok = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
- GPR_ASSERT(call_creation_status == 1);
+ GPR_ASSERT(call_creation_finished_ok == 1);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 4769a6de68..1fefa1888a 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -77,12 +77,11 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call (possibly asynchronously).
*
- * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
- * return immediately and \a target will point to a connected \a subchannel_call
- * instance. Note that \a notify will \em not be invoked in this case.
- * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
- * subchannel call will be created asynchronously, invoking the \a notify
- * callback upon completion. */
+ * If the returned status is 1, the call will return immediately and \a target
+ * will point to a connected \a subchannel_call instance. Note that \a notify
+ * will \em not be invoked in this case.
+ * Otherwise, if the returned status is 0, the subchannel call will be created
+ * asynchronously, invoking the \a notify callback upon completion. */
int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_pollset *pollset, gpr_atm *target,
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 4443a6a9a7..c3f310ee27 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -35,6 +35,7 @@
#ifdef GPR_WINSOCK_SOCKET
+#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include "src/core/iomgr/timer_internal.h"
@@ -123,7 +124,14 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
void grpc_pollset_destroy(grpc_pollset *pollset) {}
-void grpc_pollset_reset(grpc_pollset *pollset) {}
+void grpc_pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(!has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET));
+ pollset->shutting_down = 0;
+ pollset->is_iocp_worker = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->on_shutdown = NULL;
+}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker, gpr_timespec now,
diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c
index 8a1a9d9233..77b48af996 100644
--- a/src/core/support/histogram.c
+++ b/src/core/support/histogram.c
@@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) {
h->buckets[bucket_for(h, x)]++;
}
-int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) {
+int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) {
if ((dst->num_buckets != src->num_buckets) ||
(dst->multiplier != src->multiplier)) {
/* Fail because these histograms don't match */
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 283db83833..9f830df68c 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -31,6 +31,7 @@
*
*/
+#include <string.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/compression.h>
@@ -103,3 +104,21 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
}
return 0;
}
+
+gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
+ gpr_slice in_slice;
+ size_t bytes_read = 0;
+ const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
+ gpr_slice out_slice = gpr_slice_malloc(input_size);
+ gpr_uint8 *const outbuf = GPR_SLICE_START_PTR(out_slice); /* just an alias */
+
+ while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
+ const size_t slice_length = GPR_SLICE_LENGTH(in_slice);
+ memcpy(&(outbuf[bytes_read]), GPR_SLICE_START_PTR(in_slice), slice_length);
+ bytes_read += slice_length;
+ gpr_slice_unref(in_slice);
+ GPR_ASSERT(bytes_read <= input_size);
+ }
+ return out_slice;
+}
+
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 056d49064e..aa435d44d3 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
+ grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(gpr_uint8)(bctl->call->used_batches &
~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
gpr_mu_unlock(&call->mu);
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c
index 7287f97aaa..e07fbb2cc7 100644
--- a/src/core/transport/chttp2/frame_data.c
+++ b/src/core/transport/chttp2/frame_data.c
@@ -45,11 +45,16 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
+ parser->parsing_frame = NULL;
return GRPC_CHTTP2_PARSE_OK;
}
-void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
+void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
+ if (parser->parsing_frame) {
+ grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
+ }
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
grpc_byte_stream_destroy(bs);
@@ -198,8 +203,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(
- transport_parsing, stream_parsing, p->frame_size, message_flags,
- &p->incoming_frames);
+ exec_ctx, transport_parsing, stream_parsing, p->frame_size,
+ message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
@@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
+ p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {
diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h
index bc32f29d97..472f9cebdb 100644
--- a/src/core/transport/chttp2/frame_data.h
+++ b/src/core/transport/chttp2/frame_data.h
@@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
-void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser);
+void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_data_parser *parser);
/* start processing a new data frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index b53c9dee0b..2d0cb4abdb 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 3d98a4fb14..f62294c7c5 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
- grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
+ grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_destroy(
@@ -806,7 +806,8 @@ static void perform_stream_op_locked(
}
if (stream_global->write_closed) {
grpc_chttp2_complete_closure_step(
- exec_ctx, &stream_global->send_trailing_metadata_finished, 0);
+ exec_ctx, &stream_global->send_trailing_metadata_finished,
+ grpc_metadata_batch_is_empty(op->send_trailing_metadata));
} else if (stream_global->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
@@ -1364,6 +1365,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
+static void incoming_byte_stream_update_flow_control(
+ grpc_chttp2_transport_global *transport_global,
+ grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
+ size_t have_already) {
+ gpr_uint32 max_recv_bytes;
+
+ /* clamp max recv hint to an allowable size */
+ if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
+ max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ } else {
+ max_recv_bytes = (gpr_uint32)max_size_hint;
+ }
+
+ /* account for bytes already received but unknown to higher layers */
+ if (max_recv_bytes >= have_already) {
+ max_recv_bytes -= (gpr_uint32)have_already;
+ } else {
+ max_recv_bytes = 0;
+ }
+
+ /* add some small lookahead to keep pipelines flowing */
+ GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
+ max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
+ if (stream_global->max_recv_bytes < max_recv_bytes) {
+ gpr_uint32 add_max_recv_bytes =
+ max_recv_bytes - stream_global->max_recv_bytes;
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_parse,
+ add_max_recv_bytes);
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
+ unannounced_incoming_window_for_writing,
+ add_max_recv_bytes);
+ grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
+ stream_global);
+ grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ }
+}
+
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
@@ -1372,41 +1413,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
- gpr_uint32 max_recv_bytes;
lock(bs->transport);
if (bs->is_tail) {
- /* clamp max recv hint to an allowable size */
- if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
- max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
- } else {
- max_recv_bytes = (gpr_uint32)max_size_hint;
- }
-
- /* account for bytes already received but unknown to higher layers */
- if (max_recv_bytes >= bs->slices.length) {
- max_recv_bytes -= (gpr_uint32)bs->slices.length;
- } else {
- max_recv_bytes = 0;
- }
- /* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
- max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
- if (stream_global->max_recv_bytes < max_recv_bytes) {
- gpr_uint32 add_max_recv_bytes =
- max_recv_bytes - stream_global->max_recv_bytes;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- max_recv_bytes, add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- unannounced_incoming_window_for_parse,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
- unannounced_incoming_window_for_writing,
- add_max_recv_bytes);
- grpc_chttp2_list_add_unannounced_incoming_window_available(
- transport_global, stream_global);
- grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
- }
+ incoming_byte_stream_update_flow_control(transport_global, stream_global,
+ max_size_hint, bs->slices.length);
}
if (bs->slices.count > 0) {
*slice = gpr_slice_buffer_take_first(&bs->slices);
@@ -1451,7 +1462,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
- grpc_chttp2_transport_parsing *transport_parsing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
@@ -1474,6 +1485,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
+ if (frame_size == 0) {
+ lock(TRANSPORT_FROM_PARSING(transport_parsing));
+ incoming_byte_stream_update_flow_control(
+ &TRANSPORT_FROM_PARSING(transport_parsing)->global,
+ &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
+ unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
+ }
return incoming_byte_stream;
}
diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc
index 604134fa9d..66c79a1818 100644
--- a/src/cpp/util/string_ref.cc
+++ b/src/cpp/util/string_ref.cc
@@ -40,7 +40,7 @@
namespace grpc {
-const size_t string_ref::npos;
+const size_t string_ref::npos = size_t(-1);
string_ref& string_ref::operator=(const string_ref& rhs) {
data_ = rhs.data_;
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index e58528ff50..25a5a27c8e 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -38,6 +38,7 @@ using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
+using Grpc.Core.Profiling;
using Grpc.Core.Utils;
using NUnit.Framework;
@@ -200,19 +201,6 @@ namespace Grpc.Core.Tests
Assert.AreEqual(headers[1].Key, trailers[1].Key);
CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes);
}
-
- [Test]
- public void UnaryCallPerformance()
- {
- helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
- {
- return request;
- });
-
- var callDetails = helper.CreateUnaryCall();
- BenchmarkUtil.RunBenchmark(1, 10,
- () => { Calls.BlockingUnaryCall(callDetails, "ABC"); });
- }
[Test]
public void UnknownMethodHandler()
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 91d072abab..e5ffa31989 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -88,6 +88,7 @@
<Compile Include="CompressionTest.cs" />
<Compile Include="ContextPropagationTest.cs" />
<Compile Include="MetadataTest.cs" />
+ <Compile Include="PerformanceTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs
index 874df02baa..9be5450d81 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs
@@ -34,6 +34,7 @@
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
@@ -198,5 +199,23 @@ namespace Grpc.Core.Internal.Tests
Console.WriteLine("Test cannot be run on this platform, skipping the test.");
}
}
+
+ // Test attribute commented out to prevent running as part of the default test suite.
+ // [Test]
+ // [Category("Performance")]
+ public void NowBenchmark()
+ {
+ // approx Timespec.Now latency <33ns
+ BenchmarkUtil.RunBenchmark(10000000, 1000000000, () => { var now = Timespec.Now; });
+ }
+
+ // Test attribute commented out to prevent running as part of the default test suite.
+ // [Test]
+ // [Category("Performance")]
+ public void PreciseNowBenchmark()
+ {
+ // approx Timespec.PreciseNow latency <18ns (when compiled with GRPC_TIMERS_RDTSC)
+ BenchmarkUtil.RunBenchmark(10000000, 1000000000, () => { var now = Timespec.PreciseNow; });
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/PerformanceTest.cs b/src/csharp/Grpc.Core.Tests/PerformanceTest.cs
new file mode 100644
index 0000000000..5516cd3377
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/PerformanceTest.cs
@@ -0,0 +1,99 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Profiling;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class PerformanceTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+
+ // Test attribute commented out to prevent running as part of the default test suite.
+ //[Test]
+ //[Category("Performance")]
+ public void UnaryCallPerformance()
+ {
+ var profiler = new BasicProfiler();
+ Profilers.SetForCurrentThread(profiler);
+
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ var callDetails = helper.CreateUnaryCall();
+ for(int i = 0; i < 3000; i++)
+ {
+ Calls.BlockingUnaryCall(callDetails, "ABC");
+ }
+
+ profiler.Reset();
+
+ for(int i = 0; i < 3000; i++)
+ {
+ Calls.BlockingUnaryCall(callDetails, "ABC");
+ }
+ profiler.Dump("latency_trace_csharp.txt");
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 92d4e19eac..0aab7bdd8a 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -119,6 +119,10 @@
<Compile Include="CompressionLevel.cs" />
<Compile Include="WriteOptions.cs" />
<Compile Include="ContextPropagationToken.cs" />
+ <Compile Include="Profiling\ProfilerEntry.cs" />
+ <Compile Include="Profiling\ProfilerScope.cs" />
+ <Compile Include="Profiling\IProfiler.cs" />
+ <Compile Include="Profiling\Profilers.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
@@ -150,4 +154,7 @@
<Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" />
<Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" />
<ItemGroup />
+ <ItemGroup>
+ <Folder Include="Profiling\" />
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 800462c854..e3ecc47282 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -39,6 +39,7 @@ using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
+using Grpc.Core.Profiling;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
@@ -87,6 +88,9 @@ namespace Grpc.Core.Internal
/// </summary>
public TResponse UnaryCall(TRequest msg)
{
+ var profiler = Profilers.ForCurrentThread();
+
+ using (profiler.NewScope("AsyncCall.UnaryCall"))
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
byte[] payload = UnsafeSerialize(msg);
@@ -104,24 +108,26 @@ namespace Grpc.Core.Internal
}
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ using (var ctx = BatchContextSafeHandle.Create())
{
- using (var ctx = BatchContextSafeHandle.Create())
- {
- call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
- var ev = cq.Pluck(ctx.Handle);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
+
+ var ev = cq.Pluck(ctx.Handle);
- bool success = (ev.success != 0);
- try
+ bool success = (ev.success != 0);
+ try
+ {
+ using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
{
HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
}
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking completion delegate.");
- }
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occured while invoking completion delegate.");
}
}
-
+
// Once the blocking call returns, the result should be available synchronously.
// Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
return unaryResponseTcs.Task.GetAwaiter().GetResult();
@@ -329,27 +335,35 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = CreateNativeCall(cq);
- details.Channel.AddCallReference(this);
- InitializeInternal(call);
- RegisterCancellationCallback();
+ using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
+ {
+ var call = CreateNativeCall(cq);
+
+ details.Channel.AddCallReference(this);
+ InitializeInternal(call);
+ RegisterCancellationCallback();
+ }
}
private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
{
- if (injectedNativeCall != null)
- {
- return injectedNativeCall; // allows injecting a mock INativeCall in tests.
- }
+ using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall"))
+ {
+ if (injectedNativeCall != null)
+ {
+ return injectedNativeCall; // allows injecting a mock INativeCall in tests.
+ }
- var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
+ var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
- var credentials = details.Options.Credentials;
- using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
- {
- return details.Channel.Handle.CreateCall(environment.CompletionRegistry,
- parentCall, ContextPropagationToken.DefaultMask, cq,
- details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
+ var credentials = details.Options.Credentials;
+ using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
+ {
+ var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
+ return result;
+ }
}
}
@@ -385,33 +399,37 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
- TResponse msg = default(TResponse);
- var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
-
- lock (myLock)
+ using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
- finished = true;
+ TResponse msg = default(TResponse);
+ var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
- if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
+ lock (myLock)
{
- receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
+ finished = true;
+
+ if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
+ {
+ receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
+ }
+ finishedStatus = receivedStatus;
+
+ ReleaseResourcesIfPossible();
+
}
- finishedStatus = receivedStatus;
- ReleaseResourcesIfPossible();
- }
+ responseHeadersTcs.SetResult(responseHeaders);
- responseHeadersTcs.SetResult(responseHeaders);
+ var status = receivedStatus.Status;
- var status = receivedStatus.Status;
+ if (!success || status.StatusCode != StatusCode.OK)
+ {
+ unaryResponseTcs.SetException(new RpcException(status));
+ return;
+ }
- if (!success || status.StatusCode != StatusCode.OK)
- {
- unaryResponseTcs.SetException(new RpcException(status));
- return;
+ unaryResponseTcs.SetResult(msg);
}
-
- unaryResponseTcs.SetResult(msg);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 3e2c57c9b5..953f61aa1e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -41,6 +41,7 @@ using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
+using Grpc.Core.Profiling;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
@@ -167,16 +168,19 @@ namespace Grpc.Core.Internal
/// </summary>
protected bool ReleaseResourcesIfPossible()
{
- if (!disposed && call != null)
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible"))
{
- bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
- if (noMoreSendCompletions && readingDone && finished)
+ if (!disposed && call != null)
{
- ReleaseResources();
- return true;
+ bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+ if (noMoreSendCompletions && readingDone && finished)
+ {
+ ReleaseResources();
+ return true;
+ }
}
+ return false;
}
- return false;
}
protected abstract bool IsClient
@@ -228,7 +232,10 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg)
{
- return serializer(msg);
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize"))
+ {
+ return serializer(msg);
+ }
}
protected Exception TrySerialize(TWrite msg, out byte[] payload)
@@ -247,15 +254,20 @@ namespace Grpc.Core.Internal
protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
- try
- {
- msg = deserializer(payload);
- return null;
- }
- catch (Exception e)
+ using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize"))
{
- msg = default(TRead);
- return e;
+ try
+ {
+
+ msg = deserializer(payload);
+ return null;
+
+ }
+ catch (Exception e)
+ {
+ msg = default(TRead);
+ return e;
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 0be7a4dd3a..ddeedebd11 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -34,6 +34,7 @@ using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core;
using Grpc.Core.Utils;
+using Grpc.Core.Profiling;
namespace Grpc.Core.Internal
{
@@ -131,8 +132,11 @@ namespace Grpc.Core.Internal
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
- .CheckOk();
+ using (Profilers.ForCurrentThread().NewScope("CallSafeHandle.StartUnary"))
+ {
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
+ .CheckOk();
+ }
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index d270d77526..5f9169bcb2 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -32,6 +32,7 @@ using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Profiling;
namespace Grpc.Core.Internal
{
@@ -84,13 +85,16 @@ namespace Grpc.Core.Internal
public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CredentialsSafeHandle credentials)
{
- var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
- if (credentials != null)
+ using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
- result.SetCredentials(credentials);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
+ if (credentials != null)
+ {
+ result.SetCredentials(credentials);
+ }
+ result.SetCompletionRegistry(registry);
+ return result;
}
- result.SetCompletionRegistry(registry);
- return result;
}
public ChannelState CheckConnectivityState(bool tryToConnect)
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index f7a3471bb4..9de2bc7950 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -31,6 +31,7 @@
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
+using Grpc.Core.Profiling;
namespace Grpc.Core.Internal
{
@@ -70,7 +71,10 @@ namespace Grpc.Core.Internal
public CompletionQueueEvent Pluck(IntPtr tag)
{
- return grpcsharp_completion_queue_pluck(this, tag);
+ using (Profilers.ForCurrentThread().NewScope("CompletionQueueSafeHandle.Pluck"))
+ {
+ return grpcsharp_completion_queue_pluck(this, tag);
+ }
}
public void Shutdown()
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs
index 185098160b..b0eab2001b 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/Enums.cs
@@ -102,6 +102,9 @@ namespace Grpc.Core.Internal
/* Realtime clock */
Realtime,
+ /* Precise clock good for performance profiling. */
+ Precise,
+
/* Timespan - the distance between two time points */
Timespan
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 31b834c979..ed1bd24498 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -31,6 +31,7 @@
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
+using Grpc.Core.Profiling;
namespace Grpc.Core.Internal
{
@@ -66,14 +67,17 @@ namespace Grpc.Core.Internal
public static MetadataArraySafeHandle Create(Metadata metadata)
{
- // TODO(jtattermusch): we might wanna check that the metadata is readonly
- var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
- for (int i = 0; i < metadata.Count; i++)
+ using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
{
- var valueBytes = metadata[i].GetSerializedValueUnsafe();
- grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
+ // TODO(jtattermusch): we might wanna check that the metadata is readonly
+ var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
+ for (int i = 0; i < metadata.Count; i++)
+ {
+ var valueBytes = metadata[i].GetSerializedValueUnsafe();
+ grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
+ }
+ return metadataArray;
}
- return metadataArray;
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index daf85d5f61..38fc067d9f 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -239,6 +239,19 @@ namespace Grpc.Core.Internal
}
}
+ /// <summary>
+ /// Gets current timestamp using <c>GPRClockType.Precise</c>.
+ /// Only available internally because core needs to be compiled with
+ /// GRPC_TIMERS_RDTSC support for this to use RDTSC.
+ /// </summary>
+ internal static Timespec PreciseNow
+ {
+ get
+ {
+ return gprsharp_now(GPRClockType.Precise);
+ }
+ }
+
internal static int NativeSize
{
get
diff --git a/src/csharp/Grpc.Core/Profiling/IProfiler.cs b/src/csharp/Grpc.Core/Profiling/IProfiler.cs
new file mode 100644
index 0000000000..c426c365d2
--- /dev/null
+++ b/src/csharp/Grpc.Core/Profiling/IProfiler.cs
@@ -0,0 +1,47 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.IO;
+using System.Threading;
+using Grpc.Core.Internal;
+
+namespace Grpc.Core.Profiling
+{
+ internal interface IProfiler
+ {
+ void Begin(string tag);
+ void End(string tag);
+ void Mark(string tag);
+ }
+}
diff --git a/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs b/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs
new file mode 100644
index 0000000000..5cc4c3c054
--- /dev/null
+++ b/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs
@@ -0,0 +1,87 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.IO;
+using System.Threading;
+using Grpc.Core.Internal;
+
+namespace Grpc.Core.Profiling
+{
+ internal struct ProfilerEntry
+ {
+ public enum Type {
+ BEGIN,
+ END,
+ MARK
+ }
+
+ public ProfilerEntry(Timespec timespec, Type type, string tag)
+ {
+ this.timespec = timespec;
+ this.type = type;
+ this.tag = tag;
+ }
+
+ public Timespec timespec;
+ public Type type;
+ public string tag;
+
+ public override string ToString()
+ {
+ // mimic the output format used by C core.
+ return string.Format(
+ "{{\"t\": {0}.{1}, \"thd\":\"unknown\", \"type\": \"{2}\", \"tag\": \"{3}\", " +
+ "\"file\": \"unknown\", \"line\": 0, \"imp\": 0}}",
+ timespec.TimevalSeconds, timespec.TimevalNanos.ToString("D9"),
+ GetTypeAbbreviation(type), tag);
+ }
+
+ internal static string GetTypeAbbreviation(Type type)
+ {
+ switch (type)
+ {
+ case Type.BEGIN:
+ return "{";
+
+ case Type.END:
+ return "}";
+
+ case Type.MARK:
+ return ".";
+ default:
+ throw new ArgumentException("Unknown type");
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs b/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs
new file mode 100644
index 0000000000..413f3a1a35
--- /dev/null
+++ b/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs
@@ -0,0 +1,60 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.IO;
+using System.Threading;
+using Grpc.Core.Internal;
+
+namespace Grpc.Core.Profiling
+{
+ // Allows declaring Begin and End of a profiler scope with a using statement.
+ // declared as struct for better performance.
+ internal struct ProfilerScope : IDisposable
+ {
+ readonly IProfiler profiler;
+ readonly string tag;
+
+ public ProfilerScope(IProfiler profiler, string tag)
+ {
+ this.profiler = profiler;
+ this.tag = tag;
+ this.profiler.Begin(this.tag);
+ }
+
+ public void Dispose()
+ {
+ profiler.End(tag);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Profiling/Profilers.cs b/src/csharp/Grpc.Core/Profiling/Profilers.cs
new file mode 100644
index 0000000000..c8123347f2
--- /dev/null
+++ b/src/csharp/Grpc.Core/Profiling/Profilers.cs
@@ -0,0 +1,131 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.IO;
+using System.Threading;
+using Grpc.Core.Internal;
+
+namespace Grpc.Core.Profiling
+{
+ internal static class Profilers
+ {
+ static readonly NopProfiler defaultProfiler = new NopProfiler();
+ static readonly ThreadLocal<IProfiler> profilers = new ThreadLocal<IProfiler>();
+
+ public static IProfiler ForCurrentThread()
+ {
+ return profilers.Value ?? defaultProfiler;
+ }
+
+ public static void SetForCurrentThread(IProfiler profiler)
+ {
+ profilers.Value = profiler;
+ }
+
+ public static ProfilerScope NewScope(this IProfiler profiler, string tag)
+ {
+ return new ProfilerScope(profiler, tag);
+ }
+ }
+
+ internal class NopProfiler : IProfiler
+ {
+ public void Begin(string tag)
+ {
+ }
+
+ public void End(string tag)
+ {
+ }
+
+ public void Mark(string tag)
+ {
+ }
+ }
+
+ // Profiler using Timespec.PreciseNow
+ internal class BasicProfiler : IProfiler
+ {
+ ProfilerEntry[] entries;
+ int count;
+
+ public BasicProfiler() : this(1024*1024)
+ {
+ }
+
+ public BasicProfiler(int capacity)
+ {
+ this.entries = new ProfilerEntry[capacity];
+ }
+
+ public void Begin(string tag) {
+ AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.BEGIN, tag));
+ }
+
+ public void End(string tag) {
+ AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.END, tag));
+ }
+
+ public void Mark(string tag) {
+ AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.MARK, tag));
+ }
+
+ public void Reset()
+ {
+ count = 0;
+ }
+
+ public void Dump(string filepath)
+ {
+ using (var stream = new StreamWriter(filepath))
+ {
+ Dump(stream);
+ }
+ }
+
+ public void Dump(TextWriter stream)
+ {
+ for (int i = 0; i < count; i++)
+ {
+ var entry = entries[i];
+ stream.WriteLine(entry.ToString());
+ }
+ }
+
+ // NOT THREADSAFE!
+ void AddEntry(ProfilerEntry entry) {
+ entries[count++] = entry;
+ }
+ }
+}