From 6e84aba85fefc084c53325d949eb81b6fbfa919f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 23 Apr 2015 15:08:17 -0700 Subject: Make the things compile again --- test/core/channel/channel_stack_test.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'test/core/channel/channel_stack_test.c') diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index e92db59249..1d1331eb9f 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -55,7 +55,7 @@ static void channel_init_func(grpc_channel_element *elem, } static void call_init_func(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, grpc_transport_op *initial_op) { ++*(int *)(elem->channel_data); *(int *)(elem->call_data) = 0; } @@ -66,8 +66,8 @@ static void call_destroy_func(grpc_call_element *elem) { ++*(int *)(elem->channel_data); } -static void call_func(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void call_func(grpc_call_element *elem, + grpc_transport_op *op) { ++*(int *)(elem->call_data); } @@ -112,7 +112,7 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_call_stack_init(channel_stack, NULL, call_stack); + grpc_call_stack_init(channel_stack, NULL, NULL, call_stack); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); GPR_ASSERT(call_elem->filter == channel_elem->filter); -- cgit v1.2.3 From 1a727fde47d56b42703aedf7e672b1f4e9c7d1c2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 24 Apr 2015 13:21:22 -0700 Subject: clang-format --- src/core/channel/client_channel.c | 29 +- src/core/security/auth.c | 26 +- src/core/security/server_secure_chttp2.c | 3 +- src/core/surface/call.c | 3 +- src/core/surface/secure_channel_create.c | 2 +- src/core/transport/chttp2_transport.c | 39 ++- src/core/transport/transport.c | 4 +- src/core/transport/transport.h | 4 +- src/core/transport/transport_op_string.c | 3 +- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 357 ++++++++++++--------- src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 163 +++++----- src/csharp/Grpc.Core/Server.cs | 335 +++++++++---------- src/csharp/Grpc.Examples.MathServer/MathServer.cs | 36 +-- .../Grpc.IntegrationTesting.csproj | 3 +- src/ruby/ext/grpc/rb_call.c | 122 ++++--- test/core/channel/channel_stack_test.c | 11 +- .../chttp2_socket_pair_one_byte_at_a_time.c | 9 +- 17 files changed, 596 insertions(+), 553 deletions(-) (limited to 'test/core/channel/channel_stack_test.c') diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index b9a489e0cc..78f8d06d89 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -139,7 +139,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { chand->waiting_child_count = new_count; } -static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_op *op) { +static void handle_op_after_cancellation(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_ops) { @@ -149,10 +150,10 @@ static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport char status[GPR_LTOA_MIN_BUFSIZE]; grpc_metadata_batch mdb; gpr_ltoa(GRPC_STATUS_CANCELLED, status); - calld->s.cancelled.status.md = grpc_mdelem_from_strings(chand->mdctx, - "grpc-status", status); - calld->s.cancelled.details.md = grpc_mdelem_from_strings(chand->mdctx, - "grpc-message", "Cancelled"); + calld->s.cancelled.status.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status); + calld->s.cancelled.details.md = + grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled"); calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL; calld->s.cancelled.status.next = &calld->s.cancelled.details; calld->s.cancelled.details.prev = &calld->s.cancelled.status; @@ -199,8 +200,10 @@ static void cc_start_transport_op(grpc_call_element *elem, gpr_mu_unlock(&chand->mu); } } else { - /* check to see if we should initiate a connection (if we're not already), - but don't do so until outside the lock to avoid re-entrancy problems if + /* check to see if we should initiate a connection (if we're not + already), + but don't do so until outside the lock to avoid re-entrancy + problems if the callback is immediate */ int initiate_transport_setup = 0; if (!chand->transport_setup_initiated) { @@ -212,9 +215,9 @@ static void cc_start_transport_op(grpc_call_element *elem, if (chand->waiting_child_count == chand->waiting_child_capacity) { chand->waiting_child_capacity = GPR_MAX(chand->waiting_child_capacity * 2, 8); - chand->waiting_children = - gpr_realloc(chand->waiting_children, - chand->waiting_child_capacity * sizeof(call_data *)); + chand->waiting_children = gpr_realloc( + chand->waiting_children, + chand->waiting_child_capacity * sizeof(call_data *)); } calld->s.waiting_op = *op; chand->waiting_children[chand->waiting_child_count++] = calld; @@ -236,8 +239,10 @@ static void cc_start_transport_op(grpc_call_element *elem, handle_op_after_cancellation(elem, &waiting_op); handle_op_after_cancellation(elem, op); } else { - GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL)); - GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL)); + GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != + (op->send_ops == NULL)); + GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != + (op->recv_ops == NULL)); if (op->send_ops) { calld->s.waiting_op.send_ops = op->send_ops; calld->s.waiting_op.is_last_send = op->is_last_send; diff --git a/src/core/security/auth.c b/src/core/security/auth.c index b6a002d43c..2322c12aa5 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -76,7 +76,8 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems, grpc_metadata_batch *mdb; size_t i; GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx && op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA); + GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx && + op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA); mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata; for (i = 0; i < num_md; i++) { grpc_metadata_batch_add_tail(mdb, &calld->md_links[i], @@ -105,7 +106,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) { return service_url; } -static void send_security_metadata(grpc_call_element *elem, grpc_transport_op *op) { +static void send_security_metadata(grpc_call_element *elem, + grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -144,7 +146,9 @@ static void on_host_checked(void *user_data, grpc_security_status status) { char *error_msg; gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", grpc_mdstr_as_c_string(calld->host)); - grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(chand->md_ctx, error_msg)); + grpc_transport_op_add_cancellation( + &calld->op, GRPC_STATUS_UNAUTHENTICATED, + grpc_mdstr_from_string(chand->md_ctx, error_msg)); gpr_free(error_msg); grpc_call_next_op(elem, &calld->op); } @@ -155,8 +159,8 @@ static void on_host_checked(void *user_data, grpc_security_status status) { - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void auth_start_transport_op(grpc_call_element *elem, - grpc_transport_op *op) { +static void auth_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -195,7 +199,9 @@ static void auth_start_transport_op(grpc_call_element *elem, gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", call_host); - grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(channeld->md_ctx, error_msg)); + grpc_transport_op_add_cancellation( + &calld->op, GRPC_STATUS_UNAUTHENTICATED, + grpc_mdstr_from_string(channeld->md_ctx, error_msg)); gpr_free(error_msg); grpc_call_next_op(elem, &calld->op); } @@ -220,7 +226,8 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { /* TODO(jboeuf): Find a way to pass-in the credentials from the caller here. */ call_data *calld = elem->call_data; @@ -297,5 +304,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "auth"}; + auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "auth"}; diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 0698161b6d..db9d545c0e 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -72,7 +72,8 @@ static void state_unref(grpc_server_secure_state *state) { static grpc_transport_setup_result setup_transport(void *server, grpc_transport *transport, grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 2f5bd94cff..7c91ca917c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -423,8 +423,7 @@ static void unlock(grpc_call *call) { memset(&op, 0, sizeof(op)); - if (!call->receiving && - need_more_data(call)) { + if (!call->receiving && need_more_data(call)) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = call_on_done_recv; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 0bcbe38131..3e331293b5 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -210,7 +210,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, grpc_arg connector_arg; grpc_channel_args *args_copy; grpc_channel_args *new_args_from_connector; - grpc_channel_security_connector* connector; + grpc_channel_security_connector *connector; grpc_mdctx *mdctx; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 7c3f40e3b9..52de8c218f 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -361,7 +361,7 @@ static void cancel_stream_id(transport *t, gpr_uint32 id, grpc_chttp2_error_code error_code, int send_rst); static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, + grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst); static void finalize_cancellations(transport *t); static stream *lookup_stream(transport *t, gpr_uint32 id); @@ -731,8 +731,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) { static void remove_from_stream_map(transport *t, stream *s) { if (s->id == 0) return; - IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", t->is_client? "CLI" : "SVR", - s->id)); + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", + t->is_client ? "CLI" : "SVR", s->id)); if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) { maybe_start_some_streams(t); } @@ -1001,7 +1001,8 @@ static void maybe_start_some_streams(transport *t) { stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY); if (!s) break; - IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client? "CLI" : "SVR", s, t->next_stream_id)); + IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", + t->is_client ? "CLI" : "SVR", s, t->next_stream_id)); GPR_ASSERT(s->id == 0); s->id = t->next_stream_id; @@ -1015,7 +1016,8 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_stream( t, s, op->cancel_with_status, - grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), op->cancel_message, 1); + grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), + op->cancel_message, 1); } if (op->send_ops) { @@ -1028,7 +1030,9 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { s->write_state = WRITE_STATE_QUEUED_CLOSE; } if (s->id == 0) { - IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New stream %p waiting for concurrency", t->is_client? "CLI" : "SVR", s)); + IF_TRACING(gpr_log(GPR_DEBUG, + "HTTP:%s: New stream %p waiting for concurrency", + t->is_client ? "CLI" : "SVR", s)); stream_list_join(t, s, WAITING_FOR_CONCURRENCY); maybe_start_some_streams(t); } else if (s->outgoing_window > 0) { @@ -1120,8 +1124,7 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, - grpc_mdstr *optional_message, - int send_rst) { + grpc_mdstr *optional_message, int send_rst) { int had_outgoing; char buffer[GPR_LTOA_MIN_BUFSIZE]; @@ -1157,7 +1160,12 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, break; } } else { - add_incoming_metadata(t, s, grpc_mdelem_from_metadata_strings(t->metadata_context, grpc_mdstr_from_string(t->metadata_context, "grpc-message"), grpc_mdstr_ref(optional_message))); + add_incoming_metadata( + t, s, + grpc_mdelem_from_metadata_strings( + t->metadata_context, + grpc_mdstr_from_string(t->metadata_context, "grpc-message"), + grpc_mdstr_ref(optional_message))); } add_metadata_batch(t, s); maybe_finish_read(t, s); @@ -1182,8 +1190,10 @@ static void cancel_stream_id(transport *t, gpr_uint32 id, static void cancel_stream(transport *t, stream *s, grpc_status_code local_status, - grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) { - cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, send_rst); + grpc_chttp2_error_code error_code, + grpc_mdstr *optional_message, int send_rst) { + cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, + send_rst); } static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) { @@ -1310,10 +1320,9 @@ static void on_header(void *tp, grpc_mdelem *md) { GPR_ASSERT(s); - IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, - t->is_client? "CLI" : "SVR", - grpc_mdstr_as_c_string(md->key), - grpc_mdstr_as_c_string(md->value))); + IF_TRACING(gpr_log( + GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value))); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index cc9392177f..d9a1319c42 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -95,7 +95,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) { } } -void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message) { +void grpc_transport_op_add_cancellation(grpc_transport_op *op, + grpc_status_code status, + grpc_mdstr *message) { if (op->cancel_with_status == GRPC_STATUS_OK) { op->cancel_with_status = status; op->cancel_message = message; diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 7c4bed1863..cdea0b9a0b 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -135,7 +135,9 @@ void grpc_transport_destroy_stream(grpc_transport *transport, void grpc_transport_op_finish_with_failure(grpc_transport_op *op); -void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message); +void grpc_transport_op_add_cancellation(grpc_transport_op *op, + grpc_status_code status, + grpc_mdstr *message); /* TODO(ctiller): remove this */ void grpc_transport_add_to_pollset(grpc_transport *transport, diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index b9283b7abf..a215710969 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -140,7 +140,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) { gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); gpr_strvec_add(&b, tmp); if (op->cancel_message) { - gpr_asprintf(&tmp, ";msg='%s'", grpc_mdstr_as_c_string(op->cancel_message)); + gpr_asprintf(&tmp, ";msg='%s'", + grpc_mdstr_as_c_string(op->cancel_message)); gpr_strvec_add(&b, tmp); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 14add60c72..19d3f57abb 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -1,11 +1,11 @@ #region Copyright notice and license // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -15,7 +15,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -34,158 +34,203 @@ using System.Diagnostics; using System.Runtime.InteropServices; using Grpc.Core; -namespace Grpc.Core.Internal -{ - internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); - - /// - /// grpc_call from - /// - internal class CallSafeHandle : SafeHandleZeroIsInvalid - { - const uint GRPC_WRITE_BUFFER_HINT = 1; - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, - byte[] send_buffer, UIntPtr send_buffer_len); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_destroy(IntPtr call); - - private CallSafeHandle() - { - } - - public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) - { - return grpcsharp_channel_create_call(channel, cq, method, host, deadline); - } - - public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) - { - AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); - } - - public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) - { - grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray); - } - - public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) - { - AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray)); - } - - public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) - { - AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); - } - - public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) - { - AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); - } - - public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) - { - AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length))); - } - - public void StartSendCloseFromClient(CompletionCallbackDelegate callback) - { - AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); - } - - public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) - { - AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); - } - - public void StartReceiveMessage(CompletionCallbackDelegate callback) - { - AssertCallOk(grpcsharp_call_recv_message(this, callback)); - } - - public void StartServerSide(CompletionCallbackDelegate callback) - { - AssertCallOk(grpcsharp_call_start_serverside(this, callback)); - } - - public void Cancel() - { - AssertCallOk(grpcsharp_call_cancel(this)); - } - - public void CancelWithStatus(Status status) - { - AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); - } - - protected override bool ReleaseHandle() - { - grpcsharp_call_destroy(handle); - return true; - } - - private static void AssertCallOk(GRPCCallError callError) - { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); - } - - private static uint GetFlags(bool buffered) - { - return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; - } +namespace Grpc.Core.Internal { + internal delegate void CompletionCallbackDelegate(GRPCOpError error, + IntPtr batchContextPtr); + + /// + /// grpc_call from + /// + internal class CallSafeHandle : SafeHandleZeroIsInvalid { + const uint GRPC_WRITE_BUFFER_HINT = 1; + + [DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle + grpcsharp_channel_create_call(ChannelSafeHandle channel, + CompletionQueueSafeHandle cq, string method, + string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_cancel(CallSafeHandle call); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, + string description); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_start_unary( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + byte[] send_buffer, UIntPtr send_buffer_len, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern void + grpcsharp_call_blocking_unary( + CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + byte[] send_buffer, UIntPtr send_buffer_len, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_start_client_streaming( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_start_server_streaming( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + byte[] send_buffer, UIntPtr send_buffer_len, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_start_duplex_streaming( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_send_message( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_send_close_from_client( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_send_status_from_server( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback, + StatusCode statusCode, string statusMessage); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_recv_message( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_call_start_serverside( + CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback); + + [DllImport("grpc_csharp_ext.dll")] static extern void + grpcsharp_call_destroy(IntPtr call); + + private + CallSafeHandle() {} + + public + static CallSafeHandle Create(ChannelSafeHandle channel, + CompletionQueueSafeHandle cq, string method, + string host, Timespec deadline) { + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); } + + public + void StartUnary(byte[] payload, CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray) { + AssertCallOk(grpcsharp_call_start_unary( + this, callback, payload, new UIntPtr((ulong)payload.Length), + metadataArray)); + } + + public + void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, + CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray) { + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, + new UIntPtr((ulong)payload.Length), + metadataArray); + } + + public + void StartClientStreaming(CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray) { + AssertCallOk( + grpcsharp_call_start_client_streaming(this, callback, metadataArray)); + } + + public + void StartServerStreaming(byte[] payload, + CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray) { + AssertCallOk(grpcsharp_call_start_server_streaming( + this, callback, payload, new UIntPtr((ulong)payload.Length), + metadataArray)); + } + + public + void StartDuplexStreaming(CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray) { + AssertCallOk( + grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); + } + + public + void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { + AssertCallOk(grpcsharp_call_send_message( + this, callback, payload, new UIntPtr((ulong)payload.Length))); + } + + public + void StartSendCloseFromClient(CompletionCallbackDelegate callback) { + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); + } + + public + void StartSendStatusFromServer(Status status, + CompletionCallbackDelegate callback) { + AssertCallOk(grpcsharp_call_send_status_from_server( + this, callback, status.StatusCode, status.Detail)); + } + + public + void StartReceiveMessage(CompletionCallbackDelegate callback) { + AssertCallOk(grpcsharp_call_recv_message(this, callback)); + } + + public + void StartServerSide(CompletionCallbackDelegate callback) { + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); + } + + public + void Cancel() { AssertCallOk(grpcsharp_call_cancel(this)); } + + public + void CancelWithStatus(Status status) { + AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, + status.Detail)); + } + + protected + override bool ReleaseHandle() { + grpcsharp_call_destroy(handle); + return true; + } + + private + static void AssertCallOk(GRPCCallError callError) { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, + "Status not GRPC_CALL_OK"); + } + + private + static uint GetFlags(bool buffered) { + return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; + } + } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a59da09822..b56e8d9801 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -36,84 +36,89 @@ using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.InteropServices; -namespace Grpc.Core.Internal -{ - // TODO: we need to make sure that the delegates are not collected before invoked. - internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); - - /// - /// grpc_server from grpc/grpc.h - /// - internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); - - [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); - - [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_server_start(ServerSafeHandle server); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_server_shutdown(ServerSafeHandle server); - - // TODO: get rid of the old callback style - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] - static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_server_destroy(IntPtr server); - - private ServerSafeHandle() - { - } - - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) - { - return grpcsharp_server_create(cq, args); - } - - public int AddListeningPort(string addr) - { - return grpcsharp_server_add_http2_port(this, addr); - } - - public int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) - { - return grpcsharp_server_add_secure_http2_port(this, addr, credentials); - } - - public void Start() - { - grpcsharp_server_start(this); - } - - public void Shutdown() - { - grpcsharp_server_shutdown(this); - } - - public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) - { - grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); - } - - public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) - { - return grpcsharp_server_request_call(this, cq, callback); - } - - protected override bool ReleaseHandle() - { - grpcsharp_server_destroy(handle); - return true; - } +namespace Grpc.Core.Internal { + // TODO: we need to make sure that the delegates are not collected before + // invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + + /// + /// grpc_server from grpc/grpc.h + /// + internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError + grpcsharp_server_request_call( + ServerSafeHandle server, CompletionQueueSafeHandle cq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate + callback); + + [DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle + grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + + [DllImport("grpc_csharp_ext.dll")] static extern int + grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); + + [DllImport("grpc_csharp_ext.dll")] static extern int + grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, + ServerCredentialsSafeHandle creds); + + [DllImport("grpc_csharp_ext.dll")] static extern void + grpcsharp_server_start(ServerSafeHandle server); + + [DllImport("grpc_csharp_ext.dll")] static extern void + grpcsharp_server_shutdown(ServerSafeHandle server); + + // TODO: get rid of the old callback style + [DllImport( + "grpc_csharp_ext.dll", + EntryPoint = "grpcsharp_server_shutdown_and_notify")] static extern void + grpcsharp_server_shutdown_and_notify_CALLBACK( + ServerSafeHandle server, + [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate + callback); + + [DllImport("grpc_csharp_ext.dll")] static extern void + grpcsharp_server_destroy(IntPtr server); + + private + ServerSafeHandle() {} + + public + static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, + IntPtr args) { + return grpcsharp_server_create(cq, args); } + + public + int AddListeningPort(string addr) { + return grpcsharp_server_add_http2_port(this, addr); + } + + public + int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) { + return grpcsharp_server_add_secure_http2_port(this, addr, credentials); + } + + public + void Start() { grpcsharp_server_start(this); } + + public + void Shutdown() { grpcsharp_server_shutdown(this); } + + public + void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) { + grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); + } + + public + GRPCCallError RequestCall(CompletionQueueSafeHandle cq, + CompletionCallbackDelegate callback) { + return grpcsharp_server_request_call(this, cq, callback); + } + + protected + override bool ReleaseHandle() { + grpcsharp_server_destroy(handle); + return true; + } + } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index f086fa8beb..308fbfb71c 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -39,205 +39,186 @@ using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; -namespace Grpc.Core -{ - /// - /// Server is implemented only to be able to do - /// in-process testing. - /// - public class Server - { - // TODO: make sure the delegate doesn't get garbage collected while - // native callbacks are in the completion queue. - readonly ServerShutdownCallbackDelegate serverShutdownHandler; - readonly CompletionCallbackDelegate newServerRpcHandler; +namespace Grpc.Core { + /// + /// Server is implemented only to be able to do + /// in-process testing. + /// + public + class Server { + // TODO: make sure the delegate doesn't get garbage collected while + // native callbacks are in the completion queue. + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; + + readonly BlockingCollection newRpcQueue = + new BlockingCollection(); + readonly ServerSafeHandle handle; + + readonly Dictionary callHandlers = + new Dictionary(); + + readonly TaskCompletionSource shutdownTcs = + new TaskCompletionSource(); + + public + Server() { + this.handle = + ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + this.newServerRpcHandler = HandleNewServerRpc; + this.serverShutdownHandler = HandleServerShutdown; + } - readonly BlockingCollection newRpcQueue = new BlockingCollection(); - readonly ServerSafeHandle handle; + // only call this before Start() + public + void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { + foreach (var entry in serviceDefinition.CallHandlers) { + callHandlers.Add(entry.Key, entry.Value); + } + } - readonly Dictionary callHandlers = new Dictionary(); + // only call before Start() + public + int AddListeningPort(string addr) { return handle.AddListeningPort(addr); } - readonly TaskCompletionSource shutdownTcs = new TaskCompletionSource(); + // only call before Start() + public + int AddListeningPort(string addr, ServerCredentials credentials) { + using(var nativeCredentials = credentials.ToNativeCredentials()) { + return handle.AddListeningPort(addr, nativeCredentials); + } + } - public Server() - { - this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); - this.newServerRpcHandler = HandleNewServerRpc; - this.serverShutdownHandler = HandleServerShutdown; - } + public + void Start() { + handle.Start(); - // only call this before Start() - public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) - { - foreach (var entry in serviceDefinition.CallHandlers) - { - callHandlers.Add(entry.Key, entry.Value); - } - } + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); + } - // only call before Start() - public int AddListeningPort(string addr) - { - return handle.AddListeningPort(addr); - } + /// + /// Requests and handles single RPC call. + /// + internal void RunRpc() { + AllowOneRpc(); - // only call before Start() - public int AddListeningPort(string addr, ServerCredentials credentials) - { - using (var nativeCredentials = credentials.ToNativeCredentials()) - { - return handle.AddListeningPort(addr, nativeCredentials); - } - } + try { + var rpcInfo = newRpcQueue.Take(); - public void Start() - { - handle.Start(); + // Console.WriteLine("Server received RPC " + rpcInfo.Method); - // TODO: this basically means the server is single threaded.... - StartHandlingRpcs(); + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) { + callHandler = new NoSuchMethodCallHandler(); } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, + GetCompletionQueue()); + } catch (Exception e) { + Console.WriteLine("Exception while handling RPC: " + e); + } + } - /// - /// Requests and handles single RPC call. - /// - internal void RunRpc() - { - AllowOneRpc(); - - try - { - var rpcInfo = newRpcQueue.Take(); - - // Console.WriteLine("Server received RPC " + rpcInfo.Method); - - IServerCallHandler callHandler; - if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) - { - callHandler = new NoSuchMethodCallHandler(); - } - callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); - } - catch (Exception e) - { - Console.WriteLine("Exception while handling RPC: " + e); - } - } + /// + /// Requests server shutdown and when there are no more calls being + /// serviced, + /// cleans up used resources. + /// + /// The async. + public + async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } - /// - /// Requests server shutdown and when there are no more calls being serviced, - /// cleans up used resources. - /// - /// The async. - public async Task ShutdownAsync() - { - handle.ShutdownAndNotify(serverShutdownHandler); - await shutdownTcs.Task; - handle.Dispose(); - } + /// + /// To allow awaiting termination of the server. + /// + public + Task ShutdownTask { + get { return shutdownTcs.Task; } + } - /// - /// To allow awaiting termination of the server. - /// - public Task ShutdownTask - { - get - { - return shutdownTcs.Task; - } - } + public + void Kill() { handle.Dispose(); } - public void Kill() - { - handle.Dispose(); - } + private + async Task StartHandlingRpcs() { + while (true) { + await Task.Factory.StartNew(RunRpc); + } + } - private async Task StartHandlingRpcs() - { - while (true) - { - await Task.Factory.StartNew(RunRpc); - } - } + private + void AllowOneRpc() { + AssertCallOk( + handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); + } - private void AllowOneRpc() - { - AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); - } + private + void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - - if (error != GRPCOpError.GRPC_OP_OK) - { - // TODO: handle error - } - - var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); - - // after server shutdown, the callback returns with null call - if (!rpcInfo.Call.IsInvalid) - { - newRpcQueue.Add(rpcInfo); - } - } - catch (Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error } - private void HandleServerShutdown(IntPtr eventPtr) - { - try - { - shutdownTcs.SetResult(null); - } - catch (Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), + ctx.GetServerRpcNewMethod()); - private static void AssertCallOk(GRPCCallError callError) - { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + // after server shutdown, the callback returns with null call + if (!rpcInfo.Call.IsInvalid) { + newRpcQueue.Add(rpcInfo); } + } catch (Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } - private static CompletionQueueSafeHandle GetCompletionQueue() - { - return GrpcEnvironment.ThreadPool.CompletionQueue; - } + private + void HandleServerShutdown(IntPtr eventPtr) { + try { + shutdownTcs.SetResult(null); + } catch (Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } - private struct NewRpcInfo - { - private CallSafeHandle call; - private string method; - - public NewRpcInfo(CallSafeHandle call, string method) - { - this.call = call; - this.method = method; - } - - public CallSafeHandle Call - { - get - { - return this.call; - } - } - - public string Method - { - get - { - return this.method; - } - } - } + private + static void AssertCallOk(GRPCCallError callError) { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, + "Status not GRPC_CALL_OK"); + } + + private + static CompletionQueueSafeHandle GetCompletionQueue() { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } + + private + struct NewRpcInfo { + private + CallSafeHandle call; + private + string method; + + public + NewRpcInfo(CallSafeHandle call, string method) { + this.call = call; + this.method = method; + } + + public + CallSafeHandle Call { + get { return this.call; } + } + + public + string Method { + get { return this.method; } + } } + } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index f7429fb43f..f9a28f4d0d 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -34,28 +34,26 @@ using System.Runtime.InteropServices; using System.Threading; using Grpc.Core; -namespace math -{ - class MainClass - { - public static void Main(string[] args) - { - String host = "0.0.0.0"; +namespace math { +class MainClass { + public + static void Main(string[] args) { + String host = "0.0.0.0"; - GrpcEnvironment.Initialize(); + GrpcEnvironment.Initialize(); - Server server = new Server(); - server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); - int port = server.AddListeningPort(host + ":23456"); - server.Start(); + Server server = new Server(); + server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); + int port = server.AddListeningPort(host + ":23456"); + server.Start(); - Console.WriteLine("MathServer listening on port " + port); + Console.WriteLine("MathServer listening on port " + port); - Console.WriteLine("Press any key to stop the server..."); - Console.ReadKey(); + Console.WriteLine("Press any key to stop the server..."); + Console.ReadKey(); - server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); - } - } + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } +} } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 584bf1068d..088c435585 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -16,7 +16,8 @@ full false bin\Debug - DEBUG; + DEBUG; + prompt 4 true diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6da7d3c830..d4e255d2fa 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -118,35 +118,36 @@ static void grpc_rb_call_destroy(void *p) { } static size_t md_ary_datasize(const void *p) { - const grpc_metadata_array* const ary = (grpc_metadata_array*)p; - size_t i, datasize = sizeof(grpc_metadata_array); - for (i = 0; i < ary->count; ++i) { - const grpc_metadata* const md = &ary->metadata[i]; - datasize += strlen(md->key); - datasize += md->value_length; - } - datasize += ary->capacity * sizeof(grpc_metadata); - return datasize; + const grpc_metadata_array *const ary = (grpc_metadata_array *)p; + size_t i, datasize = sizeof(grpc_metadata_array); + for (i = 0; i < ary->count; ++i) { + const grpc_metadata *const md = &ary->metadata[i]; + datasize += strlen(md->key); + datasize += md->value_length; + } + datasize += ary->capacity * sizeof(grpc_metadata); + return datasize; } static const rb_data_type_t grpc_rb_md_ary_data_type = { "grpc_metadata_array", {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize}, - NULL, NULL, - 0 -}; + NULL, + NULL, + 0}; /* Describes grpc_call struct for RTypedData */ static const rb_data_type_t grpc_call_data_type = { "grpc_call", {GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE}, - NULL, NULL, - /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because grpc_rb_call_destroy + NULL, + NULL, + /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because + * grpc_rb_call_destroy * touches a hash object. * TODO(yugui) Directly use st_table and call the free function earlier? */ - 0 -}; + 0}; /* Error code details is a hash containing text strings describing errors */ VALUE rb_error_code_details; @@ -250,7 +251,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) { } md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i)); md_ary->metadata[md_ary->count].value_length = - RSTRING_LEN(rb_ary_entry(val, i)); + RSTRING_LEN(rb_ary_entry(val, i)); md_ary->count += 1; } } else { @@ -290,10 +291,11 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val, /* grpc_rb_md_ary_convert converts a ruby metadata hash into a grpc_metadata_array. */ -static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) { +static void grpc_rb_md_ary_convert(VALUE md_ary_hash, + grpc_metadata_array *md_ary) { VALUE md_ary_obj = Qnil; if (md_ary_hash == Qnil) { - return; /* Do nothing if the expected has value is nil */ + return; /* Do nothing if the expected has value is nil */ } if (TYPE(md_ary_hash) != T_HASH) { rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want ", @@ -303,8 +305,8 @@ static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ar /* Initialize the array, compute it's capacity, then fill it. */ grpc_metadata_array_init(md_ary); - md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, - md_ary); + md_ary_obj = + TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary); rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj); md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata)); rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj); @@ -327,16 +329,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) { rb_hash_aset(result, key, value); } else if (TYPE(value) == T_ARRAY) { /* Add the string to the returned array */ - rb_ary_push(value, - rb_str_new(md_ary->metadata[i].value, - md_ary->metadata[i].value_length)); + rb_ary_push(value, rb_str_new(md_ary->metadata[i].value, + md_ary->metadata[i].value_length)); } else { /* Add the current value with this key and the new one to an array */ new_ary = rb_ary_new(); rb_ary_push(new_ary, value); - rb_ary_push(new_ary, - rb_str_new(md_ary->metadata[i].value, - md_ary->metadata[i].value_length)); + rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value, + md_ary->metadata[i].value_length)); rb_hash_aset(result, key, new_ary); } } @@ -355,7 +355,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, rb_obj_classname(key)); return ST_STOP; } - switch(NUM2INT(key)) { + switch (NUM2INT(key)) { case GRPC_OP_SEND_INITIAL_METADATA: case GRPC_OP_SEND_MESSAGE: case GRPC_OP_SEND_CLOSE_FROM_CLIENT: @@ -367,8 +367,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, rb_ary_push(ops_ary, key); return ST_CONTINUE; default: - rb_raise(rb_eTypeError, "invalid operation : bad value %d", - NUM2INT(key)); + rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key)); }; return ST_STOP; } @@ -377,8 +376,8 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val, struct to the 'send_status_from_server' portion of an op. */ static void grpc_rb_op_update_status_from_server(grpc_op *op, - grpc_metadata_array* md_ary, - VALUE status) { + grpc_metadata_array *md_ary, + VALUE status) { VALUE code = rb_struct_aref(status, sym_code); VALUE details = rb_struct_aref(status, sym_details); VALUE metadata_hash = rb_struct_aref(status, sym_metadata); @@ -405,8 +404,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op, * grpc_rb_call_run_batch function */ typedef struct run_batch_stack { /* The batch ops */ - grpc_op ops[8]; /* 8 is the maximum number of operations */ - size_t op_num; /* tracks the last added operation */ + grpc_op ops[8]; /* 8 is the maximum number of operations */ + size_t op_num; /* tracks the last added operation */ /* Data being sent */ grpc_metadata_array send_metadata; @@ -424,7 +423,7 @@ typedef struct run_batch_stack { /* grpc_run_batch_stack_init ensures the run_batch_stack is properly * initialized */ -static void grpc_run_batch_stack_init(run_batch_stack* st) { +static void grpc_run_batch_stack_init(run_batch_stack *st) { MEMZERO(st, run_batch_stack, 1); grpc_metadata_array_init(&st->send_metadata); grpc_metadata_array_init(&st->send_trailing_metadata); @@ -435,7 +434,7 @@ static void grpc_run_batch_stack_init(run_batch_stack* st) { /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly * cleaned up */ -static void grpc_run_batch_stack_cleanup(run_batch_stack* st) { +static void grpc_run_batch_stack_cleanup(run_batch_stack *st) { grpc_metadata_array_destroy(&st->send_metadata); grpc_metadata_array_destroy(&st->send_trailing_metadata); grpc_metadata_array_destroy(&st->recv_metadata); @@ -447,7 +446,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack* st) { /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from * ops_hash */ -static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) { +static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { VALUE this_op = Qnil; VALUE this_value = Qnil; VALUE ops_ary = rb_ary_new(); @@ -460,7 +459,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) { for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) { this_op = rb_ary_entry(ops_ary, i); this_value = rb_hash_aref(ops_hash, this_op); - switch(NUM2INT(this_op)) { + switch (NUM2INT(this_op)) { case GRPC_OP_SEND_INITIAL_METADATA: /* N.B. later there is no need to explicitly delete the metadata keys * and values, they are references to data in ruby objects. */ @@ -471,18 +470,16 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) { st->send_metadata.metadata; break; case GRPC_OP_SEND_MESSAGE: - st->ops[st->op_num].data.send_message = - grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value), - RSTRING_LEN(this_value)); + st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer( + RSTRING_PTR(this_value), RSTRING_LEN(this_value)); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: break; case GRPC_OP_SEND_STATUS_FROM_SERVER: /* N.B. later there is no need to explicitly delete the metadata keys * and values, they are references to data in ruby objects. */ - grpc_rb_op_update_status_from_server(&st->ops[st->op_num], - &st->send_trailing_metadata, - this_value); + grpc_rb_op_update_status_from_server( + &st->ops[st->op_num], &st->send_trailing_metadata, this_value); break; case GRPC_OP_RECV_INITIAL_METADATA: st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata; @@ -516,12 +513,12 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) { /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct after the results have run */ -static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) { +static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { size_t i = 0; VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil, Qnil, Qnil, Qnil, Qnil, NULL); for (i = 0; i < st->op_num; i++) { - switch(st->ops[i].op) { + switch (st->ops[i].op) { case GRPC_OP_SEND_INITIAL_METADATA: rb_struct_aset(result, sym_send_metadata, Qtrue); break; @@ -544,13 +541,11 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) { break; case GRPC_OP_RECV_STATUS_ON_CLIENT: rb_struct_aset( - result, - sym_status, - rb_struct_new(grpc_rb_sStatus, - UINT2NUM(st->recv_status), + result, sym_status, + rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status), (st->recv_status_details == NULL - ? Qnil - : rb_str_new2(st->recv_status_details)), + ? Qnil + : rb_str_new2(st->recv_status_details)), grpc_rb_md_ary_to_h(&st->recv_trailing_metadata), NULL)); break; @@ -682,8 +677,7 @@ static void Init_grpc_error_codes() { static void Init_grpc_op_codes() { /* Constants representing operation type codes in grpc.h */ - VALUE grpc_rb_mCallOps = - rb_define_module_under(grpc_rb_mGrpcCore, "CallOps"); + VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps"); rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA", UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA)); rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE", @@ -709,14 +703,14 @@ void Init_grpc_call() { grpc_rb_eOutOfTime = rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException); grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject); - grpc_rb_cMdAry = rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", - rb_cObject); + grpc_rb_cMdAry = + rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject); /* Prevent allocation or inialization of the Call class */ rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc); rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0); - rb_define_method(grpc_rb_cCall, "initialize_copy", - grpc_rb_cannot_init_copy, 1); + rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy, + 1); /* Add ruby analogues of the Call methods. */ rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4); @@ -746,16 +740,8 @@ void Init_grpc_call() { /* The Struct used to return the run_batch result. */ grpc_rb_sBatchResult = rb_struct_define( - "BatchResult", - "send_message", - "send_metadata", - "send_close", - "send_status", - "message", - "metadata", - "status", - "cancelled", - NULL); + "BatchResult", "send_message", "send_metadata", "send_close", + "send_status", "message", "metadata", "status", "cancelled", NULL); /* The hash for reference counting calls, to ensure they can't be destroyed * more than once */ diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 1d1331eb9f..957dee1aa7 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -55,7 +55,8 @@ static void channel_init_func(grpc_channel_element *elem, } static void call_init_func(grpc_call_element *elem, - const void *server_transport_data, grpc_transport_op *initial_op) { + const void *server_transport_data, + grpc_transport_op *initial_op) { ++*(int *)(elem->channel_data); *(int *)(elem->call_data) = 0; } @@ -66,8 +67,7 @@ static void call_destroy_func(grpc_call_element *elem) { ++*(int *)(elem->channel_data); } -static void call_func(grpc_call_element *elem, - grpc_transport_op *op) { +static void call_func(grpc_call_element *elem, grpc_transport_op *op) { ++*(int *)(elem->call_data); } @@ -78,9 +78,8 @@ static void channel_func(grpc_channel_element *elem, static void test_create_channel_stack(void) { const grpc_channel_filter filter = { - call_func, channel_func, sizeof(int), - call_init_func, call_destroy_func, sizeof(int), - channel_init_func, channel_destroy_func, "some_test_filter"}; + call_func, channel_func, sizeof(int), call_init_func, call_destroy_func, + sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"}; const grpc_channel_filter *filters = &filter; grpc_channel_stack *channel_stack; grpc_call_stack *call_stack; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index d861034f8f..ddde585b83 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -59,7 +59,8 @@ static grpc_transport_setup_result server_setup_transport( void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { grpc_end2end_test_fixture *f = ts; - static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter}; + static grpc_channel_filter const *extra_filters[] = { + &grpc_http_server_filter}; return grpc_server_setup_transport(f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx); } @@ -73,9 +74,9 @@ static grpc_transport_setup_result client_setup_transport( void *ts, grpc_transport *transport, grpc_mdctx *mdctx) { sp_client_setup *cs = ts; - const grpc_channel_filter *filters[] = { - &grpc_client_surface_filter, &grpc_http_client_filter, - &grpc_connected_channel_filter}; + const grpc_channel_filter *filters[] = {&grpc_client_surface_filter, + &grpc_http_client_filter, + &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( filters, nfilters, cs->client_args, mdctx, 1); -- cgit v1.2.3