diff options
author | Craig Tiller <ctiller@google.com> | 2016-11-16 14:45:10 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-11-16 14:45:10 -0800 |
commit | d57a1487b17a2fb7251cc575dd7161ae6c1bea13 (patch) | |
tree | 7635e13309c057d7449d3e1528429dca0070be69 /src/core/lib/channel | |
parent | 3e2048dd9cb2a5458105cbbbf4d4b4b67549bd5c (diff) | |
parent | 740665a6f65b3d827e0755de8bb1bcd57745b9f1 (diff) |
Merge github.com:grpc/grpc into newlines
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 6 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 8 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 16 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 15 | ||||
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 35 | ||||
-rw-r--r-- | src/core/lib/channel/connected_channel.c | 8 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 6 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.c | 10 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.h | 6 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 33 | ||||
-rw-r--r-- | src/core/lib/channel/http_server_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 3 |
12 files changed, 105 insertions, 55 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index cfc072c0b5..401a2ad4fe 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -298,6 +298,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( } } +grpc_channel_args *grpc_channel_args_set_socket_mutator( + grpc_channel_args *a, grpc_socket_mutator *mutator) { + grpc_arg tmp = grpc_socket_mutator_to_arg(mutator); + return grpc_channel_args_copy_and_add(a, &tmp, 1); +} + int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b) { int c = GPR_ICMP(a->num_args, b->num_args); diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 1e05303471..88fc0e37a3 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -36,6 +36,7 @@ #include <grpc/compression.h> #include <grpc/grpc.h> +#include "src/core/lib/iomgr/socket_mutator.h" // Channel args are intentionally immutable, to avoid the need for locking. @@ -100,6 +101,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +/** Returns a channel arg instance with socket mutator added. The socket mutator + * will perform its mutate_fd method on all file descriptors used by the + * channel. + * If \a a is non-MULL, its args are copied. */ +grpc_channel_args *grpc_channel_args_set_socket_mutator( + grpc_channel_args *a, grpc_socket_mutator *mutator); + /** Returns the value of argument \a name from \a args, or NULL if not found. */ const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, const char *name); diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 2c5367901d..999ad5f507 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -162,7 +162,8 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) { + grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline, + grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -179,7 +180,7 @@ grpc_error *grpc_call_stack_init( /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - args.start_time = gpr_now(GPR_CLOCK_MONOTONIC); + args.start_time = start_time; for (i = 0; i < count; i++) { args.call_stack = call_stack; args.server_transport_data = transport_server_data; @@ -255,6 +256,13 @@ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, return next_elem->filter->get_peer(exec_ctx, next_elem); } +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->get_channel_info(exec_ctx, next_elem, channel_info); +} + void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; @@ -288,7 +296,7 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_status_code status, - gpr_slice *optional_message) { + grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op); @@ -300,7 +308,7 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_status_code status, - gpr_slice *optional_message) { + grpc_slice *optional_message) { grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); memset(op, 0, sizeof(*op)); op->on_complete = grpc_closure_create(destroy_op, op); diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 27f3be7b29..004643d45f 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -156,6 +156,10 @@ typedef struct { /* Implement grpc_call_get_peer() */ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + /* Implement grpc_channel_get_info() */ + void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -227,7 +231,8 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack); + grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline, + grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -273,6 +278,10 @@ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); /* Pass through a request to get_peer to the next child element */ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); +/* Pass through a request to get_channel_info() to the next child element */ +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( @@ -289,12 +298,12 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, grpc_status_code status, - gpr_slice *optional_message); + grpc_slice *optional_message); void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem, grpc_status_code status, - gpr_slice *optional_message); + grpc_slice *optional_message); extern int grpc_trace_channel; diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 0981d59f63..2874d63fc7 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -35,9 +35,9 @@ #include <string.h> #include <grpc/compression.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/compress_filter.h" @@ -50,7 +50,7 @@ int grpc_compression_trace = 0; typedef struct call_data { - gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ + grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; uint32_t remaining_slice_bytes; @@ -63,7 +63,7 @@ typedef struct call_data { grpc_transport_stream_op *send_op; uint32_t send_length; uint32_t send_flags; - gpr_slice incoming_slice; + grpc_slice incoming_slice; grpc_slice_buffer_stream replacement_stream; grpc_closure *post_send; grpc_closure send_done; @@ -111,9 +111,13 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { return md; } -static int skip_compression(grpc_call_element *elem) { +static int skip_compression(grpc_call_element *elem, uint32_t flags) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + + if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { + return 1; + } if (calld->has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { return 1; @@ -157,7 +161,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref(&calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -165,8 +169,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; int did_compress; - gpr_slice_buffer tmp; - gpr_slice_buffer_init(&tmp); + grpc_slice_buffer tmp; + grpc_slice_buffer_init(&tmp); did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { @@ -181,7 +185,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, " bytes (%.2f%% savings)", algo_name, before_size, after_size, 100 * savings_ratio); } - gpr_slice_buffer_swap(&calld->slices, &tmp); + grpc_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (grpc_compression_trace) { @@ -195,7 +199,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, } } - gpr_slice_buffer_destroy(&tmp); + grpc_slice_buffer_destroy(&tmp); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, calld->send_flags); @@ -209,7 +213,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); } else { @@ -223,7 +227,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message, &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); break; @@ -241,8 +245,8 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, if (op->send_initial_metadata) { process_send_initial_metadata(elem, op->send_initial_metadata); } - if (op->send_message != NULL && !skip_compression(elem) && - 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { + if (op->send_message != NULL && + !skip_compression(elem, op->send_message->flags)) { calld->send_op = op; calld->send_length = op->send_message->length; calld->send_flags = op->send_message->flags; @@ -263,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; /* initialize members */ - gpr_slice_buffer_init(&calld->slices); + grpc_slice_buffer_init(&calld->slices); calld->has_compression_algorithm = 0; grpc_closure_init(&calld->got_slice, got_slice, elem); grpc_closure_init(&calld->send_done, send_done, elem); @@ -277,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, void *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; - gpr_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy(&calld->slices); } /* Constructor for channel_data */ @@ -328,4 +332,5 @@ const grpc_channel_filter grpc_compress_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "compress"}; diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 918379c845..038e819f72 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -38,9 +38,9 @@ #include <string.h> #include <grpc/byte_buffer.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice_buffer.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport.h" @@ -134,6 +134,11 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_transport_get_peer(exec_ctx, chand->transport); } +/* No-op. */ +static void con_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + static const grpc_channel_filter connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -145,6 +150,7 @@ static const grpc_channel_filter connected_channel_filter = { init_channel_elem, destroy_channel_elem, con_get_peer, + con_get_channel_info, "connected", }; diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index d2ea5250f6..0e703d8d27 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -55,10 +55,10 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, deadline_state->timer_pending = false; gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { - gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded"); + grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded"); grpc_call_element_send_cancel_with_message( exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); - gpr_slice_unref(msg); + grpc_slice_unref(msg); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } @@ -316,6 +316,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; @@ -330,5 +331,6 @@ const grpc_channel_filter grpc_server_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 0d759887bc..a45a39981c 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -62,7 +62,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { @@ -146,8 +146,8 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, static void call_next_handshaker(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, void* user_data, - grpc_error* error) { + grpc_slice_buffer* read_buffer, + void* user_data, grpc_error* error) { grpc_handshake_manager* mgr = user_data; GPR_ASSERT(mgr->state != NULL); GPR_ASSERT(mgr->state->index < mgr->count); @@ -183,8 +183,8 @@ void grpc_handshake_manager_do_handshake( gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { grpc_channel_args* args_copy = grpc_channel_args_copy(args); - gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); - gpr_slice_buffer_init(read_buffer); + grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); + grpc_slice_buffer_init(read_buffer); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done // callback with the passed-in endpoint. diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index d574b46242..f8a36c6473 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -59,7 +59,7 @@ typedef struct grpc_handshaker grpc_handshaker; typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, void* user_data, grpc_error* error); struct grpc_handshaker_vtable { @@ -77,7 +77,7 @@ struct grpc_handshaker_vtable { /// \a acceptor will be NULL for client-side handshakers. void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, gpr_timespec deadline, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data); }; @@ -103,7 +103,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker, grpc_endpoint* endpoint, grpc_channel_args* args, - gpr_slice_buffer* read_buffer, + grpc_slice_buffer* read_buffer, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 0875cc18c4..dbe0d25211 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -36,7 +36,7 @@ #include <grpc/support/string_util.h> #include <string.h> #include "src/core/lib/profiling/timers.h" -#include "src/core/lib/support/percent_encoding.h" +#include "src/core/lib/slice/percent_encoding.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport_impl.h" @@ -64,9 +64,9 @@ typedef struct call_data { grpc_transport_stream_op send_op; uint32_t send_length; uint32_t send_flags; - gpr_slice incoming_slice; + grpc_slice incoming_slice; grpc_slice_buffer_stream replacement_stream; - gpr_slice_buffer slices; + grpc_slice_buffer slices; /* flag that indicates that all slices of send_messages aren't availble */ bool send_message_blocked; @@ -105,16 +105,16 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { char *message_string; gpr_asprintf(&message_string, "Received http2 header with status: %s", grpc_mdstr_as_c_string(md->value)); - gpr_slice message = gpr_slice_from_copied_string(message_string); + grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message(a->exec_ctx, a->elem, GRPC_STATUS_CANCELLED, &message); return NULL; } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) { - gpr_slice pct_decoded_msg = - gpr_permissive_percent_decode_slice(md->value->slice); - if (gpr_slice_is_equivalent(pct_decoded_msg, md->value->slice)) { - gpr_slice_unref(pct_decoded_msg); + grpc_slice pct_decoded_msg = + grpc_permissive_percent_decode_slice(md->value->slice); + if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) { + grpc_slice_unref(pct_decoded_msg); return md; } else { return grpc_mdelem_from_metadata_strings( @@ -183,7 +183,7 @@ static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; - gpr_slice_buffer_reset_and_unref(&calld->slices); + grpc_slice_buffer_reset_and_unref(&calld->slices); calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); } @@ -204,10 +204,10 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { - memcpy(wrptr, GPR_SLICE_START_PTR(calld->incoming_slice), - GPR_SLICE_LENGTH(calld->incoming_slice)); - wrptr += GPR_SLICE_LENGTH(calld->incoming_slice); - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), + GRPC_SLICE_LENGTH(calld->incoming_slice)); + wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { calld->send_message_blocked = false; break; @@ -219,7 +219,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; - gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, @@ -347,7 +347,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->on_done_recv_trailing_metadata = NULL; calld->on_complete = NULL; calld->payload_bytes = NULL; - gpr_slice_buffer_init(&calld->slices); + grpc_slice_buffer_init(&calld->slices); grpc_closure_init(&calld->hc_on_recv_initial_metadata, hc_on_recv_initial_metadata, elem); grpc_closure_init(&calld->hc_on_recv_trailing_metadata, @@ -363,7 +363,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, void *ignored) { call_data *calld = elem->call_data; - gpr_slice_buffer_destroy(&calld->slices); + grpc_slice_buffer_destroy(&calld->slices); } static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { @@ -487,4 +487,5 @@ const grpc_channel_filter grpc_http_client_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index eb335b6b5c..80f3fc3cc3 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -69,7 +69,7 @@ typedef struct call_data { grpc_closure *recv_message_ready; grpc_closure *on_complete; grpc_byte_stream **pp_recv_message; - gpr_slice_buffer read_slice_buffer; + grpc_slice_buffer read_slice_buffer; grpc_slice_buffer_stream read_stream; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -180,9 +180,8 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* Retrieve the payload from the value of the 'grpc-internal-payload-bin' header field */ calld->seen_payload_bin = 1; - gpr_slice_buffer_init(&calld->read_slice_buffer); - gpr_slice_buffer_add(&calld->read_slice_buffer, - gpr_slice_ref(md->value->slice)); + grpc_slice_buffer_add(&calld->read_slice_buffer, + grpc_slice_ref(md->value->slice)); grpc_slice_buffer_stream_init(&calld->read_stream, &calld->read_slice_buffer, 0); return NULL; @@ -338,13 +337,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem); grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem); + grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + void *ignored) { + call_data *calld = elem->call_data; + grpc_slice_buffer_destroy(&calld->read_slice_buffer); +} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, @@ -368,4 +371,5 @@ const grpc_channel_filter grpc_http_server_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-server"}; diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 7dc5ae0df1..1331fe1c65 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -141,7 +141,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", op->send_message->length, calld->max_send_size); - gpr_slice message = gpr_slice_from_copied_string(message_string); + grpc_slice message = grpc_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message( exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); @@ -248,4 +248,5 @@ const grpc_channel_filter grpc_message_size_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "message_size"}; |