diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/surface/server.c | 77 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 21 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 12 |
3 files changed, 80 insertions, 30 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 22588194ea..169fb1a781 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -53,7 +53,8 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count); + void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, + size_t pollset_count); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; @@ -129,7 +130,7 @@ struct grpc_server { const grpc_channel_filter **channel_filters; grpc_channel_args *channel_args; grpc_completion_queue *unregistered_cq; - + grpc_completion_queue **cqs; grpc_pollset **pollsets; size_t cq_count; @@ -257,11 +258,21 @@ static void server_ref(grpc_server *server) { } static void server_unref(grpc_server *server) { + registered_method *rm; if (gpr_unref(&server->internal_refcount)) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); gpr_free(server->channel_filters); requested_call_array_destroy(&server->requested_calls); + while ((rm = server->registered_methods) != NULL) { + server->registered_methods = rm->next; + gpr_free(rm->method); + gpr_free(rm->host); + requested_call_array_destroy(&rm->requested); + gpr_free(rm); + } + gpr_free(server->cqs); + gpr_free(server->pollsets); gpr_free(server); } } @@ -511,7 +522,8 @@ static void destroy_call_elem(grpc_call_element *elem) { if (chand->server->shutdown && chand->server->have_shutdown_tag && chand->server->lists[ALL_CALLS] == NULL) { for (i = 0; i < chand->server->cq_count; i++) { - grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag); + grpc_cq_end_server_shutdown(chand->server->cqs[i], + chand->server->shutdown_tag); } } gpr_mu_unlock(&chand->server->mu); @@ -547,7 +559,19 @@ static void init_channel_elem(grpc_channel_element *elem, } static void destroy_channel_elem(grpc_channel_element *elem) { + size_t i; channel_data *chand = elem->channel_data; + if (chand->registered_methods) { + for (i = 0; i < chand->registered_method_slots; i++) { + if (chand->registered_methods[i].method) { + grpc_mdstr_unref(chand->registered_methods[i].method); + } + if (chand->registered_methods[i].host) { + grpc_mdstr_unref(chand->registered_methods[i].host); + } + } + gpr_free(chand->registered_methods); + } if (chand->server) { gpr_mu_lock(&chand->server->mu); chand->next->prev = chand->prev; @@ -571,7 +595,8 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { if (server->cqs[i] == cq) return; } n = server->cq_count++; - server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*)); + server->cqs = gpr_realloc(server->cqs, + server->cq_count * sizeof(grpc_completion_queue *)); server->cqs[n] = cq; } @@ -624,7 +649,8 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host, grpc_completion_queue *cq_new_rpc) { + const char *host, + grpc_completion_queue *cq_new_rpc) { registered_method *m; if (!method) { gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); @@ -652,7 +678,7 @@ void grpc_server_start(grpc_server *server) { listener *l; size_t i; - server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count); + server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); } @@ -745,7 +771,7 @@ grpc_transport_setup_result grpc_server_setup_transport( } static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, - void *shutdown_tag) { + void *shutdown_tag) { listener *l; requested_call_array requested_calls; channel_data **channels; @@ -781,12 +807,19 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, requested_calls = server->requested_calls; memset(&server->requested_calls, 0, sizeof(server->requested_calls)); for (rm = server->registered_methods; rm; rm = rm->next) { - if (requested_calls.count + rm->requested.count > requested_calls.capacity) { - requested_calls.capacity = GPR_MAX(requested_calls.count + rm->requested.count, 2 * requested_calls.capacity); - requested_calls.calls = gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * requested_calls.capacity); + if (requested_calls.count + rm->requested.count > + requested_calls.capacity) { + requested_calls.capacity = + GPR_MAX(requested_calls.count + rm->requested.count, + 2 * requested_calls.capacity); + requested_calls.calls = + gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * + requested_calls.capacity); } - memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, sizeof(*requested_calls.calls) * rm->requested.count); + memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, + sizeof(*requested_calls.calls) * rm->requested.count); requested_calls.count += rm->requested.count; + gpr_free(rm->requested.calls); memset(&rm->requested, 0, sizeof(rm->requested)); } @@ -857,7 +890,8 @@ void grpc_server_destroy(grpc_server *server) { void grpc_server_add_listener(grpc_server *server, void *arg, void (*start)(grpc_server *server, void *arg, - grpc_pollset **pollsets, size_t pollset_count), + grpc_pollset **pollsets, + size_t pollset_count), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; @@ -920,10 +954,9 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, } grpc_call_error grpc_server_request_registered_call( - grpc_server *server, void *rm, grpc_call **call, - gpr_timespec *deadline, grpc_metadata_array *initial_metadata, - grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind, - void *tag) { + grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, + grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, + grpc_completion_queue *cq_bind, void *tag) { requested_call rc; registered_method *registered_method = rm; grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); @@ -1025,20 +1058,20 @@ static void begin_call(grpc_server *server, call_data *calld, static void fail_call(grpc_server *server, requested_call *rc) { switch (rc->type) { case LEGACY_CALL: - grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL, - NULL, gpr_inf_past, 0, NULL); + grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, + NULL, NULL, NULL, gpr_inf_past, 0, NULL); break; case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); + grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, + do_nothing, NULL, GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); + grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, + rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; } } diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index aae69084eb..d706ec45e5 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -48,8 +48,7 @@ void CallOpBuffer::Reset(void* next_return_tag) { gpr_free(initial_metadata_); recv_initial_metadata_ = nullptr; - gpr_free(recv_initial_metadata_arr_.metadata); - recv_initial_metadata_arr_ = {0, 0, nullptr}; + recv_initial_metadata_arr_.count = 0; send_message_ = nullptr; if (send_message_buf_) { @@ -68,13 +67,9 @@ void CallOpBuffer::Reset(void* next_return_tag) { recv_trailing_metadata_ = nullptr; recv_status_ = nullptr; - gpr_free(recv_trailing_metadata_arr_.metadata); - recv_trailing_metadata_arr_ = {0, 0, nullptr}; + recv_trailing_metadata_arr_.count = 0; status_code_ = GRPC_STATUS_OK; - gpr_free(status_details_); - status_details_ = nullptr; - status_details_capacity_ = 0; send_status_ = nullptr; trailing_metadata_count_ = 0; @@ -83,6 +78,18 @@ void CallOpBuffer::Reset(void* next_return_tag) { recv_closed_ = nullptr; } +CallOpBuffer::~CallOpBuffer() { + gpr_free(status_details_); + gpr_free(recv_initial_metadata_arr_.metadata); + gpr_free(recv_trailing_metadata_arr_.metadata); + if (recv_message_buf_) { + grpc_byte_buffer_destroy(recv_message_buf_); + } + if (send_message_buf_) { + grpc_byte_buffer_destroy(send_message_buf_); + } +} + namespace { // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 7d19834799..294eeae585 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -163,7 +163,11 @@ class Server::SyncRequest final : public CompletionQueueTag { this)); } - void FinalizeResult(void** tag, bool* status) override {} + void FinalizeResult(void** tag, bool* status) override { + if (!*status) { + grpc_completion_queue_destroy(cq_); + } + } class CallData final { public: @@ -182,6 +186,12 @@ class Server::SyncRequest final : public CompletionQueueTag { mrd->request_metadata_.count = 0; } + ~CallData() { + if (has_request_payload_ && request_payload_) { + grpc_byte_buffer_destroy(request_payload_); + } + } + void Run() { std::unique_ptr<google::protobuf::Message> req; std::unique_ptr<google::protobuf::Message> res; |