diff options
Diffstat (limited to 'src')
31 files changed, 713 insertions, 143 deletions
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 1279fec080..5d33ab5b42 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -214,7 +214,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); -/* Ignore set pollset */ +/* Ignore set pollset - used by filters to implement the set_pollset method + if they don't care about pollsets at all. Does nothing. */ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_pollset *pollset); diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index d39a099234..7251714519 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -123,6 +123,7 @@ retry: } else { switch (holder->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: + fail_locked(exec_ctx, holder); break; case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL: grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel, diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 3dd43c9c3f..bda051c566 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -36,6 +36,10 @@ #include "src/core/client_config/subchannel.h" +/** Pick a subchannel for grpc_subchannel_call_holder; + Return 1 if subchannel is available immediately (in which case on_ready + should not be called), or 0 otherwise (in which case on_ready should be + called when the subchannel is available) */ typedef int (*grpc_subchannel_call_holder_pick_subchannel)( grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, grpc_subchannel **subchannel, grpc_closure *on_ready); @@ -46,10 +50,21 @@ typedef enum { GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL } grpc_subchannel_call_holder_creation_phase; +/** Wrapper for holding a pointer to grpc_subchannel_call, and the + associated machinery to create such a pointer. + Handles queueing of stream ops until a call object is ready, waiting + for initial metadata before trying to create a call object, + and handling cancellation gracefully. + + Both the channel and uchannel filter use this as their call_data. */ typedef struct grpc_subchannel_call_holder { - /* either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ + /** either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ gpr_atm subchannel_call; + /** Helper function to choose the subchannel on which to create + the call object. Channel filter delegates to the load + balancing policy (once it's ready); uchannel returns + immediately */ grpc_subchannel_call_holder_pick_subchannel pick_subchannel; void *pick_subchannel_arg; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index e911a46faf..49c2cf9a19 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -22,7 +22,7 @@ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMA`S (INCLUDING, BUT NOT + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT @@ -395,12 +395,12 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { - int call_creation_status; + int call_creation_finished_ok; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - call_creation_status = grpc_subchannel_create_call( + call_creation_finished_ok = grpc_subchannel_create_call( exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); - GPR_ASSERT(call_creation_status == 1); + GPR_ASSERT(call_creation_finished_ok == 1); w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 4769a6de68..1fefa1888a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -77,12 +77,11 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call (possibly asynchronously). * - * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will - * return immediately and \a target will point to a connected \a subchannel_call - * instance. Note that \a notify will \em not be invoked in this case. - * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the - * subchannel call will be created asynchronously, invoking the \a notify - * callback upon completion. */ + * If the returned status is 1, the call will return immediately and \a target + * will point to a connected \a subchannel_call instance. Note that \a notify + * will \em not be invoked in this case. + * Otherwise, if the returned status is 0, the subchannel call will be created + * asynchronously, invoking the \a notify callback upon completion. */ int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, gpr_atm *target, diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 4443a6a9a7..c3f310ee27 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -35,6 +35,7 @@ #ifdef GPR_WINSOCK_SOCKET +#include <grpc/support/log.h> #include <grpc/support/thd.h> #include "src/core/iomgr/timer_internal.h" @@ -123,7 +124,14 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, void grpc_pollset_destroy(grpc_pollset *pollset) {} -void grpc_pollset_reset(grpc_pollset *pollset) {} +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(!has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET)); + pollset->shutting_down = 0; + pollset->is_iocp_worker = 0; + pollset->kicked_without_pollers = 0; + pollset->on_shutdown = NULL; +} void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c index 8a1a9d9233..77b48af996 100644 --- a/src/core/support/histogram.c +++ b/src/core/support/histogram.c @@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) { h->buckets[bucket_for(h, x)]++; } -int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) { +int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) { if ((dst->num_buckets != src->num_buckets) || (dst->multiplier != src->multiplier)) { /* Fail because these histograms don't match */ diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index 283db83833..9f830df68c 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -31,6 +31,7 @@ * */ +#include <string.h> #include <grpc/byte_buffer_reader.h> #include <grpc/compression.h> @@ -103,3 +104,21 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, } return 0; } + +gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) { + gpr_slice in_slice; + size_t bytes_read = 0; + const size_t input_size = grpc_byte_buffer_length(reader->buffer_out); + gpr_slice out_slice = gpr_slice_malloc(input_size); + gpr_uint8 *const outbuf = GPR_SLICE_START_PTR(out_slice); /* just an alias */ + + while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) { + const size_t slice_length = GPR_SLICE_LENGTH(in_slice); + memcpy(&(outbuf[bytes_read]), GPR_SLICE_START_PTR(in_slice), slice_length); + bytes_read += slice_length; + gpr_slice_unref(in_slice); + GPR_ASSERT(bytes_read <= input_size); + } + return out_slice; +} + diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 056d49064e..aa435d44d3 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { + grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); gpr_mu_lock(&call->mu); bctl->call->used_batches = (gpr_uint8)(bctl->call->used_batches & ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches))); gpr_mu_unlock(&call->mu); - grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success, diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 7287f97aaa..e07fbb2cc7 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -45,11 +45,16 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser) { parser->state = GRPC_CHTTP2_DATA_FH_0; + parser->parsing_frame = NULL; return GRPC_CHTTP2_PARSE_OK; } -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) { +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser) { grpc_byte_stream *bs; + if (parser->parsing_frame) { + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame); + } while ( (bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) { grpc_byte_stream_destroy(bs); @@ -198,8 +203,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } p->parsing_frame = incoming_byte_stream = grpc_chttp2_incoming_byte_stream_create( - transport_parsing, stream_parsing, p->frame_size, message_flags, - &p->incoming_frames); + exec_ctx, transport_parsing, stream_parsing, p->frame_size, + message_flags, &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: if (cur == end) { @@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( exec_ctx, p->parsing_frame, gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; return GRPC_CHTTP2_PARSE_OK; } else if ((gpr_uint32)(end - cur) > p->frame_size) { @@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg))); grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame); + p->parsing_frame = NULL; cur += p->frame_size; goto fh_0; /* loop */ } else { diff --git a/src/core/transport/chttp2/frame_data.h b/src/core/transport/chttp2/frame_data.h index bc32f29d97..472f9cebdb 100644 --- a/src/core/transport/chttp2/frame_data.h +++ b/src/core/transport/chttp2/frame_data.h @@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop( grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_data_parser *parser); -void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser); +void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *parser); /* start processing a new data frame */ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame( diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index b53c9dee0b..2d0cb4abdb 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, #endif grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue); void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 3d98a4fb14..f62294c7c5 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); - grpc_chttp2_data_parser_destroy(&s->parsing.data_parser); + grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]); grpc_chttp2_incoming_metadata_buffer_destroy( @@ -806,7 +806,8 @@ static void perform_stream_op_locked( } if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->send_trailing_metadata_finished, 0); + exec_ctx, &stream_global->send_trailing_metadata_finished, + grpc_metadata_batch_is_empty(op->send_trailing_metadata)); } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ @@ -1364,6 +1365,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void incoming_byte_stream_update_flow_control( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, size_t max_size_hint, + size_t have_already) { + gpr_uint32 max_recv_bytes; + + /* clamp max recv hint to an allowable size */ + if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { + max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; + } else { + max_recv_bytes = (gpr_uint32)max_size_hint; + } + + /* account for bytes already received but unknown to higher layers */ + if (max_recv_bytes >= have_already) { + max_recv_bytes -= (gpr_uint32)have_already; + } else { + max_recv_bytes = 0; + } + + /* add some small lookahead to keep pipelines flowing */ + GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); + max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; + if (stream_global->max_recv_bytes < max_recv_bytes) { + gpr_uint32 add_max_recv_bytes = + max_recv_bytes - stream_global->max_recv_bytes; + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + max_recv_bytes, add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_parse, + add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, + unannounced_incoming_window_for_writing, + add_max_recv_bytes); + grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, + stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + } +} + static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, gpr_slice *slice, size_t max_size_hint, @@ -1372,41 +1413,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_transport_global *transport_global = &bs->transport->global; grpc_chttp2_stream_global *stream_global = &bs->stream->global; - gpr_uint32 max_recv_bytes; lock(bs->transport); if (bs->is_tail) { - /* clamp max recv hint to an allowable size */ - if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) { - max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD; - } else { - max_recv_bytes = (gpr_uint32)max_size_hint; - } - - /* account for bytes already received but unknown to higher layers */ - if (max_recv_bytes >= bs->slices.length) { - max_recv_bytes -= (gpr_uint32)bs->slices.length; - } else { - max_recv_bytes = 0; - } - /* add some small lookahead to keep pipelines flowing */ - GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD); - max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD; - if (stream_global->max_recv_bytes < max_recv_bytes) { - gpr_uint32 add_max_recv_bytes = - max_recv_bytes - stream_global->max_recv_bytes; - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - max_recv_bytes, add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_parse, - add_max_recv_bytes); - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global, - unannounced_incoming_window_for_writing, - add_max_recv_bytes); - grpc_chttp2_list_add_unannounced_incoming_window_available( - transport_global, stream_global); - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - } + incoming_byte_stream_update_flow_control(transport_global, stream_global, + max_size_hint, bs->slices.length); } if (bs->slices.count > 0) { *slice = gpr_slice_buffer_take_first(&bs->slices); @@ -1451,7 +1462,7 @@ void grpc_chttp2_incoming_byte_stream_finished( } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport_parsing *transport_parsing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size, gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = @@ -1474,6 +1485,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( add_to_queue->tail->next_message = incoming_byte_stream; } add_to_queue->tail = incoming_byte_stream; + if (frame_size == 0) { + lock(TRANSPORT_FROM_PARSING(transport_parsing)); + incoming_byte_stream_update_flow_control( + &TRANSPORT_FROM_PARSING(transport_parsing)->global, + &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0); + unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing)); + } return incoming_byte_stream; } diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc index 604134fa9d..66c79a1818 100644 --- a/src/cpp/util/string_ref.cc +++ b/src/cpp/util/string_ref.cc @@ -40,7 +40,7 @@ namespace grpc { -const size_t string_ref::npos; +const size_t string_ref::npos = size_t(-1); string_ref& string_ref::operator=(const string_ref& rhs) { data_ = rhs.data_; diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index e58528ff50..25a5a27c8e 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -38,6 +38,7 @@ using System.Threading; using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Internal; +using Grpc.Core.Profiling; using Grpc.Core.Utils; using NUnit.Framework; @@ -200,19 +201,6 @@ namespace Grpc.Core.Tests Assert.AreEqual(headers[1].Key, trailers[1].Key); CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes); } - - [Test] - public void UnaryCallPerformance() - { - helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => - { - return request; - }); - - var callDetails = helper.CreateUnaryCall(); - BenchmarkUtil.RunBenchmark(1, 10, - () => { Calls.BlockingUnaryCall(callDetails, "ABC"); }); - } [Test] public void UnknownMethodHandler() diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 91d072abab..e5ffa31989 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -88,6 +88,7 @@ <Compile Include="CompressionTest.cs" /> <Compile Include="ContextPropagationTest.cs" /> <Compile Include="MetadataTest.cs" /> + <Compile Include="PerformanceTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs index 874df02baa..9be5450d81 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs @@ -34,6 +34,7 @@ using System; using System.Runtime.InteropServices; using Grpc.Core.Internal; +using Grpc.Core.Utils; using NUnit.Framework; namespace Grpc.Core.Internal.Tests @@ -198,5 +199,23 @@ namespace Grpc.Core.Internal.Tests Console.WriteLine("Test cannot be run on this platform, skipping the test."); } } + + // Test attribute commented out to prevent running as part of the default test suite. + // [Test] + // [Category("Performance")] + public void NowBenchmark() + { + // approx Timespec.Now latency <33ns + BenchmarkUtil.RunBenchmark(10000000, 1000000000, () => { var now = Timespec.Now; }); + } + + // Test attribute commented out to prevent running as part of the default test suite. + // [Test] + // [Category("Performance")] + public void PreciseNowBenchmark() + { + // approx Timespec.PreciseNow latency <18ns (when compiled with GRPC_TIMERS_RDTSC) + BenchmarkUtil.RunBenchmark(10000000, 1000000000, () => { var now = Timespec.PreciseNow; }); + } } } diff --git a/src/csharp/Grpc.Core.Tests/PerformanceTest.cs b/src/csharp/Grpc.Core.Tests/PerformanceTest.cs new file mode 100644 index 0000000000..5516cd3377 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/PerformanceTest.cs @@ -0,0 +1,99 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Profiling; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class PerformanceTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + // Test attribute commented out to prevent running as part of the default test suite. + //[Test] + //[Category("Performance")] + public void UnaryCallPerformance() + { + var profiler = new BasicProfiler(); + Profilers.SetForCurrentThread(profiler); + + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + var callDetails = helper.CreateUnaryCall(); + for(int i = 0; i < 3000; i++) + { + Calls.BlockingUnaryCall(callDetails, "ABC"); + } + + profiler.Reset(); + + for(int i = 0; i < 3000; i++) + { + Calls.BlockingUnaryCall(callDetails, "ABC"); + } + profiler.Dump("latency_trace_csharp.txt"); + } + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 92d4e19eac..0aab7bdd8a 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -119,6 +119,10 @@ <Compile Include="CompressionLevel.cs" /> <Compile Include="WriteOptions.cs" /> <Compile Include="ContextPropagationToken.cs" /> + <Compile Include="Profiling\ProfilerEntry.cs" /> + <Compile Include="Profiling\ProfilerScope.cs" /> + <Compile Include="Profiling\IProfiler.cs" /> + <Compile Include="Profiling\Profilers.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> @@ -150,4 +154,7 @@ <Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" /> <Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" /> <ItemGroup /> + <ItemGroup> + <Folder Include="Profiling\" /> + </ItemGroup> </Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 800462c854..e3ecc47282 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -39,6 +39,7 @@ using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; +using Grpc.Core.Profiling; using Grpc.Core.Utils; namespace Grpc.Core.Internal @@ -87,6 +88,9 @@ namespace Grpc.Core.Internal /// </summary> public TResponse UnaryCall(TRequest msg) { + var profiler = Profilers.ForCurrentThread(); + + using (profiler.NewScope("AsyncCall.UnaryCall")) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { byte[] payload = UnsafeSerialize(msg); @@ -104,24 +108,26 @@ namespace Grpc.Core.Internal } using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + using (var ctx = BatchContextSafeHandle.Create()) { - using (var ctx = BatchContextSafeHandle.Create()) - { - call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall()); - var ev = cq.Pluck(ctx.Handle); + call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall()); + + var ev = cq.Pluck(ctx.Handle); - bool success = (ev.success != 0); - try + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) { HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking completion delegate."); - } + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking completion delegate."); } } - + // Once the blocking call returns, the result should be available synchronously. // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException. return unaryResponseTcs.Task.GetAwaiter().GetResult(); @@ -329,27 +335,35 @@ namespace Grpc.Core.Internal private void Initialize(CompletionQueueSafeHandle cq) { - var call = CreateNativeCall(cq); - details.Channel.AddCallReference(this); - InitializeInternal(call); - RegisterCancellationCallback(); + using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize")) + { + var call = CreateNativeCall(cq); + + details.Channel.AddCallReference(this); + InitializeInternal(call); + RegisterCancellationCallback(); + } } private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) { - if (injectedNativeCall != null) - { - return injectedNativeCall; // allows injecting a mock INativeCall in tests. - } + using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall")) + { + if (injectedNativeCall != null) + { + return injectedNativeCall; // allows injecting a mock INativeCall in tests. + } - var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; + var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; - var credentials = details.Options.Credentials; - using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null) - { - return details.Channel.Handle.CreateCall(environment.CompletionRegistry, - parentCall, ContextPropagationToken.DefaultMask, cq, - details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials); + var credentials = details.Options.Credentials; + using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null) + { + var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry, + parentCall, ContextPropagationToken.DefaultMask, cq, + details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials); + return result; + } } } @@ -385,33 +399,37 @@ namespace Grpc.Core.Internal /// </summary> private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) { - TResponse msg = default(TResponse); - var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null; - - lock (myLock) + using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { - finished = true; + TResponse msg = default(TResponse); + var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null; - if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK) + lock (myLock) { - receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); + finished = true; + + if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK) + { + receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); + } + finishedStatus = receivedStatus; + + ReleaseResourcesIfPossible(); + } - finishedStatus = receivedStatus; - ReleaseResourcesIfPossible(); - } + responseHeadersTcs.SetResult(responseHeaders); - responseHeadersTcs.SetResult(responseHeaders); + var status = receivedStatus.Status; - var status = receivedStatus.Status; + if (!success || status.StatusCode != StatusCode.OK) + { + unaryResponseTcs.SetException(new RpcException(status)); + return; + } - if (!success || status.StatusCode != StatusCode.OK) - { - unaryResponseTcs.SetException(new RpcException(status)); - return; + unaryResponseTcs.SetResult(msg); } - - unaryResponseTcs.SetResult(msg); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 3e2c57c9b5..953f61aa1e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -41,6 +41,7 @@ using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; +using Grpc.Core.Profiling; using Grpc.Core.Utils; namespace Grpc.Core.Internal @@ -167,16 +168,19 @@ namespace Grpc.Core.Internal /// </summary> protected bool ReleaseResourcesIfPossible() { - if (!disposed && call != null) + using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible")) { - bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); - if (noMoreSendCompletions && readingDone && finished) + if (!disposed && call != null) { - ReleaseResources(); - return true; + bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); + if (noMoreSendCompletions && readingDone && finished) + { + ReleaseResources(); + return true; + } } + return false; } - return false; } protected abstract bool IsClient @@ -228,7 +232,10 @@ namespace Grpc.Core.Internal protected byte[] UnsafeSerialize(TWrite msg) { - return serializer(msg); + using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize")) + { + return serializer(msg); + } } protected Exception TrySerialize(TWrite msg, out byte[] payload) @@ -247,15 +254,20 @@ namespace Grpc.Core.Internal protected Exception TryDeserialize(byte[] payload, out TRead msg) { - try - { - msg = deserializer(payload); - return null; - } - catch (Exception e) + using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize")) { - msg = default(TRead); - return e; + try + { + + msg = deserializer(payload); + return null; + + } + catch (Exception e) + { + msg = default(TRead); + return e; + } } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 0be7a4dd3a..ddeedebd11 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -34,6 +34,7 @@ using System.Diagnostics; using System.Runtime.InteropServices; using Grpc.Core; using Grpc.Core.Utils; +using Grpc.Core.Profiling; namespace Grpc.Core.Internal { @@ -131,8 +132,11 @@ namespace Grpc.Core.Internal public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) - .CheckOk(); + using (Profilers.ForCurrentThread().NewScope("CallSafeHandle.StartUnary")) + { + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) + .CheckOk(); + } } public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index d270d77526..5f9169bcb2 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -32,6 +32,7 @@ using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Profiling; namespace Grpc.Core.Internal { @@ -84,13 +85,16 @@ namespace Grpc.Core.Internal public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CredentialsSafeHandle credentials) { - var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); - if (credentials != null) + using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall")) { - result.SetCredentials(credentials); + var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); + if (credentials != null) + { + result.SetCredentials(credentials); + } + result.SetCompletionRegistry(registry); + return result; } - result.SetCompletionRegistry(registry); - return result; } public ChannelState CheckConnectivityState(bool tryToConnect) diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index f7a3471bb4..9de2bc7950 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -31,6 +31,7 @@ using System; using System.Runtime.InteropServices; using System.Threading.Tasks; +using Grpc.Core.Profiling; namespace Grpc.Core.Internal { @@ -70,7 +71,10 @@ namespace Grpc.Core.Internal public CompletionQueueEvent Pluck(IntPtr tag) { - return grpcsharp_completion_queue_pluck(this, tag); + using (Profilers.ForCurrentThread().NewScope("CompletionQueueSafeHandle.Pluck")) + { + return grpcsharp_completion_queue_pluck(this, tag); + } } public void Shutdown() diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs index 185098160b..b0eab2001b 100644 --- a/src/csharp/Grpc.Core/Internal/Enums.cs +++ b/src/csharp/Grpc.Core/Internal/Enums.cs @@ -102,6 +102,9 @@ namespace Grpc.Core.Internal /* Realtime clock */ Realtime, + /* Precise clock good for performance profiling. */ + Precise, + /* Timespan - the distance between two time points */ Timespan } diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 31b834c979..ed1bd24498 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -31,6 +31,7 @@ using System; using System.Runtime.InteropServices; using System.Threading.Tasks; +using Grpc.Core.Profiling; namespace Grpc.Core.Internal { @@ -66,14 +67,17 @@ namespace Grpc.Core.Internal public static MetadataArraySafeHandle Create(Metadata metadata) { - // TODO(jtattermusch): we might wanna check that the metadata is readonly - var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); - for (int i = 0; i < metadata.Count; i++) + using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create")) { - var valueBytes = metadata[i].GetSerializedValueUnsafe(); - grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); + // TODO(jtattermusch): we might wanna check that the metadata is readonly + var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); + for (int i = 0; i < metadata.Count; i++) + { + var valueBytes = metadata[i].GetSerializedValueUnsafe(); + grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); + } + return metadataArray; } - return metadataArray; } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index daf85d5f61..38fc067d9f 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -239,6 +239,19 @@ namespace Grpc.Core.Internal } } + /// <summary> + /// Gets current timestamp using <c>GPRClockType.Precise</c>. + /// Only available internally because core needs to be compiled with + /// GRPC_TIMERS_RDTSC support for this to use RDTSC. + /// </summary> + internal static Timespec PreciseNow + { + get + { + return gprsharp_now(GPRClockType.Precise); + } + } + internal static int NativeSize { get diff --git a/src/csharp/Grpc.Core/Profiling/IProfiler.cs b/src/csharp/Grpc.Core/Profiling/IProfiler.cs new file mode 100644 index 0000000000..c426c365d2 --- /dev/null +++ b/src/csharp/Grpc.Core/Profiling/IProfiler.cs @@ -0,0 +1,47 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.IO; +using System.Threading; +using Grpc.Core.Internal; + +namespace Grpc.Core.Profiling +{ + internal interface IProfiler + { + void Begin(string tag); + void End(string tag); + void Mark(string tag); + } +} diff --git a/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs b/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs new file mode 100644 index 0000000000..5cc4c3c054 --- /dev/null +++ b/src/csharp/Grpc.Core/Profiling/ProfilerEntry.cs @@ -0,0 +1,87 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.IO; +using System.Threading; +using Grpc.Core.Internal; + +namespace Grpc.Core.Profiling +{ + internal struct ProfilerEntry + { + public enum Type { + BEGIN, + END, + MARK + } + + public ProfilerEntry(Timespec timespec, Type type, string tag) + { + this.timespec = timespec; + this.type = type; + this.tag = tag; + } + + public Timespec timespec; + public Type type; + public string tag; + + public override string ToString() + { + // mimic the output format used by C core. + return string.Format( + "{{\"t\": {0}.{1}, \"thd\":\"unknown\", \"type\": \"{2}\", \"tag\": \"{3}\", " + + "\"file\": \"unknown\", \"line\": 0, \"imp\": 0}}", + timespec.TimevalSeconds, timespec.TimevalNanos.ToString("D9"), + GetTypeAbbreviation(type), tag); + } + + internal static string GetTypeAbbreviation(Type type) + { + switch (type) + { + case Type.BEGIN: + return "{"; + + case Type.END: + return "}"; + + case Type.MARK: + return "."; + default: + throw new ArgumentException("Unknown type"); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs b/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs new file mode 100644 index 0000000000..413f3a1a35 --- /dev/null +++ b/src/csharp/Grpc.Core/Profiling/ProfilerScope.cs @@ -0,0 +1,60 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.IO; +using System.Threading; +using Grpc.Core.Internal; + +namespace Grpc.Core.Profiling +{ + // Allows declaring Begin and End of a profiler scope with a using statement. + // declared as struct for better performance. + internal struct ProfilerScope : IDisposable + { + readonly IProfiler profiler; + readonly string tag; + + public ProfilerScope(IProfiler profiler, string tag) + { + this.profiler = profiler; + this.tag = tag; + this.profiler.Begin(this.tag); + } + + public void Dispose() + { + profiler.End(tag); + } + } +} diff --git a/src/csharp/Grpc.Core/Profiling/Profilers.cs b/src/csharp/Grpc.Core/Profiling/Profilers.cs new file mode 100644 index 0000000000..c8123347f2 --- /dev/null +++ b/src/csharp/Grpc.Core/Profiling/Profilers.cs @@ -0,0 +1,131 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.IO; +using System.Threading; +using Grpc.Core.Internal; + +namespace Grpc.Core.Profiling +{ + internal static class Profilers + { + static readonly NopProfiler defaultProfiler = new NopProfiler(); + static readonly ThreadLocal<IProfiler> profilers = new ThreadLocal<IProfiler>(); + + public static IProfiler ForCurrentThread() + { + return profilers.Value ?? defaultProfiler; + } + + public static void SetForCurrentThread(IProfiler profiler) + { + profilers.Value = profiler; + } + + public static ProfilerScope NewScope(this IProfiler profiler, string tag) + { + return new ProfilerScope(profiler, tag); + } + } + + internal class NopProfiler : IProfiler + { + public void Begin(string tag) + { + } + + public void End(string tag) + { + } + + public void Mark(string tag) + { + } + } + + // Profiler using Timespec.PreciseNow + internal class BasicProfiler : IProfiler + { + ProfilerEntry[] entries; + int count; + + public BasicProfiler() : this(1024*1024) + { + } + + public BasicProfiler(int capacity) + { + this.entries = new ProfilerEntry[capacity]; + } + + public void Begin(string tag) { + AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.BEGIN, tag)); + } + + public void End(string tag) { + AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.END, tag)); + } + + public void Mark(string tag) { + AddEntry(new ProfilerEntry(Timespec.PreciseNow, ProfilerEntry.Type.MARK, tag)); + } + + public void Reset() + { + count = 0; + } + + public void Dump(string filepath) + { + using (var stream = new StreamWriter(filepath)) + { + Dump(stream); + } + } + + public void Dump(TextWriter stream) + { + for (int i = 0; i < count; i++) + { + var entry = entries[i]; + stream.WriteLine(entry.ToString()); + } + } + + // NOT THREADSAFE! + void AddEntry(ProfilerEntry entry) { + entries[count++] = entry; + } + } +} |