aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-16 14:45:10 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-11-16 14:45:10 -0800
commitd57a1487b17a2fb7251cc575dd7161ae6c1bea13 (patch)
tree7635e13309c057d7449d3e1528429dca0070be69 /src/core/lib/channel
parent3e2048dd9cb2a5458105cbbbf4d4b4b67549bd5c (diff)
parent740665a6f65b3d827e0755de8bb1bcd57745b9f1 (diff)
Merge github.com:grpc/grpc into newlines
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_args.c6
-rw-r--r--src/core/lib/channel/channel_args.h8
-rw-r--r--src/core/lib/channel/channel_stack.c16
-rw-r--r--src/core/lib/channel/channel_stack.h15
-rw-r--r--src/core/lib/channel/compress_filter.c35
-rw-r--r--src/core/lib/channel/connected_channel.c8
-rw-r--r--src/core/lib/channel/deadline_filter.c6
-rw-r--r--src/core/lib/channel/handshaker.c10
-rw-r--r--src/core/lib/channel/handshaker.h6
-rw-r--r--src/core/lib/channel/http_client_filter.c33
-rw-r--r--src/core/lib/channel/http_server_filter.c14
-rw-r--r--src/core/lib/channel/message_size_filter.c3
12 files changed, 105 insertions, 55 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index cfc072c0b5..401a2ad4fe 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -298,6 +298,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator) {
+ grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 1e05303471..88fc0e37a3 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -36,6 +36,7 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include "src/core/lib/iomgr/socket_mutator.h"
// Channel args are intentionally immutable, to avoid the need for locking.
@@ -100,6 +101,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns a channel arg instance with socket mutator added. The socket mutator
+ * will perform its mutate_fd method on all file descriptors used by the
+ * channel.
+ * If \a a is non-MULL, its args are copied. */
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator);
+
/** Returns the value of argument \a name from \a args, or NULL if not found. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 2c5367901d..999ad5f507 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -162,7 +162,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -179,7 +180,7 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- args.start_time = gpr_now(GPR_CLOCK_MONOTONIC);
+ args.start_time = start_time;
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
@@ -255,6 +256,13 @@ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
return next_elem->filter->get_peer(exec_ctx, next_elem);
}
+void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ const grpc_channel_info *channel_info) {
+ grpc_channel_element *next_elem = elem + 1;
+ next_elem->filter->get_channel_info(exec_ctx, next_elem, channel_info);
+}
+
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op) {
grpc_channel_element *next_elem = elem + 1;
@@ -288,7 +296,7 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_status_code status,
- gpr_slice *optional_message) {
+ grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->on_complete = grpc_closure_create(destroy_op, op);
@@ -300,7 +308,7 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_status_code status,
- gpr_slice *optional_message) {
+ grpc_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->on_complete = grpc_closure_create(destroy_op, op);
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 27f3be7b29..004643d45f 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -156,6 +156,10 @@ typedef struct {
/* Implement grpc_call_get_peer() */
char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+ /* Implement grpc_channel_get_info() */
+ void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ const grpc_channel_info *channel_info);
+
/* The name of this filter */
const char *name;
} grpc_channel_filter;
@@ -227,7 +231,8 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
+ grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
+ grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -273,6 +278,10 @@ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op);
/* Pass through a request to get_peer to the next child element */
char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+/* Pass through a request to get_channel_info() to the next child element */
+void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ const grpc_channel_info *channel_info);
/* Given the top element of a channel stack, get the channel stack itself */
grpc_channel_stack *grpc_channel_stack_from_top_element(
@@ -289,12 +298,12 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
- gpr_slice *optional_message);
+ grpc_slice *optional_message);
void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
- gpr_slice *optional_message);
+ grpc_slice *optional_message);
extern int grpc_trace_channel;
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 0981d59f63..2874d63fc7 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -35,9 +35,9 @@
#include <string.h>
#include <grpc/compression.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/compress_filter.h"
@@ -50,7 +50,7 @@
int grpc_compression_trace = 0;
typedef struct call_data {
- gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
+ grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
uint32_t remaining_slice_bytes;
@@ -63,7 +63,7 @@ typedef struct call_data {
grpc_transport_stream_op *send_op;
uint32_t send_length;
uint32_t send_flags;
- gpr_slice incoming_slice;
+ grpc_slice incoming_slice;
grpc_slice_buffer_stream replacement_stream;
grpc_closure *post_send;
grpc_closure send_done;
@@ -111,9 +111,13 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static int skip_compression(grpc_call_element *elem) {
+static int skip_compression(grpc_call_element *elem, uint32_t flags) {
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
+
+ if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
+ return 1;
+ }
if (calld->has_compression_algorithm) {
if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
return 1;
@@ -157,7 +161,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
- gpr_slice_buffer_reset_and_unref(&calld->slices);
+ grpc_slice_buffer_reset_and_unref(&calld->slices);
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
}
@@ -165,8 +169,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = elem->call_data;
int did_compress;
- gpr_slice_buffer tmp;
- gpr_slice_buffer_init(&tmp);
+ grpc_slice_buffer tmp;
+ grpc_slice_buffer_init(&tmp);
did_compress =
grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
@@ -181,7 +185,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
" bytes (%.2f%% savings)",
algo_name, before_size, after_size, 100 * savings_ratio);
}
- gpr_slice_buffer_swap(&calld->slices, &tmp);
+ grpc_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
if (grpc_compression_trace) {
@@ -195,7 +199,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
}
}
- gpr_slice_buffer_destroy(&tmp);
+ grpc_slice_buffer_destroy(&tmp);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
@@ -209,7 +213,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
- gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
} else {
@@ -223,7 +227,7 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
while (grpc_byte_stream_next(exec_ctx, calld->send_op->send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
- gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
break;
@@ -241,8 +245,8 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
if (op->send_initial_metadata) {
process_send_initial_metadata(elem, op->send_initial_metadata);
}
- if (op->send_message != NULL && !skip_compression(elem) &&
- 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
+ if (op->send_message != NULL &&
+ !skip_compression(elem, op->send_message->flags)) {
calld->send_op = op;
calld->send_length = op->send_message->length;
calld->send_flags = op->send_message->flags;
@@ -263,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
/* initialize members */
- gpr_slice_buffer_init(&calld->slices);
+ grpc_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
@@ -277,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
void *ignored) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
- gpr_slice_buffer_destroy(&calld->slices);
+ grpc_slice_buffer_destroy(&calld->slices);
}
/* Constructor for channel_data */
@@ -328,4 +332,5 @@ const grpc_channel_filter grpc_compress_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"compress"};
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 918379c845..038e819f72 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -38,9 +38,9 @@
#include <string.h>
#include <grpc/byte_buffer.h>
+#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h"
@@ -134,6 +134,11 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
return grpc_transport_get_peer(exec_ctx, chand->transport);
}
+/* No-op. */
+static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ const grpc_channel_info *channel_info) {}
+
static const grpc_channel_filter connected_channel_filter = {
con_start_transport_stream_op,
con_start_transport_op,
@@ -145,6 +150,7 @@ static const grpc_channel_filter connected_channel_filter = {
init_channel_elem,
destroy_channel_elem,
con_get_peer,
+ con_get_channel_info,
"connected",
};
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index d2ea5250f6..0e703d8d27 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -55,10 +55,10 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
deadline_state->timer_pending = false;
gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) {
- gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded");
+ grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded");
grpc_call_element_send_cancel_with_message(
exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg);
- gpr_slice_unref(msg);
+ grpc_slice_unref(msg);
}
GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
@@ -316,6 +316,7 @@ const grpc_channel_filter grpc_client_deadline_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"deadline",
};
@@ -330,5 +331,6 @@ const grpc_channel_filter grpc_server_deadline_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"deadline",
};
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 0d759887bc..a45a39981c 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -62,7 +62,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
- gpr_slice_buffer* read_buffer,
+ grpc_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
@@ -146,8 +146,8 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
grpc_channel_args* args,
- gpr_slice_buffer* read_buffer, void* user_data,
- grpc_error* error) {
+ grpc_slice_buffer* read_buffer,
+ void* user_data, grpc_error* error) {
grpc_handshake_manager* mgr = user_data;
GPR_ASSERT(mgr->state != NULL);
GPR_ASSERT(mgr->state->index < mgr->count);
@@ -183,8 +183,8 @@ void grpc_handshake_manager_do_handshake(
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
- gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
- gpr_slice_buffer_init(read_buffer);
+ grpc_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
+ grpc_slice_buffer_init(read_buffer);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done
// callback with the passed-in endpoint.
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index d574b46242..f8a36c6473 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -59,7 +59,7 @@ typedef struct grpc_handshaker grpc_handshaker;
typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
grpc_channel_args* args,
- gpr_slice_buffer* read_buffer,
+ grpc_slice_buffer* read_buffer,
void* user_data, grpc_error* error);
struct grpc_handshaker_vtable {
@@ -77,7 +77,7 @@ struct grpc_handshaker_vtable {
/// \a acceptor will be NULL for client-side handshakers.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_endpoint* endpoint, grpc_channel_args* args,
- gpr_slice_buffer* read_buffer, gpr_timespec deadline,
+ grpc_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
};
@@ -103,7 +103,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
- gpr_slice_buffer* read_buffer,
+ grpc_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 0875cc18c4..dbe0d25211 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -36,7 +36,7 @@
#include <grpc/support/string_util.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/percent_encoding.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -64,9 +64,9 @@ typedef struct call_data {
grpc_transport_stream_op send_op;
uint32_t send_length;
uint32_t send_flags;
- gpr_slice incoming_slice;
+ grpc_slice incoming_slice;
grpc_slice_buffer_stream replacement_stream;
- gpr_slice_buffer slices;
+ grpc_slice_buffer slices;
/* flag that indicates that all slices of send_messages aren't availble */
bool send_message_blocked;
@@ -105,16 +105,16 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
char *message_string;
gpr_asprintf(&message_string, "Received http2 header with status: %s",
grpc_mdstr_as_c_string(md->value));
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ grpc_slice message = grpc_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
GRPC_STATUS_CANCELLED, &message);
return NULL;
} else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
- gpr_slice pct_decoded_msg =
- gpr_permissive_percent_decode_slice(md->value->slice);
- if (gpr_slice_is_equivalent(pct_decoded_msg, md->value->slice)) {
- gpr_slice_unref(pct_decoded_msg);
+ grpc_slice pct_decoded_msg =
+ grpc_permissive_percent_decode_slice(md->value->slice);
+ if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) {
+ grpc_slice_unref(pct_decoded_msg);
return md;
} else {
return grpc_mdelem_from_metadata_strings(
@@ -183,7 +183,7 @@ static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
- gpr_slice_buffer_reset_and_unref(&calld->slices);
+ grpc_slice_buffer_reset_and_unref(&calld->slices);
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
}
@@ -204,10 +204,10 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
&calld->incoming_slice, ~(size_t)0,
&calld->got_slice)) {
- memcpy(wrptr, GPR_SLICE_START_PTR(calld->incoming_slice),
- GPR_SLICE_LENGTH(calld->incoming_slice));
- wrptr += GPR_SLICE_LENGTH(calld->incoming_slice);
- gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
+ GRPC_SLICE_LENGTH(calld->incoming_slice));
+ wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
calld->send_message_blocked = false;
break;
@@ -219,7 +219,7 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
calld->send_message_blocked = false;
- gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
+ grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
@@ -347,7 +347,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
calld->on_done_recv_trailing_metadata = NULL;
calld->on_complete = NULL;
calld->payload_bytes = NULL;
- gpr_slice_buffer_init(&calld->slices);
+ grpc_slice_buffer_init(&calld->slices);
grpc_closure_init(&calld->hc_on_recv_initial_metadata,
hc_on_recv_initial_metadata, elem);
grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
@@ -363,7 +363,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
void *ignored) {
call_data *calld = elem->call_data;
- gpr_slice_buffer_destroy(&calld->slices);
+ grpc_slice_buffer_destroy(&calld->slices);
}
static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
@@ -487,4 +487,5 @@ const grpc_channel_filter grpc_http_client_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"http-client"};
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index eb335b6b5c..80f3fc3cc3 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -69,7 +69,7 @@ typedef struct call_data {
grpc_closure *recv_message_ready;
grpc_closure *on_complete;
grpc_byte_stream **pp_recv_message;
- gpr_slice_buffer read_slice_buffer;
+ grpc_slice_buffer read_slice_buffer;
grpc_slice_buffer_stream read_stream;
/** Receive closures are chained: we inject this closure as the on_done_recv
@@ -180,9 +180,8 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
/* Retrieve the payload from the value of the 'grpc-internal-payload-bin'
header field */
calld->seen_payload_bin = 1;
- gpr_slice_buffer_init(&calld->read_slice_buffer);
- gpr_slice_buffer_add(&calld->read_slice_buffer,
- gpr_slice_ref(md->value->slice));
+ grpc_slice_buffer_add(&calld->read_slice_buffer,
+ grpc_slice_ref(md->value->slice));
grpc_slice_buffer_stream_init(&calld->read_stream,
&calld->read_slice_buffer, 0);
return NULL;
@@ -338,13 +337,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem);
grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem);
+ grpc_slice_buffer_init(&calld->read_slice_buffer);
return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
- void *ignored) {}
+ void *ignored) {
+ call_data *calld = elem->call_data;
+ grpc_slice_buffer_destroy(&calld->read_slice_buffer);
+}
/* Constructor for channel_data */
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
@@ -368,4 +371,5 @@ const grpc_channel_filter grpc_http_server_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"http-server"};
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 7dc5ae0df1..1331fe1c65 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -141,7 +141,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->send_message->length, calld->max_send_size);
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ grpc_slice message = grpc_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
@@ -248,4 +248,5 @@ const grpc_channel_filter grpc_message_size_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
"message_size"};