diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-05 11:42:58 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-05 11:42:58 -0800 |
commit | 86d9159356d5d23981f122c3bd4ad912203666ca (patch) | |
tree | 1cf0b4622ada6275370277a748438102e44188f4 /src/core/surface | |
parent | 24fc2c4c5ddc56a678f17170ac4eec4ec570c780 (diff) | |
parent | 6b9afb153a82c921c7e80365a4e129c462c0ebad (diff) |
Merge github.com:google/grpc into async-api-new
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_queue.c | 12 | ||||
-rw-r--r-- | src/core/surface/call.c | 27 | ||||
-rw-r--r-- | src/core/surface/channel.c | 11 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/surface/client.c | 3 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 3 | ||||
-rw-r--r-- | src/core/surface/server.c | 2 |
7 files changed, 43 insertions, 17 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c index dc280a60c5..9709a665ba 100644 --- a/src/core/surface/byte_buffer_queue.c +++ b/src/core/surface/byte_buffer_queue.c @@ -35,7 +35,13 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); } +static void bba_destroy(grpc_bbq_array *array, size_t start_pos) { + size_t i; + for (i = start_pos; i < array->count; i++) { + grpc_byte_buffer_destroy(array->data[i]); + } + gpr_free(array->data); +} /* Append an operation to an array, expanding as needed */ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { @@ -47,8 +53,8 @@ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) { } void grpc_bbq_destroy(grpc_byte_buffer_queue *q) { - bba_destroy(&q->filling); - bba_destroy(&q->draining); + bba_destroy(&q->filling, 0); + bba_destroy(&q->draining, q->drain_pos); } int grpc_bbq_empty(grpc_byte_buffer_queue *q) { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e0bfec0018..1657edb3de 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -146,10 +146,10 @@ struct grpc_call { /* Active ioreqs. request_set and request_data contain one element per active ioreq operation. - + request_set[op] is an integer specifying a set of operations to which the request belongs: - - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending + - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending completion, and the integer represents to which group of operations the ioreq belongs. Each group is represented by one master, and the integer in request_set is an index into masters to find the master @@ -158,16 +158,17 @@ struct grpc_call { started - finally, if request_set[op] is REQSET_DONE, then the operation is complete and unavailable to be started again - + request_data[op] is the request data as supplied by the initiator of a request, and is valid iff request_set[op] <= GRPC_IOREQ_OP_COUNT. The set fields are as per the request type specified by op. - Finally, one element of masters[op] is set per active _group_ of ioreq + Finally, one element of masters is set per active _set_ of ioreq operations. It describes work left outstanding, result status, and what work to perform upon operation completion. As one ioreq of each op type can be active at once, by convention we choose the first element - of a the group to be the master. This allows constant time allocation + of the group to be the master -- ie the master of in-progress operation + op is masters[request_set[op]]. This allows constant time allocation and a strong upper bound of a count of masters to be calculated. */ gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT]; grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT]; @@ -199,7 +200,7 @@ struct grpc_call { /* Call refcount - to keep the call alive during asynchronous operations */ gpr_refcount internal_refcount; - /* Data that the legacy api needs to track. To be deleted at some point + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; }; @@ -278,6 +279,7 @@ static void destroy_call(void *call, int ignored_success) { if (c->legacy_state) { destroy_legacy_state(c->legacy_state); } + grpc_bbq_destroy(&c->incoming_queue); gpr_free(c); } @@ -338,8 +340,10 @@ static void unlock(grpc_call *call) { send_action sa = SEND_NOTHING; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int num_completed_requests = call->num_completed_requests; - int need_more_data = call->need_more_data && - !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA); + int need_more_data = + call->need_more_data && + !call->sending && + call->write_state >= WRITE_STATE_STARTED; int i; if (need_more_data) { @@ -1065,6 +1069,8 @@ struct legacy_state { char *details; grpc_status_code status; + char *send_details; + size_t msg_in_read_idx; grpc_byte_buffer *msg_in; @@ -1090,6 +1096,8 @@ static void destroy_legacy_state(legacy_state *ls) { } gpr_free(ls->initial_md_in.metadata); gpr_free(ls->trailing_md_in.metadata); + gpr_free(ls->details); + gpr_free(ls->send_details); gpr_free(ls); } @@ -1339,8 +1347,7 @@ grpc_call_error grpc_call_start_write_status_old(grpc_call *call, reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; reqs[1].op = GRPC_IOREQ_SEND_STATUS; reqs[1].data.send_status.code = status; - /* MEMLEAK */ - reqs[1].data.send_status.details = gpr_strdup(details); + reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); reqs[2].op = GRPC_IOREQ_SEND_CLOSE; err = start_ioreq(call, reqs, 3, finish_finish, tag); unlock(call); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 6d47787b7c..514073ce0b 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -52,6 +52,9 @@ struct grpc_channel { }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_FROM_TOP_ELEM(top_elem) \ + CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, @@ -60,8 +63,8 @@ grpc_channel *grpc_channel_create_from_filters( sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); channel->is_client = is_client; - /* decremented by grpc_channel_destroy */ - gpr_ref_init(&channel->refs, 1); + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); @@ -166,6 +169,10 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_channel_internal_unref(channel); } +void grpc_client_channel_closed(grpc_channel_element *elem) { + grpc_channel_internal_unref(CHANNEL_FROM_TOP_ELEM(elem)); +} + grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index b3ea2ede40..ff9bbc237e 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -45,6 +45,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); +void grpc_client_channel_closed(grpc_channel_element *elem); + void grpc_channel_internal_ref(grpc_channel *channel); void grpc_channel_internal_unref(grpc_channel *channel); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index fa63e855cc..64ee9d51e8 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -34,6 +34,7 @@ #include "src/core/surface/client.h" #include "src/core/surface/call.h" +#include "src/core/surface/channel.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -87,7 +88,7 @@ static void channel_op(grpc_channel_element *elem, gpr_log(GPR_ERROR, "Client cannot accept new calls"); break; case GRPC_TRANSPORT_CLOSED: - gpr_log(GPR_ERROR, "Transport closed"); + grpc_client_channel_closed(elem); break; case GRPC_TRANSPORT_GOAWAY: gpr_slice_unref(op->data.goaway.message); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 2f5eff5584..411dbabfd3 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -76,6 +76,9 @@ static void channel_op(grpc_channel_element *elem, case GRPC_CHANNEL_GOAWAY: gpr_slice_unref(op->data.goaway.message); break; + case GRPC_CHANNEL_DISCONNECT: + grpc_client_channel_closed(elem); + break; default: break; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index d7e1dcd800..c0c524ad8d 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -265,7 +265,6 @@ static void stream_closed(grpc_call_element *elem) { gpr_mu_lock(&chand->server->mu); switch (calld->state) { case ACTIVATED: - grpc_call_stream_closed(elem); break; case PENDING: call_list_remove(chand->server, calld, PENDING_START); @@ -278,6 +277,7 @@ static void stream_closed(grpc_call_element *elem) { break; } gpr_mu_unlock(&chand->server->mu); + grpc_call_stream_closed(elem); } static void read_closed(grpc_call_element *elem) { |