aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-06 12:49:57 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-06 12:49:57 -0800
commit62d424a1a4ce55a66a7565795db17ff898bc05f5 (patch)
tree89b6e34c94fb6144c705ecec0751121fa6d0917b /src/core/surface
parentfd6f5079b8e11cb604664994aec08423fca6f836 (diff)
parent4acf81e90246dffa806de6ad1a830d0e62cf727f (diff)
Merge github.com:google/grpc into buffer
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer_queue.c12
-rw-r--r--src/core/surface/call.c14
-rw-r--r--src/core/surface/channel.c11
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--src/core/surface/client.c3
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/server.c2
7 files changed, 36 insertions, 11 deletions
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index dc280a60c5..9709a665ba 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -35,7 +35,13 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
-static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); }
+static void bba_destroy(grpc_bbq_array *array, size_t start_pos) {
+ size_t i;
+ for (i = start_pos; i < array->count; i++) {
+ grpc_byte_buffer_destroy(array->data[i]);
+ }
+ gpr_free(array->data);
+}
/* Append an operation to an array, expanding as needed */
static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
@@ -47,8 +53,8 @@ static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
}
void grpc_bbq_destroy(grpc_byte_buffer_queue *q) {
- bba_destroy(&q->filling);
- bba_destroy(&q->draining);
+ bba_destroy(&q->filling, 0);
+ bba_destroy(&q->draining, q->drain_pos);
}
int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index feb3926281..0af524cead 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -275,6 +275,7 @@ static void destroy_call(void *call, int ignored_success) {
if (c->legacy_state) {
destroy_legacy_state(c->legacy_state);
}
+ grpc_bbq_destroy(&c->incoming_queue);
gpr_free(c);
}
@@ -335,8 +336,10 @@ static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int num_completed_requests = call->num_completed_requests;
- int need_more_data = call->need_more_data &&
- !is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA);
+ int need_more_data =
+ call->need_more_data &&
+ !call->sending &&
+ call->write_state >= WRITE_STATE_STARTED;
int i;
if (need_more_data) {
@@ -960,6 +963,8 @@ struct legacy_state {
char *details;
grpc_status_code status;
+ char *send_details;
+
size_t msg_in_read_idx;
grpc_byte_buffer *msg_in;
@@ -985,6 +990,8 @@ static void destroy_legacy_state(legacy_state *ls) {
}
gpr_free(ls->initial_md_in.metadata);
gpr_free(ls->trailing_md_in.metadata);
+ gpr_free(ls->details);
+ gpr_free(ls->send_details);
gpr_free(ls);
}
@@ -1233,8 +1240,7 @@ grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
reqs[1].op = GRPC_IOREQ_SEND_STATUS;
reqs[1].data.send_status.code = status;
- /* MEMLEAK */
- reqs[1].data.send_status.details = gpr_strdup(details);
+ reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details);
reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
err = start_ioreq(call, reqs, 3, finish_finish, tag);
unlock(call);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index c33ea923e8..b33bd7b357 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -52,6 +52,9 @@ struct grpc_channel {
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
+#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1)
+#define CHANNEL_FROM_TOP_ELEM(top_elem) \
+ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem))
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
@@ -60,8 +63,8 @@ grpc_channel *grpc_channel_create_from_filters(
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy */
- gpr_ref_init(&channel->refs, 1);
+ /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */
+ gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
@@ -158,6 +161,10 @@ void grpc_channel_destroy(grpc_channel *channel) {
grpc_channel_internal_unref(channel);
}
+void grpc_client_channel_closed(grpc_channel_element *elem) {
+ grpc_channel_internal_unref(CHANNEL_FROM_TOP_ELEM(elem));
+}
+
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) {
return CHANNEL_STACK_FROM_CHANNEL(channel);
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index b3ea2ede40..ff9bbc237e 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -45,6 +45,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
+void grpc_client_channel_closed(grpc_channel_element *elem);
+
void grpc_channel_internal_ref(grpc_channel *channel);
void grpc_channel_internal_unref(grpc_channel *channel);
diff --git a/src/core/surface/client.c b/src/core/surface/client.c
index fa63e855cc..64ee9d51e8 100644
--- a/src/core/surface/client.c
+++ b/src/core/surface/client.c
@@ -34,6 +34,7 @@
#include "src/core/surface/client.h"
#include "src/core/surface/call.h"
+#include "src/core/surface/channel.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -87,7 +88,7 @@ static void channel_op(grpc_channel_element *elem,
gpr_log(GPR_ERROR, "Client cannot accept new calls");
break;
case GRPC_TRANSPORT_CLOSED:
- gpr_log(GPR_ERROR, "Transport closed");
+ grpc_client_channel_closed(elem);
break;
case GRPC_TRANSPORT_GOAWAY:
gpr_slice_unref(op->data.goaway.message);
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 2f5eff5584..411dbabfd3 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -76,6 +76,9 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_CHANNEL_GOAWAY:
gpr_slice_unref(op->data.goaway.message);
break;
+ case GRPC_CHANNEL_DISCONNECT:
+ grpc_client_channel_closed(elem);
+ break;
default:
break;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index a057694f13..455bd4337f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -258,7 +258,6 @@ static void stream_closed(grpc_call_element *elem) {
gpr_mu_lock(&chand->server->mu);
switch (calld->state) {
case ACTIVATED:
- grpc_call_stream_closed(elem);
break;
case PENDING:
call_list_remove(chand->server, calld, PENDING_START);
@@ -271,6 +270,7 @@ static void stream_closed(grpc_call_element *elem) {
break;
}
gpr_mu_unlock(&chand->server->mu);
+ grpc_call_stream_closed(elem);
}
static void read_closed(grpc_call_element *elem) {