diff options
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 77 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.c | 113 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Utils/TaskUtils.cs | 59 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs | 2 |
9 files changed, 169 insertions, 91 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 97780d90f2..ecf3aea870 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -853,53 +853,51 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s); + grpc_chttp2_stream *s) { + s->fetched_send_message_length += + (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); + gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + if (s->id != 0) { + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + } +} static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (s->fetching_send_message == NULL) { - /* Stream was cancelled before message fetch completed */ - abort(); /* TODO(ctiller): what cleanup here? */ - return; - } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - int64_t notify_offset = s->next_message_end_offset; - if (notify_offset <= s->flow_controlled_bytes_written) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, - "fetching_send_message_finished"); - } else { - grpc_chttp2_write_cb *cb = t->write_cb_pool; - if (cb == NULL) { - cb = gpr_malloc(sizeof(*cb)); + for (;;) { + if (s->fetching_send_message == NULL) { + /* Stream was cancelled before message fetch completed */ + abort(); /* TODO(ctiller): what cleanup here? */ + return; /* early out */ + } + if (s->fetched_send_message_length == s->fetching_send_message->length) { + int64_t notify_offset = s->next_message_end_offset; + if (notify_offset <= s->flow_controlled_bytes_written) { + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + "fetching_send_message_finished"); } else { - t->write_cb_pool = cb->next; + grpc_chttp2_write_cb *cb = t->write_cb_pool; + if (cb == NULL) { + cb = gpr_malloc(sizeof(*cb)); + } else { + t->write_cb_pool = cb->next; + } + cb->call_at_byte = notify_offset; + cb->closure = s->fetching_send_message_finished; + s->fetching_send_message_finished = NULL; + cb->next = s->on_write_finished_cbs; + s->on_write_finished_cbs = cb; } - cb->call_at_byte = notify_offset; - cb->closure = s->fetching_send_message_finished; - s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + s->fetching_send_message = NULL; + return; /* early out */ + } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, + &s->fetching_slice, UINT32_MAX, + &s->complete_fetch)) { + add_fetched_slice_locked(exec_ctx, t, s); } - s->fetching_send_message = NULL; - } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { - add_fetched_slice_locked(exec_ctx, t, s); - } -} - -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } - continue_fetching_send_locked(exec_ctx, t, s); } static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, @@ -908,6 +906,7 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); } else { /* TODO(ctiller): what to do here */ abort(); diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index edf7b133e9..20eae7cbdd 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -64,12 +64,12 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -#define INIT_PORT_CAP 2 - /* one listening port */ -typedef struct { +typedef struct grpc_udp_listener grpc_udp_listener; +struct grpc_udp_listener { int fd; grpc_fd *emfd; grpc_udp_server *server; @@ -82,12 +82,13 @@ typedef struct { grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; grpc_udp_server_orphan_cb orphan_cb; -} server_port; + + struct grpc_udp_listener *next; +}; /* the overall server */ struct grpc_udp_server { gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -97,10 +98,10 @@ struct grpc_udp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* all listening ports */ - server_port *ports; - size_t nports; - size_t port_capacity; + /* linked list of server ports */ + grpc_udp_listener *head; + grpc_udp_listener *tail; + unsigned nports; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -116,24 +117,29 @@ struct grpc_udp_server { grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->head = NULL; + s->tail = NULL; s->nports = 0; - s->port_capacity = INIT_PORT_CAP; return s; } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } gpr_mu_destroy(&s->mu); - gpr_cv_destroy(&s->cv); - gpr_free(s->ports); + while (s->head) { + grpc_udp_listener *sp = s->head; + s->head = sp->next; + gpr_free(sp); + } + gpr_free(s); } @@ -154,8 +160,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - size_t i; - /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -164,9 +168,11 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { return; } - if (s->nports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + if (s->head) { + grpc_udp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { + grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); + sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; @@ -187,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { - size_t i; + grpc_udp_listener *sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -197,14 +203,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, /* shutdown all fd's */ if (s->active_ports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - /* Call the orphan_cb to signal that the FD is about to be closed and - * should no longer be used. */ + for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(sp->emfd); - - grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd); } gpr_mu_unlock(&s->mu); } else { @@ -281,10 +283,10 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - server_port *sp = arg; + grpc_udp_listener *sp = arg; + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { - gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); @@ -300,13 +302,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + gpr_mu_unlock(&sp->server->mu); } static int add_socket_to_server(grpc_udp_server *s, int fd, const struct sockaddr *addr, size_t addr_len, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { - server_port *sp; + grpc_udp_listener *sp; int port; char *addr_str; char *name; @@ -317,12 +320,15 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); gpr_mu_lock(&s->mu); - /* append it to the list under a lock */ - if (s->nports == s->port_capacity) { - s->port_capacity *= 2; - s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + s->nports++; + sp = gpr_malloc(sizeof(grpc_udp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; } - sp = &s->ports[s->nports++]; + s->tail = sp; sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); @@ -341,9 +347,9 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, size_t addr_len, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { + grpc_udp_listener *sp; int allocated_port1 = -1; int allocated_port2 = -1; - unsigned i; int fd; grpc_dualstack_mode dsmode; struct sockaddr_in6 addr6_v4mapped; @@ -358,9 +364,9 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (i = 0; i < s->nports; i++) { + for (sp = s->head; sp; sp = sp->next) { sockname_len = sizeof(sockname_temp); - if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp, &sockname_len)) { port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); if (port > 0) { @@ -421,27 +427,40 @@ done: } int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { - return (port_index < s->nports) ? s->ports[port_index].fd : -1; + grpc_udp_listener *sp; + if (port_index >= s->nports) { + return -1; + } + + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + --port_index; + } + return sp->fd; } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_server *server) { - size_t i, j; + size_t i; gpr_mu_lock(&s->mu); + grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->grpc_server = server; - for (i = 0; i < s->nports; i++) { - for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); + + sp = s->head; + while (sp != NULL) { + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - s->ports[i].read_closure.cb = on_read; - s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, - &s->ports[i].read_closure); + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + s->active_ports++; + sp = sp->next; } + gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 33c5ce11cd..70d0f19454 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -59,7 +59,7 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, struct grpc_server *server); -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs b/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs index 722b51f470..c5bca444b4 100644 --- a/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs +++ b/src/csharp/Grpc.Auth/GoogleAuthInterceptors.cs @@ -76,7 +76,7 @@ namespace Grpc.Auth return new AsyncAuthInterceptor((context, metadata) => { metadata.Add(CreateBearerTokenHeader(accessToken)); - return Task.FromResult<object>(null); + return TaskUtils.CompletedTask; }); } diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index e75dc9faf1..d315e6d667 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -139,6 +139,7 @@ <Compile Include="Logging\LogLevel.cs" /> <Compile Include="Logging\LogLevelFilterLogger.cs" /> <Compile Include="Internal\RequestCallContextSafeHandle.cs" /> + <Compile Include="Utils\TaskUtils.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 9abaf1120f..49d0a111ef 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -265,7 +265,7 @@ namespace Grpc.Core.Internal // the halfclose has already been done implicitly, so just return // completed task here. halfcloseRequested = true; - return Task.FromResult<object>(null); + return TaskUtils.CompletedTask; } call.StartSendCloseFromClient(HandleSendFinished); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index ebb7e0021e..1dbd4bbdc1 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -296,7 +296,7 @@ namespace Grpc.Core.Internal private Task UnimplementedMethod(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext ctx) { ctx.Status = new Status(StatusCode.Unimplemented, ""); - return Task.FromResult<object>(null); + return TaskUtils.CompletedTask; } public Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) diff --git a/src/csharp/Grpc.Core/Utils/TaskUtils.cs b/src/csharp/Grpc.Core/Utils/TaskUtils.cs new file mode 100644 index 0000000000..2cf1144143 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/TaskUtils.cs @@ -0,0 +1,59 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; + +namespace Grpc.Core.Utils +{ + /// <summary> + /// Utility methods for task parallel library. + /// </summary> + public static class TaskUtils + { + /// <summary> + /// Framework independent equivalent of <c>Task.CompletedTask</c>. + /// </summary> + public static Task CompletedTask + { + get + { +#if NETSTANDARD1_5 + return Task.CompletedTask; +#else + return Task.FromResult<object>(null); // for .NET45, emulate the functionality +#endif + } + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs index 6492d34890..8bea083bc2 100644 --- a/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs +++ b/src/csharp/Grpc.IntegrationTesting/InterarrivalTimers.cs @@ -69,7 +69,7 @@ namespace Grpc.IntegrationTesting public Task WaitForNextAsync() { - return Task.FromResult<object>(null); + return TaskUtils.CompletedTask; } } |