path: root/src/core/ext/filters/http/message_compress
author     Mark D. Roth <roth@google.com>  2017-07-26 14:29:52 -0700
committer  Mark D. Roth <roth@google.com>  2017-07-26 14:29:52 -0700
commit     5794061764976c4771f69a3193cd38b3cdba193e (patch)
tree       49aa5451f12f373ae744ab54be78ecdfa08991ff /src/core/ext/filters/http/message_compress
parent     3064423b71ac076d4df6638fd54c2caf54641546 (diff)
Improvements to grpc_byte_stream API and handling.
- Add shutdown() method (to be used in forthcoming call combiner code).
- Use a vtable instead of storing method pointers in each instance.
- Check all callers of pull() to make sure that they are properly handling
  errors.
- Clarify ownership rules and attempt to adhere to them.
- Added a new grpc_caching_byte_stream implementation, which is used in
  http_client_filter to avoid having to read the whole send_message byte
  stream before passing control down the stack.  (This class may also be
  used in the retry code I'm working on separately.)
- As part of this, did a major rewrite of http_client_filter, which made the
  code more readable and fixed a number of potential bugs.

Note that some of this code is hard to test right now, due to the fact that
the send_message byte stream is always a slice_buffer stream, for which
next() is always synchronous and no destruction is needed.  However, some
future work (specifically, my call combiner work and Craig's incremental
send work) will start leveraging this.
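For context, the sketch below shows roughly what the vtable-based
grpc_byte_stream interface described above (with the new shutdown() entry)
might look like.  It is an approximation derived from the commit description
and the calls visible in the diff below; the actual field names and
signatures in the byte_stream header may differ.

/* Illustrative sketch only -- not taken from this commit.  Field names and
 * signatures approximate the vtable-based interface described above. */

typedef struct grpc_byte_stream grpc_byte_stream;

typedef struct grpc_byte_stream_vtable {
  /* Returns true if a slice is immediately available via pull(); otherwise
   * schedules on_complete once data (or an error) is ready. */
  bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
               size_t max_size_hint, grpc_closure *on_complete);
  /* Retrieves the slice made available by next().  May fail, so every
   * caller must check the returned error. */
  grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                      grpc_slice *slice);
  /* New in this work: cancels any in-flight next() call, for use by the
   * forthcoming call combiner code. */
  void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
                   grpc_error *error);
  void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
} grpc_byte_stream_vtable;

struct grpc_byte_stream {
  uint32_t length; /* total byte count, as used by the filter below */
  uint32_t flags;
  const grpc_byte_stream_vtable *vtable; /* shared; no per-instance pointers */
};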
Diffstat (limited to 'src/core/ext/filters/http/message_compress')
-rw-r--r--  src/core/ext/filters/http/message_compress/message_compress_filter.c | 202
1 file changed, 118 insertions(+), 84 deletions(-)
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index 71a8bc5bec..20a3488115 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -61,14 +61,11 @@ typedef struct call_data {
pointer | CANCELLED_BIT - request was cancelled with error pointed to */
gpr_atm send_initial_metadata_state;
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
+ grpc_transport_stream_op_batch *send_message_batch;
grpc_slice_buffer_stream replacement_stream;
- grpc_closure *post_send;
- grpc_closure send_done;
- grpc_closure got_slice;
+ grpc_closure *original_send_message_on_complete;
+ grpc_closure send_message_on_complete;
+ grpc_closure on_send_message_next_done;
} call_data;
typedef struct channel_data {
@@ -164,24 +161,25 @@ static grpc_error *process_send_initial_metadata(
return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem);
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
+static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
+ GRPC_ERROR_REF(error));
}
static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- int did_compress;
+ call_data *calld = (call_data *)elem->call_data;
+ // Compress the data if appropriate.
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
+ uint32_t send_flags =
+ calld->send_message_batch->payload->send_message.send_message->flags;
+ const bool did_compress = grpc_msg_compress(
+ exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -195,7 +193,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, before_size, after_size, 100 * savings_ratio);
}
grpc_slice_buffer_swap(&calld->slices, &tmp);
- calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name;
@@ -207,83 +205,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, calld->slices.length);
}
}
-
grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
-
+ // Swap out the original byte stream with our new one and send the
+ // batch down.
+ grpc_byte_stream_destroy(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
+ send_flags);
+ calld->send_message_batch->payload->send_message.send_message =
&calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
-
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
+ calld->original_send_message_on_complete =
+ calld->send_message_batch->on_complete;
+ calld->send_message_batch->on_complete = &calld->send_message_on_complete;
+ grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- finish_send_message(exec_ctx, elem);
- } else {
- continue_send_message(exec_ctx, elem);
+// Pulls a slice from the send_message byte stream and adds it to calld->slices.
+static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
+ call_data *calld) {
+ grpc_slice incoming_slice;
+ grpc_error *error = grpc_byte_stream_pull(
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ &incoming_slice);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
+ return error;
}
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
+// Reads as many slices as possible from the send_message byte stream.
+// If all data has been read, invokes finish_send_message(). Otherwise,
+// an async call to grpc_byte_stream_next() has been started, which will
+// eventually result in calling on_send_message_next_done().
+static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
+ exec_ctx, calld->send_message_batch->payload->send_message.send_message,
+ ~(size_t)0, &calld->on_send_message_next_done)) {
+ grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) return error;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
break;
}
}
+ return GRPC_ERROR_NONE;
}
-static void handle_send_message_batch(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op,
- bool has_compression_algorithm) {
- call_data *calld = elem->call_data;
- if (!skip_compression(elem, op->payload->send_message.send_message->flags,
+// Async callback for grpc_byte_stream_next().
+static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = (call_data *)elem->call_data;
+ if (error != GRPC_ERROR_NONE) goto fail;
+ error = pull_slice_from_send_message(exec_ctx, calld);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ if (calld->slices.length ==
+ calld->send_message_batch->payload->send_message.send_message->length) {
+ finish_send_message(exec_ctx, elem);
+ } else {
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // this function again.
+ error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) goto fail;
+ }
+ return;
+fail:
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+}
+
+static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op_batch *batch,
+ bool has_compression_algorithm) {
+ call_data *calld = (call_data *)elem->call_data;
+ if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
has_compression_algorithm)) {
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
+ calld->send_message_batch = batch;
+ // This will either finish reading all of the data and invoke
+ // finish_send_message(), or else it will make an async call to
+ // grpc_byte_stream_next(), which will eventually result in calling
+ // on_send_message_next_done().
+ grpc_error *error = continue_reading_send_message(exec_ctx, elem);
+ if (error != GRPC_ERROR_NONE) {
+ grpc_transport_stream_op_batch_finish_with_failure(
+ exec_ctx, calld->send_message_batch, error);
+ }
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
}
static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
- if (op->cancel_stream) {
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
+ if (batch->cancel_stream) {
+ // TODO(roth): As part of the upcoming call combiner work, change
+ // this to call grpc_byte_stream_shutdown() on the incoming byte
+ // stream, to cancel any in-flight calls to grpc_byte_stream_next().
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
gpr_atm cur = gpr_atm_full_xchg(
&calld->send_initial_metadata_state,
- CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error);
+ CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
switch (cur) {
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
@@ -293,7 +326,7 @@ static void compress_start_transport_stream_op_batch(
if ((cur & CANCELLED_BIT) == 0) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, (grpc_transport_stream_op_batch *)cur,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
+ GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
} else {
GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
}
@@ -301,14 +334,15 @@ static void compress_start_transport_stream_op_batch(
}
}
- if (op->send_initial_metadata) {
+ if (batch->send_initial_metadata) {
bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata(
exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata,
+ batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm);
if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
+ error);
return;
}
gpr_atm cur;
@@ -324,32 +358,32 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im;
}
if (cur != INITIAL_METADATA_UNSEEN) {
- handle_send_message_batch(exec_ctx, elem,
- (grpc_transport_stream_op_batch *)cur,
- has_compression_algorithm);
+ start_send_message_batch(exec_ctx, elem,
+ (grpc_transport_stream_op_batch *)cur,
+ has_compression_algorithm);
}
}
}
- if (op->send_message) {
+ if (batch->send_message) {
gpr_atm cur;
retry_send:
cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
switch (cur) {
case INITIAL_METADATA_UNSEEN:
if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
- (gpr_atm)op)) {
+ (gpr_atm)batch)) {
goto retry_send;
}
break;
case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM:
- handle_send_message_batch(exec_ctx, elem, op,
- cur == HAS_COMPRESSION_ALGORITHM);
+ start_send_message_batch(exec_ctx, elem, batch,
+ cur == HAS_COMPRESSION_ALGORITHM);
break;
default:
if (cur & CANCELLED_BIT) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
+ exec_ctx, batch,
GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
} else {
/* >1 send_message concurrently */
@@ -358,7 +392,7 @@ static void compress_start_transport_stream_op_batch(
}
} else {
/* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
+ grpc_call_next_op(exec_ctx, elem, batch);
}
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
@@ -373,10 +407,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */
grpc_slice_buffer_init(&calld->slices);
- GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
+ on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
+ elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
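Taken together, the rewritten filter drains the send_message byte stream
using the error-checked loop pattern condensed below.  This is a sketch of
continue_reading_send_message() / pull_slice_from_send_message() from the
diff above; the helper name drain_available_slices and its exact parameters
are illustrative, not part of the commit.

/* Sketch (not from the commit): drain as many slices as are synchronously
 * available, checking every pull() for errors as the commit message
 * requires.  The byte-stream calls and types are the ones used in the diff
 * above; the helper itself is hypothetical. */
static grpc_error *drain_available_slices(grpc_exec_ctx *exec_ctx,
                                          grpc_byte_stream *stream,
                                          grpc_slice_buffer *out,
                                          grpc_closure *on_next_done) {
  while (grpc_byte_stream_next(exec_ctx, stream, ~(size_t)0, on_next_done)) {
    grpc_slice slice;
    grpc_error *error = grpc_byte_stream_pull(exec_ctx, stream, &slice);
    if (error != GRPC_ERROR_NONE) return error; /* caller fails the batch */
    grpc_slice_buffer_add(out, slice);
    if (out->length == stream->length) break; /* all data has been read */
  }
  /* If next() returned false, on_next_done will run asynchronously and the
   * caller resumes from there (cf. on_send_message_next_done() above). */
  return GRPC_ERROR_NONE;
}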