aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c41
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/server.c3
-rw-r--r--src/core/transport/stream_op.h9
-rw-r--r--src/cpp/common/call.cc8
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c24
-rw-r--r--src/node/ext/call.cc1
-rw-r--r--src/php/ext/grpc/call.c1
-rw-r--r--src/python/src/grpc/_adapter/_c/utility.c1
-rw-r--r--src/ruby/ext/grpc/rb_call.c1
10 files changed, 88 insertions, 2 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index cead5e08dc..5cdd7cd0f6 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -185,6 +185,7 @@ struct grpc_call {
and a strong upper bound of a count of masters to be calculated. */
gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
+ gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
/* Dynamic array of ioreq's that have completed: the count of
@@ -228,6 +229,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ gpr_uint32 incoming_message_flags;
grpc_iomgr_closure destroy_closure;
};
@@ -670,6 +672,7 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
} else if (msg.length > 0) {
call->reading_message = 1;
call->incoming_message_length = msg.length;
+ call->incoming_message_flags = msg.flags;
return 1;
} else {
finish_message(call);
@@ -818,6 +821,7 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
+ gpr_uint32 flags;
grpc_metadata_batch mdb;
size_t i;
GPR_ASSERT(op->send_ops == NULL);
@@ -844,8 +848,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
+ flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_begin_message(
- &call->send_ops, grpc_byte_buffer_length(data.send_message), 0);
+ &call->send_ops, grpc_byte_buffer_length(data.send_message), flags);
copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
@@ -979,6 +984,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
have_ops |= 1u << op;
call->request_data[op] = data;
+ call->request_flags[op] = reqs[i].flags;
call->request_set[op] = set;
}
@@ -1189,6 +1195,14 @@ static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
+static int are_write_flags_valid(gpr_uint32 flags) {
+ /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
+ const gpr_uint32 allowed_write_positions =
+ (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
+ const gpr_uint32 invalid_positions = ~allowed_write_positions;
+ return !(flags & invalid_positions);
+}
+
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
@@ -1211,30 +1225,43 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op = &ops[in];
switch (op->op) {
case GRPC_OP_SEND_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
op->data.send_initial_metadata.metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
+ if (!are_write_flags_valid(op->flags)){
+ return GRPC_CALL_ERROR_INVALID_FLAGS;
+ }
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
+ req->flags = ops->flags;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
+ req->flags = op->flags;
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
+ req->flags = op->flags;
req->data.send_metadata.count =
op->data.send_status_from_server.trailing_metadata_count;
req->data.send_metadata.metadata =
@@ -1248,24 +1275,33 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_MESSAGE:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
+ req->flags = op->flags;
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
if (!call->is_client) {
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
@@ -1283,8 +1319,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ /* Flag validation: currently allow no flags */
+ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_STATUS;
+ req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 9116538948..7a14161253 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -79,6 +79,7 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
+ gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 733f0e8a11..73995202a4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1111,6 +1111,7 @@ static void begin_call(grpc_server *server, call_data *calld,
rc->data.batch.details->deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
+ r->flags = 0;
r++;
publish = publish_registered_or_batch;
break;
@@ -1118,10 +1119,12 @@ static void begin_call(grpc_server *server, call_data *calld,
*rc->data.registered.deadline = calld->deadline;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.registered.initial_metadata;
+ r->flags = 0;
r++;
if (rc->data.registered.optional_payload) {
r->op = GRPC_IOREQ_RECV_MESSAGE;
r->data.recv_message = rc->data.registered.optional_payload;
+ r->flags = 0;
r++;
}
publish = publish_registered_or_batch;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 5215cc87b1..e080701e2d 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -58,11 +58,18 @@ typedef enum grpc_stream_op_code {
GRPC_OP_SLICE
} grpc_stream_op_code;
+/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
+ * compression for the message */
+#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
+/** Mask of all valid internal flags. */
+#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
+
/* Arguments for GRPC_OP_BEGIN_MESSAGE */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
- /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
+ /* Write flags for the message: see grpc.h GRPC_WRITE_* for the public bits,
+ * GRPC_WRITE_INTERNAL_* for the internal ones. */
gpr_uint32 flags;
} grpc_begin_message;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 1068111e3f..3212f770f4 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -224,11 +224,13 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (recv_initial_metadata_) {
ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (send_message_ || send_message_buffer_) {
@@ -245,15 +247,18 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = send_buf_;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (recv_message_ || recv_message_buffer_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
ops[*nops].data.recv_message = &recv_buf_;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (client_send_close_) {
ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (recv_status_) {
@@ -264,6 +269,7 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].data.recv_status_on_client.status_details = &status_details_;
ops[*nops].data.recv_status_on_client.status_details_capacity =
&status_details_capacity_;
+ ops[*nops].flags = 0;
(*nops)++;
}
if (send_status_available_) {
@@ -275,11 +281,13 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].data.send_status_from_server.status = send_status_code_;
ops[*nops].data.send_status_from_server.status_details =
send_status_details_.empty() ? nullptr : send_status_details_.c_str();
+ ops[*nops].flags = 0;
(*nops)++;
}
if (recv_closed_) {
ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_;
+ ops[*nops].flags = 0;
(*nops)++;
}
}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 8337218255..f01c6c9183 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -417,18 +417,23 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
+ ops[1].flags = 0;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ ops[2].flags = 0;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[3].flags = 0;
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message = &(ctx->recv_message);
+ ops[4].flags = 0;
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata =
@@ -440,6 +445,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
&(ctx->recv_status_on_client.status_details);
ops[5].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
+ ops[5].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -456,12 +462,15 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[1].flags = 0;
ops[2].op = GRPC_OP_RECV_MESSAGE;
ops[2].data.recv_message = &(ctx->recv_message);
+ ops[2].flags = 0;
ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[3].data.recv_status_on_client.trailing_metadata =
@@ -473,6 +482,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
&(ctx->recv_status_on_client.status_details);
ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
+ ops[3].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -488,15 +498,19 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
+ ops[1].flags = 0;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ ops[2].flags = 0;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[3].flags = 0;
ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[4].data.recv_status_on_client.trailing_metadata =
@@ -508,6 +522,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
&(ctx->recv_status_on_client.status_details);
ops[4].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
+ ops[4].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -524,9 +539,11 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[1].flags = 0;
ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[2].data.recv_status_on_client.trailing_metadata =
@@ -538,6 +555,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
&(ctx->recv_status_on_client.status_details);
ops[2].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
+ ops[2].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -550,6 +568,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -560,6 +579,7 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -577,6 +597,7 @@ grpcsharp_call_send_status_from_server(grpc_call *call,
gpr_strdup(status_details);
ops[0].data.send_status_from_server.trailing_metadata = NULL;
ops[0].data.send_status_from_server.trailing_metadata_count = 0;
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -587,6 +608,7 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -597,10 +619,12 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
+ ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[1].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
+ ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index 8cc3e38cd9..15c9b2d97d 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -550,6 +550,7 @@ NAN_METHOD(Call::StartBatch) {
}
uint32_t type = keys->Get(i)->Uint32Value();
ops[i].op = static_cast<grpc_op_type>(type);
+ ops[i].flags = 0;
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
op.reset(new SendMetadataOp());
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 9f651ff56f..10a4946ea6 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -397,6 +397,7 @@ PHP_METHOD(Call, startBatch) {
goto cleanup;
}
ops[op_num].op = (grpc_op_type)index;
+ ops[op_num].flags = 0;
op_num++;
}
error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped);
diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c
index e3139f2887..64b3428f2b 100644
--- a/src/python/src/grpc/_adapter/_c/utility.c
+++ b/src/python/src/grpc/_adapter/_c/utility.c
@@ -168,6 +168,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
+ c_op.flags = 0;
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!pygrpc_cast_pylist_to_send_metadata(
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 29f870f929..33bfd006da 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -507,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
+ st->ops[st->op_num].flags = 0;
st->op_num++;
}
}