aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/server.c77
-rw-r--r--src/cpp/common/call.cc21
-rw-r--r--src/cpp/server/server.cc12
3 files changed, 80 insertions, 30 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 22588194ea..169fb1a781 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -53,7 +53,8 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count);
+ void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets,
+ size_t pollset_count);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@@ -129,7 +130,7 @@ struct grpc_server {
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
grpc_completion_queue *unregistered_cq;
-
+
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
size_t cq_count;
@@ -257,11 +258,21 @@ static void server_ref(grpc_server *server) {
}
static void server_unref(grpc_server *server) {
+ registered_method *rm;
if (gpr_unref(&server->internal_refcount)) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
+ while ((rm = server->registered_methods) != NULL) {
+ server->registered_methods = rm->next;
+ gpr_free(rm->method);
+ gpr_free(rm->host);
+ requested_call_array_destroy(&rm->requested);
+ gpr_free(rm);
+ }
+ gpr_free(server->cqs);
+ gpr_free(server->pollsets);
gpr_free(server);
}
}
@@ -511,7 +522,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < chand->server->cq_count; i++) {
- grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag);
+ grpc_cq_end_server_shutdown(chand->server->cqs[i],
+ chand->server->shutdown_tag);
}
}
gpr_mu_unlock(&chand->server->mu);
@@ -547,7 +559,19 @@ static void init_channel_elem(grpc_channel_element *elem,
}
static void destroy_channel_elem(grpc_channel_element *elem) {
+ size_t i;
channel_data *chand = elem->channel_data;
+ if (chand->registered_methods) {
+ for (i = 0; i < chand->registered_method_slots; i++) {
+ if (chand->registered_methods[i].method) {
+ grpc_mdstr_unref(chand->registered_methods[i].method);
+ }
+ if (chand->registered_methods[i].host) {
+ grpc_mdstr_unref(chand->registered_methods[i].host);
+ }
+ }
+ gpr_free(chand->registered_methods);
+ }
if (chand->server) {
gpr_mu_lock(&chand->server->mu);
chand->next->prev = chand->prev;
@@ -571,7 +595,8 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
if (server->cqs[i] == cq) return;
}
n = server->cq_count++;
- server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
+ server->cqs = gpr_realloc(server->cqs,
+ server->cq_count * sizeof(grpc_completion_queue *));
server->cqs[n] = cq;
}
@@ -624,7 +649,8 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
- const char *host, grpc_completion_queue *cq_new_rpc) {
+ const char *host,
+ grpc_completion_queue *cq_new_rpc) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@@ -652,7 +678,7 @@ void grpc_server_start(grpc_server *server) {
listener *l;
size_t i;
- server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
+ server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
}
@@ -745,7 +771,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
- void *shutdown_tag) {
+ void *shutdown_tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
@@ -781,12 +807,19 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
requested_calls = server->requested_calls;
memset(&server->requested_calls, 0, sizeof(server->requested_calls));
for (rm = server->registered_methods; rm; rm = rm->next) {
- if (requested_calls.count + rm->requested.count > requested_calls.capacity) {
- requested_calls.capacity = GPR_MAX(requested_calls.count + rm->requested.count, 2 * requested_calls.capacity);
- requested_calls.calls = gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) * requested_calls.capacity);
+ if (requested_calls.count + rm->requested.count >
+ requested_calls.capacity) {
+ requested_calls.capacity =
+ GPR_MAX(requested_calls.count + rm->requested.count,
+ 2 * requested_calls.capacity);
+ requested_calls.calls =
+ gpr_realloc(requested_calls.calls, sizeof(*requested_calls.calls) *
+ requested_calls.capacity);
}
- memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls, sizeof(*requested_calls.calls) * rm->requested.count);
+ memcpy(requested_calls.calls + requested_calls.count, rm->requested.calls,
+ sizeof(*requested_calls.calls) * rm->requested.count);
requested_calls.count += rm->requested.count;
+ gpr_free(rm->requested.calls);
memset(&rm->requested, 0, sizeof(rm->requested));
}
@@ -857,7 +890,8 @@ void grpc_server_destroy(grpc_server *server) {
void grpc_server_add_listener(grpc_server *server, void *arg,
void (*start)(grpc_server *server, void *arg,
- grpc_pollset **pollsets, size_t pollset_count),
+ grpc_pollset **pollsets,
+ size_t pollset_count),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
@@ -920,10 +954,9 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
}
grpc_call_error grpc_server_request_registered_call(
- grpc_server *server, void *rm, grpc_call **call,
- gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
- grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
- void *tag) {
+ grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline,
+ grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
+ grpc_completion_queue *cq_bind, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
@@ -1025,20 +1058,20 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
case LEGACY_CALL:
- grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL,
- NULL, gpr_inf_past, 0, NULL);
+ grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing,
+ NULL, NULL, NULL, gpr_inf_past, 0, NULL);
break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
+ grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL,
+ do_nothing, NULL, GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
- grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
+ grpc_cq_end_op_complete(rc->data.registered.registered_method->cq,
+ rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR);
break;
}
}
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index aae69084eb..d706ec45e5 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -48,8 +48,7 @@ void CallOpBuffer::Reset(void* next_return_tag) {
gpr_free(initial_metadata_);
recv_initial_metadata_ = nullptr;
- gpr_free(recv_initial_metadata_arr_.metadata);
- recv_initial_metadata_arr_ = {0, 0, nullptr};
+ recv_initial_metadata_arr_.count = 0;
send_message_ = nullptr;
if (send_message_buf_) {
@@ -68,13 +67,9 @@ void CallOpBuffer::Reset(void* next_return_tag) {
recv_trailing_metadata_ = nullptr;
recv_status_ = nullptr;
- gpr_free(recv_trailing_metadata_arr_.metadata);
- recv_trailing_metadata_arr_ = {0, 0, nullptr};
+ recv_trailing_metadata_arr_.count = 0;
status_code_ = GRPC_STATUS_OK;
- gpr_free(status_details_);
- status_details_ = nullptr;
- status_details_capacity_ = 0;
send_status_ = nullptr;
trailing_metadata_count_ = 0;
@@ -83,6 +78,18 @@ void CallOpBuffer::Reset(void* next_return_tag) {
recv_closed_ = nullptr;
}
+CallOpBuffer::~CallOpBuffer() {
+ gpr_free(status_details_);
+ gpr_free(recv_initial_metadata_arr_.metadata);
+ gpr_free(recv_trailing_metadata_arr_.metadata);
+ if (recv_message_buf_) {
+ grpc_byte_buffer_destroy(recv_message_buf_);
+ }
+ if (send_message_buf_) {
+ grpc_byte_buffer_destroy(send_message_buf_);
+ }
+}
+
namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 7d19834799..294eeae585 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -163,7 +163,11 @@ class Server::SyncRequest final : public CompletionQueueTag {
this));
}
- void FinalizeResult(void** tag, bool* status) override {}
+ void FinalizeResult(void** tag, bool* status) override {
+ if (!*status) {
+ grpc_completion_queue_destroy(cq_);
+ }
+ }
class CallData final {
public:
@@ -182,6 +186,12 @@ class Server::SyncRequest final : public CompletionQueueTag {
mrd->request_metadata_.count = 0;
}
+ ~CallData() {
+ if (has_request_payload_ && request_payload_) {
+ grpc_byte_buffer_destroy(request_payload_);
+ }
+ }
+
void Run() {
std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;