aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-09-12 11:59:45 -0700
committerGravatar GitHub <noreply@github.com>2016-09-12 11:59:45 -0700
commit57726ca5a9c08ba17c882a798b93e03d333edd9f (patch)
treebdf922ed4a33a2290bab32f236b6584cbfc0e50d /src/core/lib/channel
parent6e51f992c6bfdfba61d984ab173305da455bd2e7 (diff)
Revert "Revert "Grand unified closures""
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_stack.c23
-rw-r--r--src/core/lib/channel/compress_filter.c14
2 files changed, 22 insertions, 15 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 98f304f2da..0655b9353f 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -32,6 +32,7 @@
*/
#include "src/core/lib/channel/channel_stack.h"
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <stdlib.h>
@@ -270,21 +271,27 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
+static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
+ gpr_free(op);
+}
+
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->cancel_error = GRPC_ERROR_CANCELLED;
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_call_next_op(exec_ctx, cur_elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
gpr_slice *optional_message) {
- grpc_transport_stream_op op;
- memset(&op, 0, sizeof(op));
- grpc_transport_stream_op_add_cancellation_with_message(&op, status,
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, &op);
+ grpc_call_next_op(exec_ctx, cur_elem, op);
}
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 134180e619..0981d59f63 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -60,7 +60,7 @@ typedef struct call_data {
/** If true, contents of \a compression_algorithm are authoritative */
int has_compression_algorithm;
- grpc_transport_stream_op send_op;
+ grpc_transport_stream_op *send_op;
uint32_t send_length;
uint32_t send_flags;
gpr_slice incoming_slice;
@@ -199,11 +199,11 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
- calld->send_op.send_message = &calld->replacement_stream.base;
- calld->post_send = calld->send_op.on_complete;
- calld->send_op.on_complete = &calld->send_done;
+ calld->send_op->send_message = &calld->replacement_stream.base;
+ calld->post_send = calld->send_op->on_complete;
+ calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, &calld->send_op);
+ grpc_call_next_op(exec_ctx, elem, calld->send_op);
}
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
@@ -220,7 +220,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
+ while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
@@ -243,7 +243,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
if (op->send_message != NULL && !skip_compression(elem) &&
0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
- calld->send_op = *op;
+ calld->send_op = op;
calld->send_length = op->send_message->length;
calld->send_flags = op->send_message->flags;
continue_send_message(exec_ctx, elem);