path: root/src/core/surface/call.c
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--  src/core/surface/call.c  221
1 file changed, 178 insertions(+), 43 deletions(-)
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index eea02211ae..304af259d6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -42,6 +42,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <assert.h>
#include <stdio.h>
@@ -98,6 +99,8 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status came from the server sending status */
+ STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
} status_source;
@@ -147,12 +150,19 @@ struct grpc_call {
gpr_uint8 receiving;
/* are we currently completing requests */
gpr_uint8 completing;
+ /** has grpc_call_destroy been called */
+ gpr_uint8 destroy_called;
/* pairs with completed_requests */
gpr_uint8 num_completed_requests;
/* are we currently reading a message? */
gpr_uint8 reading_message;
/* have we bound a pollset yet? */
gpr_uint8 bound_pollset;
+ /* is an error status set */
+ gpr_uint8 error_status_set;
+ /** should the alarm be cancelled */
+ gpr_uint8 cancel_alarm;
+
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@@ -188,6 +198,7 @@ struct grpc_call {
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+ gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
@@ -210,6 +221,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
+ /* Compression level for the call */
+ grpc_compression_level compression_level;
+
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -231,7 +245,11 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
+ grpc_iomgr_closure on_done_recv;
+ grpc_iomgr_closure on_done_send;
+ grpc_iomgr_closure on_done_bind;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -250,6 +268,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *metadata);
static void finish_read_ops(grpc_call *call);
static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
const char *description);
+static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
@@ -293,16 +312,18 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- /* dropped in destroy */
- gpr_ref_init(&call->internal_refcount, 1);
+ grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
+ gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
TODO(ctiller): figure out a cleaner solution */
if (!call->is_client) {
memset(&initial_op, 0, sizeof(initial_op));
initial_op.recv_ops = &call->recv_ops;
initial_op.recv_state = &call->recv_state;
- initial_op.on_done_recv = call_on_done_recv;
- initial_op.recv_user_data = call;
+ initial_op.on_done_recv = &call->on_done_recv;
initial_op.context = call->context;
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
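
Two mechanical changes in this hunk are worth calling out. First, completion callbacks are now passed as grpc_iomgr_closure objects embedded in the call, replacing the separate function-pointer/user-data pairs, so issuing a receive costs no allocation. Second, the internal refcount starts at 2: one reference is dropped by grpc_call_destroy, the other when READ_STATE_STREAM_CLOSED is observed (see call_on_done_recv below). A minimal sketch of the embedded-closure shape, with assumed field names (the real type lives in iomgr):

    /* Sketch: a closure bundles callback + argument so callers pass one
       pointer; embedding it in the owning object avoids per-op mallocs. */
    typedef void (*iomgr_cb_func)(void *arg, int success);

    typedef struct sketch_closure {
      iomgr_cb_func cb;            /* function invoked on completion */
      void *cb_arg;                /* bound argument (here: the call) */
      struct sketch_closure *next; /* intrusive link for queuing */
    } sketch_closure;

    static void sketch_closure_init(sketch_closure *c, iomgr_cb_func cb,
                                    void *cb_arg) {
      c->cb = cb;
      c->cb_arg = cb_arg;
      c->next = NULL;
    }
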
@@ -397,14 +418,22 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
+ if (call->status[source].is_set) return;
+
call->status[source].is_set = 1;
call->status[source].code = status;
+ call->error_status_set = status != GRPC_STATUS_OK;
if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
}
}
+static void set_decode_compression_level(grpc_call *call,
+ grpc_compression_level clevel) {
+ call->compression_level = clevel;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
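
The early return makes set_status_code first-write-wins per source: once a source has recorded a status, later writes through that source are ignored, and error_status_set latches whether any non-OK code has arrived. A hedged sketch of how the per-source records could collapse into one effective status (the priority scan is illustrative; this diff does not show the file's actual selection logic):

    /* Sketch: collapse per-source records into one effective status,
       preferring earlier sources in the enum. Illustrative only. */
    static gpr_uint32 effective_status(const received_status *status) {
      int i;
      for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
        if (status[i].is_set) return status[i].code;
      }
      return GRPC_STATUS_OK; /* no source reported: treat as OK */
    }
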
@@ -435,7 +464,8 @@ static int need_more_data(grpc_call *call) {
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
(call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
- (call->cancel_with_status != GRPC_STATUS_OK);
+ (call->cancel_with_status != GRPC_STATUS_OK) ||
+ call->destroy_called;
}
static void unlock(grpc_call *call) {
@@ -446,6 +476,7 @@ static void unlock(grpc_call *call) {
int i;
const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
size_t buffered_bytes;
+ int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -453,11 +484,13 @@ static void unlock(grpc_call *call) {
start_op = op.cancel_with_status != GRPC_STATUS_OK;
call->cancel_with_status = GRPC_STATUS_OK; /* reset */
+ cancel_alarm = call->cancel_alarm;
+ call->cancel_alarm = 0;
+
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
- op.on_done_recv = call_on_done_recv;
- op.recv_user_data = call;
+ op.on_done_recv = &call->on_done_recv;
if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
op.max_recv_bytes = call->incoming_message_length -
call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
@@ -499,6 +532,10 @@ static void unlock(grpc_call *call) {
gpr_mu_unlock(&call->mu);
+ if (cancel_alarm) {
+ grpc_alarm_cancel(&call->alarm);
+ }
+
if (start_op) {
execute_op(call, &op);
}
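
Moving the alarm cancellation out of the critical section is deliberate: grpc_alarm_cancel may run the alarm callback synchronously, and call_alarm (later in this diff) takes the call lock, so cancelling while holding call->mu risks deadlock or lock-order inversion, depending on alarm internals. The pattern, condensed into a standalone sketch:

    /* Sketch: latch the decision under the lock, act on it after unlock. */
    static void sketch_cancel_alarm_if_latched(grpc_call *call) {
      int cancel_alarm;
      gpr_mu_lock(&call->mu);
      cancel_alarm = call->cancel_alarm; /* decided under the lock */
      call->cancel_alarm = 0;
      gpr_mu_unlock(&call->mu);
      if (cancel_alarm) {
        /* may invoke call_alarm inline, which itself takes call->mu */
        grpc_alarm_cancel(&call->alarm);
      }
    }
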
@@ -590,10 +627,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
+ case GRPC_IOREQ_SEND_STATUS:
+ if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
+ NULL) {
+ grpc_mdstr_unref(
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
+ }
+ break;
case GRPC_IOREQ_RECV_CLOSE:
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
- case GRPC_IOREQ_SEND_STATUS:
case GRPC_IOREQ_SEND_CLOSE:
break;
case GRPC_IOREQ_RECV_STATUS:
@@ -676,13 +721,13 @@ static void call_on_done_send(void *pc, int success) {
}
static void finish_message(grpc_call *call) {
- /* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ if (call->error_status_set == 0) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ }
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
GPR_ASSERT(call->incoming_message.count == 0);
call->reading_message = 0;
}
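
finish_message also changed behaviourally: once an error status has been recorded, the assembled slices are discarded rather than queued, since the application will never consume them. Note that grpc_byte_buffer_create is renamed grpc_raw_byte_buffer_create here, matching the GRPC_BB_SLICE_BUFFER to GRPC_BB_RAW rename later in this diff. A small usage sketch of the renamed constructor, assuming the usual gpr slice helpers:

    /* Sketch: build a byte buffer from a slice; the buffer takes its own
       slice refs, so the caller can drop its reference immediately. */
    static grpc_byte_buffer *sketch_make_buffer(void) {
      gpr_slice s = gpr_slice_from_copied_string("payload");
      grpc_byte_buffer *bb = grpc_raw_byte_buffer_create(&s, 1);
      gpr_slice_unref(s); /* buffer holds its own reference */
      return bb;          /* caller: grpc_byte_buffer_destroy(bb) */
    }
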
@@ -711,6 +756,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
} else if (msg.length > 0) {
call->reading_message = 1;
call->incoming_message_length = msg.length;
+ call->incoming_message_flags = msg.flags;
return 1;
} else {
finish_message(call);
@@ -782,10 +828,8 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
- if (call->have_alarm) {
- grpc_alarm_cancel(&call->alarm);
- call->have_alarm = 0;
- }
+ call->cancel_alarm |= call->have_alarm;
+ GRPC_CALL_INTERNAL_UNREF(call, "closed", 0);
}
finish_read_ops(call);
} else {
@@ -800,7 +844,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
- GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
+ GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
static int prepare_application_metadata(grpc_call *call, size_t count,
@@ -847,9 +891,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
size_t i;
switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ case GRPC_BB_RAW:
+ for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
gpr_slice_ref(slice);
grpc_sopb_add_slice(sopb, slice);
}
@@ -859,9 +903,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
+ gpr_uint32 flags;
grpc_metadata_batch mdb;
size_t i;
- char status_str[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(op->send_ops == NULL);
switch (call->write_state) {
@@ -885,8 +929,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_begin_message(
- &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -905,13 +950,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
grpc_metadata_batch_add_tail(
&mdb, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
+ grpc_channel_get_reffed_status_elem(call->channel,
+ data.send_status.code));
if (data.send_status.details) {
grpc_metadata_batch_add_tail(
&mdb, &call->details_link,
@@ -919,8 +961,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
call->metadata_context,
grpc_mdstr_ref(
grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ data.send_status.details));
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
}
@@ -930,8 +973,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
break;
}
if (op->send_ops) {
- op->on_done_send = call_on_done_send;
- op->send_user_data = call;
+ op->on_done_send = &call->on_done_send;
}
return op->send_ops != NULL;
}
@@ -965,7 +1007,7 @@ static void finish_read_ops(grpc_call *call) {
switch (call->read_state) {
case READ_STATE_STREAM_CLOSED:
- if (empty) {
+ if (empty && !call->have_alarm) {
finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
}
/* fallthrough */
@@ -1020,9 +1062,18 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
GRPC_CALL_ERROR_INVALID_METADATA);
}
}
+ if (op == GRPC_IOREQ_SEND_STATUS) {
+ set_status_code(call, STATUS_FROM_SERVER_STATUS,
+ reqs[i].data.send_status.code);
+ if (reqs[i].data.send_status.details) {
+ set_status_details(call, STATUS_FROM_SERVER_STATUS,
+ grpc_mdstr_ref(reqs[i].data.send_status.details));
+ }
+ }
have_ops |= 1u << op;
call->request_data[op] = data;
+ call->request_flags[op] = reqs[i].flags;
call->request_set[op] = set;
}
@@ -1052,10 +1103,9 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
void grpc_call_destroy(grpc_call *c) {
int cancel;
lock(c);
- if (c->have_alarm) {
- grpc_alarm_cancel(&c->alarm);
- c->have_alarm = 0;
- }
+ GPR_ASSERT(!c->destroy_called);
+ c->destroy_called = 1;
+ c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
if (cancel) grpc_call_cancel(c);
@@ -1096,14 +1146,31 @@ static void finished_loose_op(void *call, int success_ignored) {
GRPC_CALL_INTERNAL_UNREF(call, "loose-op", 0);
}
+typedef struct {
+ grpc_call *call;
+ grpc_iomgr_closure closure;
+} finished_loose_op_allocated_args;
+
+static void finished_loose_op_allocated(void *alloc, int success) {
+ finished_loose_op_allocated_args *args = alloc;
+ finished_loose_op(args->call, success);
+ gpr_free(args);
+}
+
static void execute_op(grpc_call *call, grpc_transport_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->on_consumed == NULL);
if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
GRPC_CALL_INTERNAL_REF(call, "loose-op");
- op->on_consumed = finished_loose_op;
- op->on_consumed_user_data = call;
+ if (op->bind_pollset) {
+ op->on_consumed = &call->on_done_bind;
+ } else {
+ finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
+ args->call = call;
+ grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args);
+ op->on_consumed = &args->closure;
+ }
}
elem = CALL_ELEM_FROM_CALL(call, 0);
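
The asymmetry in execute_op is intentional: a pollset bind happens at most once per call, so the single embedded on_done_bind closure can be reused safely, while cancellations may overlap, so each gets a heap-allocated argument block that its own callback frees. A sketch of the choice, reusing the identifiers introduced above:

    /* Sketch: embedded closure when at most one such op is outstanding;
       otherwise per-op state freed by its callback. */
    static grpc_iomgr_closure *sketch_pick_closure(grpc_call *call,
                                                   int op_is_unique) {
      if (op_is_unique) {
        return &call->on_done_bind; /* safe to reuse: one bind per call */
      } else {
        finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
        args->call = call;
        grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
                                args);
        return &args->closure; /* freed in finished_loose_op_allocated */
      }
    }
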
@@ -1117,12 +1184,14 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
static void call_alarm(void *arg, int success) {
grpc_call *call = arg;
+ lock(call);
+ call->have_alarm = 0;
if (success) {
- lock(call);
cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
- unlock(call);
}
+ finish_read_ops(call);
+ unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "alarm", 1);
}
@@ -1160,6 +1229,33 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
+/* just as for status above, we need to offset: metadata userdata can't hold a
+ * zero (null), which in this case is used to signal no compression */
+#define COMPRESS_OFFSET 1
+static void destroy_compression(void *ignored) {}
+
+static gpr_uint32 decode_compression(grpc_mdelem *md) {
+ grpc_compression_level clevel;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
+ if (user_data) {
+ clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ } else {
+ gpr_uint32 parsed_clevel_bytes;
+ if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+ GPR_SLICE_LENGTH(md->value->slice),
+ &parsed_clevel_bytes)) {
+ /* the following cast is safe, as a gpr_uint32 should be able to hold all
+ * possible values of the grpc_compression_level enum */
+ clevel = (grpc_compression_level) parsed_clevel_bytes;
+ } else {
+ clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ }
+ grpc_mdelem_set_user_data(md, destroy_compression,
+ (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ }
+ return clevel;
+}
+
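
The +1 bias exists because mdelem user data reserves NULL for "nothing cached", and GRPC_COMPRESS_LEVEL_NONE is zero: storing levels unbiased would make a cached NONE indistinguishable from a cache miss and force a reparse on every lookup. The round trip of the encoding, assuming GRPC_COMPRESS_LEVEL_NONE == 0:

    /* Sketch: bias the enum by 1 so the encoded pointer is never NULL. */
    static void *sketch_encode_clevel(grpc_compression_level clevel) {
      return (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET); /* >= 1 */
    }

    static grpc_compression_level sketch_decode_clevel(void *user_data) {
      return (grpc_compression_level)((gpr_intptr)user_data -
                                      COMPRESS_OFFSET);
    }
    /* sketch_decode_clevel(sketch_encode_clevel(GRPC_COMPRESS_LEVEL_NONE))
       round-trips, and the encoded value is never NULL, so NULL still
       unambiguously means "unset". */
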
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
@@ -1175,6 +1271,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
+ set_decode_compression_level(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
@@ -1239,6 +1337,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
+static int are_write_flags_valid(gpr_uint32 flags) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ const gpr_uint32 allowed_write_positions =
+ (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+ const gpr_uint32 invalid_positions = ~allowed_write_positions;
+ return !(flags & invalid_positions);
+}
+
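
are_write_flags_valid accepts a flag word only if every set bit falls inside the public or internal write mask; any bit in the complement of their union fails by construction. A quick check of that contract (mask names as used above):

    /* Sketch: bits outside the two masks must be rejected. */
    static void sketch_check_write_flags(void) {
      const gpr_uint32 out_of_mask =
          ~(GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
      GPR_ASSERT(are_write_flags_valid(0));            /* no flags: ok */
      GPR_ASSERT(!are_write_flags_valid(out_of_mask)); /* unknown bits */
    }
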
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1261,30 +1367,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op = &ops[in];
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
op->data.send_initial_metadata.metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
+ if (!are_write_flags_valid(op->flags)) {
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->flags = op->flags;
req->data.send_metadata.count =
op->data.send_status_from_server.trailing_metadata_count;
req->data.send_metadata.metadata =
@@ -1293,29 +1412,42 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
- op->data.send_status_from_server.status_details;
+ op->data.send_status_from_server.status_details != NULL
+ ? grpc_mdstr_from_string(
+ call->metadata_context,
+ op->data.send_status_from_server.status_details)
+ : NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
@@ -1333,8 +1465,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
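
Taken together, every op in grpc_call_start_batch now screens op->flags up front: GRPC_OP_SEND_MESSAGE accepts only bits inside the write masks, and every other op requires zero flags, so a malformed batch fails with GRPC_CALL_ERROR_INVALID_FLAGS before any ioreq is queued. A hedged caller-side sketch (variables and tag value are illustrative):

    /* Sketch: a stray flag rejects the whole batch before it starts. */
    static void sketch_reject_bad_flags(grpc_call *call) {
      grpc_byte_buffer *recvd = NULL; /* illustrative destination */
      grpc_op op;
      memset(&op, 0, sizeof(op));
      op.op = GRPC_OP_RECV_MESSAGE;
      op.data.recv_message = &recvd;
      op.flags = 0xdead; /* not a permitted flag set */
      GPR_ASSERT(grpc_call_start_batch(call, &op, 1, (void *)1) ==
                 GRPC_CALL_ERROR_INVALID_FLAGS);
    }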