diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-06 12:49:57 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-06 12:49:57 -0800 |
commit | 62d424a1a4ce55a66a7565795db17ff898bc05f5 (patch) | |
tree | 89b6e34c94fb6144c705ecec0751121fa6d0917b /src/core/surface | |
parent | fd6f5079b8e11cb604664994aec08423fca6f836 (diff) | |
parent | 4acf81e90246dffa806de6ad1a830d0e62cf727f (diff) |
Merge github.com:google/grpc into buffer
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 12 | ||||
-rw-r--r-- | src/core/surface/call.c | 14 | ||||
-rw-r--r-- | src/core/surface/channel.c | 11 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/surface/client.c | 3 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | src/core/surface/server.c | 2 |
7 files changed, 36 insertions, 11 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index dc280a60c5..9709a665ba 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -35,7 +35,13 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); } +static void bba_destroy(grpc_bbq_array *array, size_t start_pos) { + size_t i; + for (i = start_pos; i < array->count; i++) { + grpc_byte_buffer_destroy(array->data[i]); + } + gpr_free(array->data); +} /* Append an operation to an array, expanding as needed */ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { @@ -47,8 +53,8 @@ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { } void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { - bba_destroy(&q->filling); - bba_destroy(&q->draining); + bba_destroy(&q->filling, 0); + bba_destroy(&q->draining, q->drain_pos); } int grpc_bbq_empty(grpc_byte_buffer_queue *q) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index feb3926281..0af524cead 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -275,6 +275,7 @@ static void destroy_call(void *call, int ignored_success) { if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } + grpc_bbq_destroy(&c->incoming_queue); gpr_free(c); } @@ -335,8 +336,10 @@ static void unlock(grpc_call *call) { send_action sa = SEND_NOTHING; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int num_completed_requests = call->num_completed_requests; - int need_more_data = call->need_more_data && - !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); + int need_more_data = + call->need_more_data && + !call->sending && + call->write_state >= WRITE_STATE_STARTED; int i; if (need_more_data) { @@ -960,6 +963,8 @@ struct legacy_state { char *details; grpc_status_code status; + char *send_details; + size_t msg_in_read_idx; grpc_byte_buffer *msg_in; @@ -985,6 +990,8 @@ static void destroy_legacy_state(legacy_state *ls) { } gpr_free(ls->initial_md_in.metadata); gpr_free(ls->trailing_md_in.metadata); + gpr_free(ls->details); + gpr_free(ls->send_details); gpr_free(ls); } @@ -1233,8 +1240,7 @@ grpc_call_error grpc_call_start_write_status_old(grpc_call *call, reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; reqs[1].op = GRPC_IOREQ_SEND_STATUS; reqs[1].data.send_status.code = status; - /* MEMLEAK */ - reqs[1].data.send_status.details = gpr_strdup(details); + reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); reqs[2].op = GRPC_IOREQ_SEND_CLOSE; err = start_ioreq(call, reqs, 3, finish_finish, tag); unlock(call); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index c33ea923e8..b33bd7b357 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,9 @@ struct grpc_channel { }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_FROM_TOP_ELEM(top_elem) \ + CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, @@ -60,8 +63,8 @@ grpc_channel *grpc_channel_create_from_filters( sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); channel->is_client = is_client; - /* decremented by grpc_channel_destroy */ - gpr_ref_init(&channel->refs, 1); + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); @@ -158,6 +161,10 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_channel_internal_unref(channel); } +void grpc_client_channel_closed(grpc_channel_element *elem) { + grpc_channel_internal_unref(CHANNEL_FROM_TOP_ELEM(elem)); +} + grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index b3ea2ede40..ff9bbc237e 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -45,6 +45,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +void grpc_client_channel_closed(grpc_channel_element *elem); + void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index fa63e855cc..64ee9d51e8 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -34,6 +34,7 @@ #include "src/core/surface/client.h" #include "src/core/surface/call.h" +#include "src/core/surface/channel.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -87,7 +88,7 @@ static void channel_op(grpc_channel_element *elem, gpr_log(GPR_ERROR, "Client cannot accept new calls"); break; case GRPC_TRANSPORT_CLOSED: - gpr_log(GPR_ERROR, "Transport closed"); + grpc_client_channel_closed(elem); break; case GRPC_TRANSPORT_GOAWAY: gpr_slice_unref(op->data.goaway.message); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 2f5eff5584..411dbabfd3 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -76,6 +76,9 @@ static void channel_op(grpc_channel_element *elem, case GRPC_CHANNEL_GOAWAY: gpr_slice_unref(op->data.goaway.message); break; + case GRPC_CHANNEL_DISCONNECT: + grpc_client_channel_closed(elem); + break; default: break; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a057694f13..455bd4337f 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -258,7 +258,6 @@ static void stream_closed(grpc_call_element *elem) { gpr_mu_lock(&chand->server->mu); switch (calld->state) { case ACTIVATED: - grpc_call_stream_closed(elem); break; case PENDING: call_list_remove(chand->server, calld, PENDING_START); @@ -271,6 +270,7 @@ static void stream_closed(grpc_call_element *elem) { break; } gpr_mu_unlock(&chand->server->mu); + grpc_call_stream_closed(elem); } static void read_closed(grpc_call_element *elem) { |