diff options
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r-- | src/core/lib/surface/server.c | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 2db95b9cdf..c9b458faf2 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq, - void *reserved) { +static void register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + bool is_non_listening, void *reserved) { size_t i, n; - GRPC_API_TRACE( - "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, - (server, cq, reserved)); GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } - GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); - n = server->cq_count++; - server->cqs = gpr_realloc(server->cqs, - server->cq_count * sizeof(grpc_completion_queue *)); - server->cqs[n] = cq; + + /* Non-listening completion queues are not added to server->cqs */ + if (is_non_listening) { + grpc_cq_mark_non_listening_server_cq(cq); + } else { + GRPC_CQ_INTERNAL_REF(cq, "server"); + n = server->cq_count++; + server->cqs = gpr_realloc( + server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); + server->cqs[n] = cq; + } +} + +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, + (server, cq, reserved)); + register_completion_queue(server, cq, false, reserved); +} + +void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *cq, void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " + "reserved=%p)", + 3, (server, cq, reserved)); + register_completion_queue(server, cq, true, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1018,7 +1040,6 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, const grpc_channel_args *args) { - size_t i; size_t num_registered_methods; size_t alloc; registered_method *rm; @@ -1033,12 +1054,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; - for (i = 0; i < s->cq_count; i++) { - memset(&op, 0, sizeof(op)); - op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(exec_ctx, transport, &op); - } - channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); chand = (channel_data *)grpc_channel_stack_element( |