aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-01-23 18:26:24 -0800
committerGravatar Muxi Yan <mxyan@google.com>2017-01-23 18:26:24 -0800
commitb44e089d296b5cbc790df122196cb7f2a728e80d (patch)
tree41e6bdbcffae9c7da0ba356683113f38c2c219f5 /src/core
parente42f0b21600b9eb0f65c198ea99f0883bbde9693 (diff)
parent889b0a45352928e2a815c34d5f849be9641284b0 (diff)
Merge branch 'master' into packet-coalescing-core
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/tracing.c10
-rw-r--r--src/core/ext/client_channel/connector.h3
-rw-r--r--src/core/ext/client_channel/resolver_registry.h4
-rw-r--r--src/core/ext/client_channel/subchannel.c57
-rw-r--r--src/core/ext/client_channel/subchannel.h13
-rw-r--r--src/core/ext/client_channel/subchannel_index.c12
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c12
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c12
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c33
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c29
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c168
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c4
-rw-r--r--src/core/lib/iomgr/resource_quota.c5
-rw-r--r--src/core/lib/iomgr/resource_quota.h6
18 files changed, 231 insertions, 154 deletions
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
index 3b5d6dab2b..9371fffc8d 100644
--- a/src/core/ext/census/tracing.c
+++ b/src/core/ext/census/tracing.c
@@ -31,15 +31,21 @@
*
*/
+//#include "src/core/ext/census/tracing.h"
+
#include <grpc/census.h>
+#include <stdlib.h>
/* TODO(aveitch): These are all placeholder implementations. */
int census_trace_mask(const census_context *context) {
+ abort();
return CENSUS_TRACE_MASK_NONE;
}
-void census_set_trace_mask(int trace_mask) {}
+void census_set_trace_mask(int trace_mask) { abort(); }
void census_trace_print(census_context *context, uint32_t type,
- const char *buffer, size_t n) {}
+ const char *buffer, size_t n) {
+ abort();
+}
diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h
index 3de061620e..395f89b3b2 100644
--- a/src/core/ext/client_channel/connector.h
+++ b/src/core/ext/client_channel/connector.h
@@ -48,9 +48,6 @@ struct grpc_connector {
typedef struct {
/** set of pollsets interested in this connection */
grpc_pollset_set *interested_parties;
- /** address to connect to */
- const grpc_resolved_address *addr;
- size_t addr_len;
/** initial connect string to send */
grpc_slice initial_connect_string;
/** deadline for connection */
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index 4fb16131db..a4606463eb 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -60,7 +60,9 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
return it.
If a resolver factory was not found, return NULL.
\a args is a set of channel arguments to be included in the result
- (typically the set of arguments passed in from the client API). */
+ (typically the set of arguments passed in from the client API).
+ \a pollset_set is used to drive IO in the name resolution process, it
+ should not be NULL. */
grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_args *args,
grpc_pollset_set *pollset_set);
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 1bac82b451..8bd284507d 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -38,12 +38,16 @@
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/initial_connect_string.h"
+#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/subchannel_index.h"
+#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -95,8 +99,6 @@ struct grpc_subchannel {
size_t num_filters;
/** channel arguments */
grpc_channel_args *args;
- /** address to connect to */
- grpc_resolved_address *addr;
grpc_subchannel_key *key;
@@ -211,7 +213,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(exec_ctx, c->args);
- gpr_free(c->addr);
grpc_slice_unref_internal(exec_ctx, c->initial_connect_string);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
@@ -327,12 +328,17 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
} else {
c->filters = NULL;
}
- c->addr = gpr_malloc(sizeof(grpc_resolved_address));
- if (args->addr->len)
- memcpy(c->addr, args->addr, sizeof(grpc_resolved_address));
c->pollset_set = grpc_pollset_set_create();
- grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string);
- c->args = grpc_channel_args_copy(args->args);
+ grpc_resolved_address *addr = gpr_malloc(sizeof(*addr));
+ grpc_get_subchannel_address_arg(args->args, addr);
+ grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
+ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
+ gpr_free(addr);
+ c->args = grpc_channel_args_copy_and_add_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
+ gpr_free(new_arg.value.string);
+
c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
&c->root_external_state_watcher;
grpc_closure_init(&c->connected, subchannel_connected, c,
@@ -385,7 +391,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
- args.addr = c->addr;
args.deadline = c->next_attempt;
args.channel_args = c->args;
args.initial_connect_string = c->initial_connect_string;
@@ -771,3 +776,37 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
+
+static void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) {
+ grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
+ GPR_ASSERT(uri != NULL);
+ if (strcmp(uri->scheme, "ipv4") == 0) {
+ GPR_ASSERT(parse_ipv4(uri, addr));
+ } else if (strcmp(uri->scheme, "ipv6") == 0) {
+ GPR_ASSERT(parse_ipv6(uri, addr));
+ } else {
+ GPR_ASSERT(parse_unix(uri, addr));
+ }
+ grpc_uri_destroy(uri);
+}
+
+void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+ grpc_resolved_address *addr) {
+ const grpc_arg *addr_arg =
+ grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
+ GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
+ GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
+ memset(addr, 0, sizeof(*addr));
+ if (*addr_arg->value.string != '\0') {
+ grpc_uri_to_sockaddr(addr_arg->value.string, addr);
+ }
+}
+
+grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) {
+ grpc_arg new_arg;
+ new_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS;
+ new_arg.type = GRPC_ARG_STRING;
+ new_arg.value.string =
+ addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("");
+ return new_arg;
+}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 24aa9f73dc..684675eb37 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -40,6 +40,9 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
+// Channel arg containing a grpc_resolved_address to connect to.
+#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
+
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
@@ -164,8 +167,6 @@ struct grpc_subchannel_args {
size_t filter_count;
/** Channel arguments to be supplied to the newly created channel */
const grpc_channel_args *args;
- /** Address to connect to */
- grpc_resolved_address *addr;
};
/** create a subchannel given a connector */
@@ -173,4 +174,12 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
const grpc_subchannel_args *args);
+/// Sets \a addr from \a args.
+void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+ grpc_resolved_address *addr);
+
+/// Returns a new channel arg encoding the subchannel address as a string.
+/// Caller is responsible for freeing the string.
+grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr);
+
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */
diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c
index 1ebe03ef11..11889300a2 100644
--- a/src/core/ext/client_channel/subchannel_index.c
+++ b/src/core/ext/client_channel/subchannel_index.c
@@ -86,11 +86,6 @@ static grpc_subchannel_key *create_key(
} else {
k->args.filters = NULL;
}
- k->args.addr = gpr_malloc(sizeof(grpc_resolved_address));
- k->args.addr->len = args->addr->len;
- if (k->args.addr->len > 0) {
- memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address));
- }
k->args.args = copy_channel_args(args->args);
return k;
}
@@ -108,14 +103,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a,
grpc_subchannel_key *b) {
int c = GPR_ICMP(a->connector, b->connector);
if (c != 0) return c;
- c = GPR_ICMP(a->args.addr->len, b->args.addr->len);
- if (c != 0) return c;
c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
if (c != 0) return c;
- if (a->args.addr->len) {
- c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len);
- if (c != 0) return c;
- }
if (a->args.filter_count > 0) {
c = memcmp(a->args.filters, b->args.filters,
a->args.filter_count * sizeof(*a->args.filters));
@@ -129,7 +118,6 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
grpc_connector_unref(exec_ctx, k->connector);
gpr_free((grpc_channel_args *)k->args.filters);
grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args);
- gpr_free(k->args.addr);
gpr_free(k);
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 821becff69..9f2aa461be 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -36,7 +36,9 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
@@ -466,11 +468,15 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = &addresses->addresses[i].address;
- sc_args.args = args->args;
-
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
p->subchannels[subchannel_idx++] = subchannel;
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index cb679489c3..d17d8fa057 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -64,8 +64,10 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -729,11 +731,15 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = &addresses->addresses[i].address;
- sc_args.args = args->args;
-
+ grpc_arg addr_arg =
+ grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add(args->args, &addr_arg, 1);
+ gpr_free(addr_arg.value.string);
+ sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
+ grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
subchannel_data *sd = gpr_malloc(sizeof(*sd));
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index 2c5dfaea60..013c96dc70 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -43,6 +43,7 @@
#include "src/core/ext/client_channel/connector.h"
#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
@@ -220,6 +221,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
grpc_connect_out_args *result,
grpc_closure *notify) {
chttp2_connector *c = (chttp2_connector *)con;
+ grpc_resolved_address addr;
+ grpc_get_subchannel_address_arg(args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
@@ -231,8 +234,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(!c->connecting);
c->connecting = true;
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
- args->interested_parties, args->channel_args,
- args->addr, args->deadline);
+ args->interested_parties, args->channel_args, &addr,
+ args->deadline);
gpr_mu_unlock(&c->mu);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8fe12a2690..68a6a2155d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -62,7 +62,7 @@
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
-
+#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
@@ -271,6 +271,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
+ t->write_buffer_size = DEFAULT_WINDOW;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -321,6 +322,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_set_max_usable_size(&t->hpack_compressor,
(uint32_t)value);
}
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
+ t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else {
static const struct {
const char *channel_arg_name;
@@ -899,15 +905,22 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) {
return false;
}
+static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ if (s->id != 0 && (!s->write_buffering ||
+ s->flow_controlled_buffer.length > t->write_buffer_size)) {
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ }
+}
+
static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
s->fetched_send_message_length +=
(uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
@@ -1100,14 +1113,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
(int64_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
- /* allow up to 64kb to be buffered */
- /* TODO(ctiller): make this configurable */
- s->next_message_end_offset -= 65536;
+ s->next_message_end_offset -= t->write_buffer_size;
+ s->write_buffering = true;
+ } else {
+ s->write_buffering = false;
}
continue_fetching_send_locked(exec_ctx, t, s);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
}
@@ -1116,6 +1128,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
+ s->write_buffering = false;
const size_t metadata_size =
grpc_metadata_batch_size(op->send_trailing_metadata);
const size_t metadata_peer_limit =
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index a52acbacdb..ea7beb4c2b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -249,6 +249,9 @@ struct grpc_chttp2_transport {
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
+ /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
+ */
+ uint32_t write_buffer_size;
/** have we seen a goaway */
uint8_t seen_goaway;
@@ -403,6 +406,9 @@ struct grpc_chttp2_stream {
/** Has this stream seen an error.
If true, then pending incoming frames can be thrown away. */
bool seen_error;
+ /** Are we buffering writes on this stream? If yes, we won't become writable
+ until there's enough queued up in the flow_controlled_buffer */
+ bool write_buffering;
/** the error that resulted in this stream being read-closed */
grpc_error *read_closed_error;
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
index df1acddcc0..477cf07f45 100644
--- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -60,7 +60,7 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
ct->host = gpr_malloc(strlen(target) + 1);
strcpy(ct->host, target);
gpr_log(GPR_DEBUG,
- "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine,
ct->host);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
index 38755604b9..74327a4214 100644
--- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -38,46 +38,45 @@ library, so we can build it in all environments */
#include <grpc/support/log.h>
-#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+#include "third_party/Cronet/bidirectional_stream_c.h"
#ifdef GRPC_COMPILE_WITH_CRONET
/* link with the real CRONET library in the build system */
#else
/* Dummy implementation of cronet API just to test for build-ability */
-cronet_bidirectional_stream* cronet_bidirectional_stream_create(
- cronet_engine* engine, void* annotation,
- cronet_bidirectional_stream_callback* callback) {
+bidirectional_stream* bidirectional_stream_create(
+ stream_engine* engine, void* annotation,
+ bidirectional_stream_callback* callback) {
GPR_ASSERT(0);
return NULL;
}
-int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+int bidirectional_stream_destroy(bidirectional_stream* stream) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_start(
- cronet_bidirectional_stream* stream, const char* url, int priority,
- const char* method, const cronet_bidirectional_stream_header_array* headers,
- bool end_of_stream) {
+int bidirectional_stream_start(bidirectional_stream* stream, const char* url,
+ int priority, const char* method,
+ const bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
- char* buffer, int capacity) {
+int bidirectional_stream_read(bidirectional_stream* stream, char* buffer,
+ int capacity) {
GPR_ASSERT(0);
return 0;
}
-int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
- const char* buffer, int count,
- bool end_of_stream) {
+int bidirectional_stream_write(bidirectional_stream* stream, const char* buffer,
+ int count, bool end_of_stream) {
GPR_ASSERT(0);
return 0;
}
-void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+void bidirectional_stream_cancel(bidirectional_stream* stream) {
GPR_ASSERT(0);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 96ff6ef772..447f3f31ec 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -49,7 +49,7 @@
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
-#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+#include "third_party/Cronet/bidirectional_stream_c.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
@@ -86,19 +86,18 @@ enum e_op_id {
/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
-static void on_stream_ready(cronet_bidirectional_stream *);
+static void on_stream_ready(bidirectional_stream *);
static void on_response_headers_received(
- cronet_bidirectional_stream *,
- const cronet_bidirectional_stream_header_array *, const char *);
-static void on_write_completed(cronet_bidirectional_stream *, const char *);
-static void on_read_completed(cronet_bidirectional_stream *, char *, int);
+ bidirectional_stream *, const bidirectional_stream_header_array *,
+ const char *);
+static void on_write_completed(bidirectional_stream *, const char *);
+static void on_read_completed(bidirectional_stream *, char *, int);
static void on_response_trailers_received(
- cronet_bidirectional_stream *,
- const cronet_bidirectional_stream_header_array *);
-static void on_succeeded(cronet_bidirectional_stream *);
-static void on_failed(cronet_bidirectional_stream *, int);
-static void on_canceled(cronet_bidirectional_stream *);
-static cronet_bidirectional_stream_callback cronet_callbacks = {
+ bidirectional_stream *, const bidirectional_stream_header_array *);
+static void on_succeeded(bidirectional_stream *);
+static void on_failed(bidirectional_stream *, int);
+static void on_canceled(bidirectional_stream *);
+static bidirectional_stream_callback cronet_callbacks = {
on_stream_ready,
on_response_headers_received,
on_read_completed,
@@ -111,7 +110,7 @@ static cronet_bidirectional_stream_callback cronet_callbacks = {
/* Cronet transport object */
struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */
- cronet_engine *engine;
+ stream_engine *engine;
char *host;
};
typedef struct grpc_cronet_transport grpc_cronet_transport;
@@ -181,8 +180,8 @@ struct stream_obj {
grpc_transport_stream_op *curr_op;
grpc_cronet_transport curr_ct;
grpc_stream *curr_gs;
- cronet_bidirectional_stream *cbs;
- cronet_bidirectional_stream_header_array header_array;
+ bidirectional_stream *cbs;
+ bidirectional_stream_header_array header_array;
/* Stream level state. Some state will be tracked both at stream and stream_op
* level */
@@ -352,11 +351,11 @@ static void execute_from_storage(stream_obj *s) {
/*
Cronet callback
*/
-static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
if (s->header_array.headers) {
@@ -375,11 +374,11 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
/*
Cronet callback
*/
-static void on_canceled(cronet_bidirectional_stream *stream) {
+static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL;
if (s->header_array.headers) {
@@ -398,11 +397,11 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
/*
Cronet callback
*/
-static void on_succeeded(cronet_bidirectional_stream *stream) {
+static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
- cronet_bidirectional_stream_destroy(s->cbs);
+ bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_SUCCEEDED] = true;
s->cbs = NULL;
free_read_buffer(s);
@@ -413,7 +412,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
/*
Cronet callback
*/
-static void on_stream_ready(cronet_bidirectional_stream *stream) {
+static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
@@ -429,7 +428,7 @@ static void on_stream_ready(cronet_bidirectional_stream *stream) {
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
if (s->state.flush_cronet_when_ready) {
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
- cronet_bidirectional_stream_flush(stream);
+ bidirectional_stream_flush(stream);
}
#endif
gpr_mu_unlock(&s->mu);
@@ -440,8 +439,8 @@ static void on_stream_ready(cronet_bidirectional_stream *stream) {
Cronet callback
*/
static void on_response_headers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *headers,
+ bidirectional_stream *stream,
+ const bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
@@ -467,9 +466,9 @@ static void on_response_headers_received(
s->state.rs.read_buffer = s->state.rs.grpc_header_bytes;
s->state.rs.received_bytes = 0;
s->state.rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
- s->state.rs.remaining_bytes);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer,
+ s->state.rs.remaining_bytes);
}
gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&exec_ctx);
@@ -479,8 +478,7 @@ static void on_response_headers_received(
/*
Cronet callback
*/
-static void on_write_completed(cronet_bidirectional_stream *stream,
- const char *data) {
+static void on_write_completed(bidirectional_stream *stream, const char *data) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -496,7 +494,7 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
/*
Cronet callback
*/
-static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
@@ -504,16 +502,16 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
gpr_mu_lock(&s->mu);
s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0 && s->state.flush_read) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096);
gpr_mu_unlock(&s->mu);
} else if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
if (s->state.rs.remaining_bytes > 0) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
s->state.state_op_done[OP_READ_REQ_MADE] = true;
- cronet_bidirectional_stream_read(
+ bidirectional_stream_read(
s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
s->state.rs.remaining_bytes);
gpr_mu_unlock(&s->mu);
@@ -536,8 +534,8 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
Cronet callback
*/
static void on_response_trailers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *trailers) {
+ bidirectional_stream *stream,
+ const bidirectional_stream_header_array *trailers) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
trailers);
@@ -567,12 +565,12 @@ static void on_response_trailers_received(
if (!s->state.state_op_done[OP_SEND_TRAILING_METADATA] &&
!(s->state.state_op_done[OP_CANCEL_ERROR] ||
s->state.state_callback_received[OP_FAILED])) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
s->state.state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ bidirectional_stream_write(s->cbs, "", 0, true);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
- cronet_bidirectional_stream_flush(s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
+ bidirectional_stream_flush(s->cbs);
#endif
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
@@ -614,7 +612,7 @@ static void create_grpc_frame(grpc_slice_buffer *write_slice_buffer,
*/
static void convert_metadata_to_cronet_headers(
grpc_linked_mdelem *head, const char *host, char **pp_url,
- cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers,
+ bidirectional_stream_header **pp_headers, size_t *p_num_headers,
const char **method) {
grpc_linked_mdelem *curr = head;
/* Walk the linked list and get number of header fields */
@@ -625,9 +623,9 @@ static void convert_metadata_to_cronet_headers(
}
/* Allocate enough memory. It is freed in the on_stream_ready callback
*/
- cronet_bidirectional_stream_header *headers =
- (cronet_bidirectional_stream_header *)gpr_malloc(
- sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+ bidirectional_stream_header *headers =
+ (bidirectional_stream_header *)gpr_malloc(
+ sizeof(bidirectional_stream_header) * num_headers_available);
*pp_headers = headers;
/* Walk the linked list again, this time copying the header fields.
@@ -857,12 +855,12 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
* on_failed */
GPR_ASSERT(s->cbs == NULL);
GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
- s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
- &cronet_callbacks);
- CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
+ s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
+ &cronet_callbacks);
+ CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
- cronet_bidirectional_stream_disable_auto_flush(s->cbs, true);
- cronet_bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
+ bidirectional_stream_disable_auto_flush(s->cbs, true);
+ bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
#endif
char *url = NULL;
const char *method = "POST";
@@ -871,10 +869,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
&s->header_array.headers, &s->header_array.count, &method);
s->header_array.capacity = s->header_array.count;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
- url);
- cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array,
- false);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
+ bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
@@ -912,16 +908,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
- s->cbs, stream_state->ws.write_buffer);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
+ stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
- (int)write_buffer_size, false);
+ bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
if (!stream_op->send_trailing_metadata) {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)",
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)",
s->cbs);
- cronet_bidirectional_stream_flush(s->cbs);
+ bidirectional_stream_flush(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->pending_write_for_trailer = true;
@@ -944,13 +940,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
result = NO_ACTION_POSSIBLE;
CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
} else {
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)",
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)",
s->cbs);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ bidirectional_stream_write(s->cbs, "", 0, true);
#ifdef GRPC_CRONET_WITH_PACKET_COALESCING
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs);
- cronet_bidirectional_stream_flush(s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
+ bidirectional_stream_flush(s->cbs);
#endif
result = ACTION_TAKEN_WITH_CALLBACK;
}
@@ -981,14 +977,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
- GRPC_ERROR_CANCELLED);
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
- grpc_closure_sched(
- exec_ctx, stream_op->recv_message_ready,
- make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
+ grpc_closure_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
@@ -1014,11 +1009,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(stream_state->rs.read_buffer);
stream_state->rs.remaining_bytes = stream_state->rs.length_field;
stream_state->rs.received_bytes = 0;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->rs.remaining_bytes = 0;
@@ -1037,10 +1032,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
@@ -1049,11 +1044,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
stream_state->state_op_done[OP_READ_REQ_MADE] =
true; /* Indicates that at least one read request has been made */
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = NO_ACTION_POSSIBLE;
@@ -1083,9 +1078,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
stream_state->rs.received_bytes = 0;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.length_field_received = false;
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
- cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
+ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
@@ -1104,9 +1099,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
op_can_be_run(stream_op, stream_state, &oas->state,
OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
- CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
+ CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
- cronet_bidirectional_stream_cancel(s->cbs);
+ bidirectional_stream_cancel(s->cbs);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
result = ACTION_TAKEN_NO_CALLBACK;
@@ -1207,18 +1202,17 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
header_has_authority(op->send_initial_metadata->list.head)) {
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
- cronet_bidirectional_stream_header_array header_array;
- cronet_bidirectional_stream_header *header;
- cronet_bidirectional_stream cbs;
+ bidirectional_stream_header_array header_array;
+ bidirectional_stream_header *header;
+ bidirectional_stream cbs;
CRONET_LOG(GPR_DEBUG,
":authority header is provided but not supported;"
" cancel operations");
/* Notify application that operation is cancelled by forging trailers */
header_array.count = 1;
header_array.capacity = 1;
- header_array.headers =
- gpr_malloc(sizeof(cronet_bidirectional_stream_header));
- header = (cronet_bidirectional_stream_header *)header_array.headers;
+ header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header));
+ header = (bidirectional_stream_header *)header_array.headers;
header->key = "grpc-status";
header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */
cbs.annotation = (void *)s;
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index d6664aead2..715d057c51 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -796,7 +796,7 @@ static polling_island *polling_island_merge(polling_island *p,
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
- workqueue_move_items_to_parent(q);
+ workqueue_move_items_to_parent(p);
}
/* else if p == q, nothing needs to be done */
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 5bc5621443..9477ac3688 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -413,9 +413,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
- if (!fd->released) {
- shutdown(fd->fd, SHUT_RDWR);
- } else {
+ if (fd->released) {
*release_fd = fd->fd;
}
gpr_mu_lock(&fd->mu);
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 42a044df77..31590cd53b 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -691,6 +691,11 @@ grpc_resource_user *grpc_resource_user_create(
return resource_user;
}
+grpc_resource_quota *grpc_resource_user_quota(
+ grpc_resource_user *resource_user) {
+ return resource_user->resource_quota;
+}
+
static void ru_ref_by(grpc_resource_user *resource_user, gpr_atm amount) {
GPR_ASSERT(amount > 0);
GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index ef286c2fce..d1127ce9ea 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -88,6 +88,12 @@ typedef struct grpc_resource_user grpc_resource_user;
grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota *resource_quota, const char *name);
+
+/* Returns a borrowed reference to the underlying resource quota for this
+ resource user. */
+grpc_resource_quota *grpc_resource_user_quota(
+ grpc_resource_user *resource_user);
+
void grpc_resource_user_ref(grpc_resource_user *resource_user);
void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user);