aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2')
-rw-r--r--src/core/ext/transport/chttp2/README.md1
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c253
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.h41
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c182
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c281
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.c342
-rw-r--r--src/core/ext/transport/chttp2/server/chttp2_server.h47
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c171
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c317
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.c15
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.h5
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_encoder.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c330
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c57
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c73
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c35
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.h13
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h19
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c11
28 files changed, 1105 insertions, 1129 deletions
diff --git a/src/core/ext/transport/chttp2/README.md b/src/core/ext/transport/chttp2/README.md
new file mode 100644
index 0000000000..8880a47460
--- /dev/null
+++ b/src/core/ext/transport/chttp2/README.md
@@ -0,0 +1 @@
+CHTTP2 - gRPC's implementation of a HTTP2 based transport
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
new file mode 100644
index 0000000000..013c96dc70
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -0,0 +1,253 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
+
+#include <grpc/grpc.h>
+
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/connector.h"
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/subchannel.h"
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/channel/handshaker_registry.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/slice/slice_internal.h"
+
+typedef struct {
+ grpc_connector base;
+
+ gpr_mu mu;
+ gpr_refcount refs;
+
+ bool shutdown;
+ bool connecting;
+
+ grpc_closure *notify;
+ grpc_connect_in_args args;
+ grpc_connect_out_args *result;
+ grpc_closure initial_string_sent;
+ grpc_slice_buffer initial_string_buffer;
+
+ grpc_endpoint *endpoint; // Non-NULL until handshaking starts.
+
+ grpc_closure connected;
+
+ grpc_handshake_manager *handshake_mgr;
+} chttp2_connector;
+
+static void chttp2_connector_ref(grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ gpr_ref(&c->refs);
+}
+
+static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ if (gpr_unref(&c->refs)) {
+ /* c->initial_string_buffer does not need to be destroyed */
+ gpr_mu_destroy(&c->mu);
+ // If handshaking is not yet in progress, destroy the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint);
+ gpr_free(c);
+ }
+}
+
+static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ gpr_mu_lock(&c->mu);
+ c->shutdown = true;
+ if (c->handshake_mgr != NULL) {
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+ }
+ // If handshaking is not yet in progress, shutdown the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (!c->connecting && c->endpoint != NULL) {
+ grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ }
+ gpr_mu_unlock(&c->mu);
+}
+
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ chttp2_connector *c = args->user_data;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ // We were shut down after handshaking completed successfully, so
+ // destroy the endpoint here.
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
+ gpr_free(args->read_buffer);
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ } else {
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
+ GPR_ASSERT(c->result->transport);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ args->read_buffer);
+ c->result->channel_args = args->args;
+ }
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_closure_sched(exec_ctx, notify, error);
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
+ c->handshake_mgr = NULL;
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, (grpc_connector *)c);
+}
+
+static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
+ chttp2_connector *c) {
+ c->handshake_mgr = grpc_handshake_manager_create();
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args,
+ c->handshake_mgr);
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->endpoint = NULL; // Endpoint handed off to handshake manager.
+}
+
+static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ chttp2_connector *c = arg;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_closure_sched(exec_ctx, notify, error);
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, arg);
+ } else {
+ start_handshake_locked(exec_ctx, c);
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ chttp2_connector *c = arg;
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->connecting);
+ c->connecting = false;
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_closure_sched(exec_ctx, notify, error);
+ if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ gpr_mu_unlock(&c->mu);
+ chttp2_connector_unref(exec_ctx, arg);
+ } else {
+ GPR_ASSERT(c->endpoint != NULL);
+ if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
+ grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
+ c, grpc_schedule_on_exec_ctx);
+ grpc_slice_buffer_init(&c->initial_string_buffer);
+ grpc_slice_buffer_add(&c->initial_string_buffer,
+ c->args.initial_connect_string);
+ grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer,
+ &c->initial_string_sent);
+ } else {
+ start_handshake_locked(exec_ctx, c);
+ }
+ gpr_mu_unlock(&c->mu);
+ }
+}
+
+static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
+ grpc_connector *con,
+ const grpc_connect_in_args *args,
+ grpc_connect_out_args *result,
+ grpc_closure *notify) {
+ chttp2_connector *c = (chttp2_connector *)con;
+ grpc_resolved_address addr;
+ grpc_get_subchannel_address_arg(args->channel_args, &addr);
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->notify == NULL);
+ c->notify = notify;
+ c->args = *args;
+ c->result = result;
+ GPR_ASSERT(c->endpoint == NULL);
+ chttp2_connector_ref(con); // Ref taken for callback.
+ grpc_closure_init(&c->connected, connected, c, grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(!c->connecting);
+ c->connecting = true;
+ grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint,
+ args->interested_parties, args->channel_args, &addr,
+ args->deadline);
+ gpr_mu_unlock(&c->mu);
+}
+
+static const grpc_connector_vtable chttp2_connector_vtable = {
+ chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown,
+ chttp2_connector_connect};
+
+grpc_connector *grpc_chttp2_connector_create() {
+ chttp2_connector *c = gpr_malloc(sizeof(*c));
+ memset(c, 0, sizeof(*c));
+ c->base.vtable = &chttp2_connector_vtable;
+ gpr_mu_init(&c->mu);
+ gpr_ref_init(&c->refs, 1);
+ return &c->base;
+}
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h
new file mode 100644
index 0000000000..f5d1025432
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H
+
+#include "src/core/ext/client_channel/connector.h"
+
+grpc_connector* grpc_chttp2_connector_create();
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 8e03fd82c1..c9f4021216 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -33,138 +33,17 @@
#include <grpc/grpc.h>
-#include <stdlib.h>
#include <string.h>
-#include <grpc/slice.h>
-#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/compress_filter.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
-//
-// connector
-//
-
-typedef struct {
- grpc_connector base;
- gpr_refcount refs;
-
- grpc_closure *notify;
- grpc_connect_in_args args;
- grpc_connect_out_args *result;
- grpc_closure initial_string_sent;
- grpc_slice_buffer initial_string_buffer;
-
- grpc_endpoint *tcp;
-
- grpc_closure connected;
-
- grpc_handshake_manager *handshake_mgr;
-} connector;
-
-static void connector_ref(grpc_connector *con) {
- connector *c = (connector *)con;
- gpr_ref(&c->refs);
-}
-
-static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- if (gpr_unref(&c->refs)) {
- /* c->initial_string_buffer does not need to be destroyed */
- grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
- gpr_free(c);
- }
-}
-
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- connector_unref(exec_ctx, arg);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- connector *c = user_data;
- if (error != GRPC_ERROR_NONE) {
- grpc_channel_args_destroy(args);
- gpr_free(read_buffer);
- } else {
- c->result->transport =
- grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
- GPR_ASSERT(c->result->transport);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
- read_buffer);
- c->result->channel_args = args;
- }
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
-}
-
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- connector *c = arg;
- grpc_endpoint *tcp = c->tcp;
- if (tcp != NULL) {
- if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
- grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
- grpc_slice_buffer_init(&c->initial_string_buffer);
- grpc_slice_buffer_add(&c->initial_string_buffer,
- c->args.initial_connect_string);
- connector_ref(arg);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
- &c->initial_string_sent);
- } else {
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
- }
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
- }
-}
-
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
-
-static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
- const grpc_connect_in_args *args,
- grpc_connect_out_args *result,
- grpc_closure *notify) {
- connector *c = (connector *)con;
- GPR_ASSERT(c->notify == NULL);
- GPR_ASSERT(notify->cb);
- c->notify = notify;
- c->args = *args;
- c->result = result;
- c->tcp = NULL;
- grpc_closure_init(&c->connected, connected, c);
- grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
- args->interested_parties, args->channel_args,
- args->addr, args->deadline);
-}
-
-static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_shutdown, connector_connect};
-
-//
-// client_channel_factory
-//
-
static void client_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {}
@@ -174,20 +53,9 @@ static void client_channel_factory_unref(
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
- connector *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- c->base.vtable = &connector_vtable;
- gpr_ref_init(&c->refs, 1);
- c->handshake_mgr = grpc_handshake_manager_create();
- char *proxy_name = grpc_get_http_proxy_server();
- if (proxy_name != NULL) {
- grpc_handshake_manager_add(
- c->handshake_mgr,
- grpc_http_connect_handshaker_create(proxy_name, args->server_name));
- gpr_free(proxy_name);
- }
- grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
- grpc_connector_unref(exec_ctx, &c->base);
+ grpc_connector *connector = grpc_chttp2_connector_create();
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_connector_unref(exec_ctx, connector);
return s;
}
@@ -195,19 +63,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
- grpc_channel *channel =
- grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
- grpc_resolver *resolver = grpc_resolver_create(target, args);
- if (!resolver) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
- "client_channel_factory_create_channel");
- return NULL;
- }
-
- grpc_client_channel_finish_initialization(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory);
- GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
-
+ // Add channel arg containing the server URI.
+ grpc_arg arg;
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_SERVER_URI;
+ arg.value.string = (char *)target;
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -230,16 +94,18 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
GRPC_API_TRACE(
"grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
(target, args, reserved));
- GPR_ASSERT(!reserved);
-
- grpc_client_channel_factory *factory =
- (grpc_client_channel_factory *)&client_channel_factory;
+ GPR_ASSERT(reserved == NULL);
+ // Add channel arg containing the client channel factory.
+ grpc_arg arg =
+ grpc_client_channel_factory_create_channel_arg(&client_channel_factory);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ // Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args);
-
- grpc_client_channel_factory_unref(&exec_ctx, factory);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ // Clean up.
+ grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_exec_ctx_finish(&exec_ctx);
-
return channel != NULL ? channel : grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
"Failed to create client channel");
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index 1e5b1c22e3..f1069e2c57 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -74,7 +74,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
GPR_ASSERT(transport);
grpc_channel *channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
- grpc_channel_args_destroy(final_args);
+ grpc_channel_args_destroy(&exec_ctx, final_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 04c88a2d36..f979d9bad5 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -33,239 +33,31 @@
#include <grpc/grpc.h>
-#include <stdlib.h>
#include <string.h>
-#include <grpc/slice.h>
-#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/resolver_registry.h"
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/iomgr/tcp_client.h"
-#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/auth_filters.h"
+#include "src/core/lib/security/transport/security_connector.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
-#include "src/core/lib/tsi/transport_security_interface.h"
-
-//
-// connector
-//
-
-typedef struct {
- grpc_connector base;
- gpr_refcount refs;
-
- grpc_channel_security_connector *security_connector;
-
- grpc_closure *notify;
- grpc_connect_in_args args;
- grpc_connect_out_args *result;
- grpc_closure initial_string_sent;
- grpc_slice_buffer initial_string_buffer;
-
- gpr_mu mu;
- grpc_endpoint *connecting_endpoint;
- grpc_endpoint *newly_connecting_endpoint;
-
- grpc_closure connected_closure;
-
- grpc_handshake_manager *handshake_mgr;
-
- // TODO(roth): Remove once we eliminate on_secure_handshake_done().
- grpc_channel_args *tmp_args;
-} connector;
-
-static void connector_ref(grpc_connector *con) {
- connector *c = (connector *)con;
- gpr_ref(&c->refs);
-}
-
-static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- if (gpr_unref(&c->refs)) {
- /* c->initial_string_buffer does not need to be destroyed */
- grpc_channel_args_destroy(c->tmp_args);
- grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
- gpr_free(c);
- }
-}
-
-static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- connector *c = arg;
- gpr_mu_lock(&c->mu);
- grpc_error *error = GRPC_ERROR_NONE;
- if (c->connecting_endpoint == NULL) {
- memset(c->result, 0, sizeof(*c->result));
- gpr_mu_unlock(&c->mu);
- } else if (status != GRPC_SECURITY_OK) {
- error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"),
- GRPC_ERROR_INT_SECURITY_STATUS, status);
- memset(c->result, 0, sizeof(*c->result));
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- } else {
- grpc_arg auth_context_arg;
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- c->result->transport = grpc_create_chttp2_transport(
- exec_ctx, c->args.channel_args, secure_endpoint, 1);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL);
- auth_context_arg = grpc_auth_context_to_arg(auth_context);
- c->result->channel_args =
- grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1);
- }
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- connector *c = user_data;
- c->tmp_args = args;
- if (error != GRPC_ERROR_NONE) {
- gpr_free(read_buffer);
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
- } else {
- // TODO(roth, jboeuf): Convert security connector handshaking to use new
- // handshake API, and then move the code from on_secure_handshake_done()
- // into this function.
- grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, endpoint, read_buffer,
- c->args.deadline, on_secure_handshake_done, c);
- }
-}
-
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- connector *c = arg;
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
-}
-
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- connector *c = arg;
- grpc_endpoint *tcp = c->newly_connecting_endpoint;
- if (tcp != NULL) {
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- c->connecting_endpoint = tcp;
- gpr_mu_unlock(&c->mu);
- if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
- grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
- c);
- grpc_slice_buffer_init(&c->initial_string_buffer);
- grpc_slice_buffer_add(&c->initial_string_buffer,
- c->args.initial_connect_string);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
- &c->initial_string_sent);
- } else {
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
- }
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
- }
-}
-
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- grpc_endpoint *ep;
- gpr_mu_lock(&c->mu);
- ep = c->connecting_endpoint;
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- if (ep) {
- grpc_endpoint_shutdown(exec_ctx, ep);
- }
-}
-
-static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
- const grpc_connect_in_args *args,
- grpc_connect_out_args *result,
- grpc_closure *notify) {
- connector *c = (connector *)con;
- GPR_ASSERT(c->notify == NULL);
- c->notify = notify;
- c->args = *args;
- c->result = result;
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- gpr_mu_unlock(&c->mu);
- grpc_closure_init(&c->connected_closure, connected, c);
- grpc_tcp_client_connect(
- exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
- args->interested_parties, args->channel_args, args->addr, args->deadline);
-}
-
-static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_shutdown, connector_connect};
-
-//
-// client_channel_factory
-//
-
-typedef struct {
- grpc_client_channel_factory base;
- gpr_refcount refs;
- grpc_channel_security_connector *security_connector;
-} client_channel_factory;
static void client_channel_factory_ref(
- grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- gpr_ref(&f->refs);
-}
+ grpc_client_channel_factory *cc_factory) {}
static void client_channel_factory_unref(
- grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- if (gpr_unref(&f->refs)) {
- GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
- "client_channel_factory");
- gpr_free(f);
- }
-}
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- connector *c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- c->base.vtable = &connector_vtable;
- c->security_connector = f->security_connector;
- c->handshake_mgr = grpc_handshake_manager_create();
- char *proxy_name = grpc_get_http_proxy_server();
- if (proxy_name != NULL) {
- grpc_handshake_manager_add(
- c->handshake_mgr,
- grpc_http_connect_handshaker_create(proxy_name, args->server_name));
- gpr_free(proxy_name);
- }
- gpr_mu_init(&c->mu);
- gpr_ref_init(&c->refs, 1);
- grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
- grpc_connector_unref(exec_ctx, &c->base);
+ grpc_connector *connector = grpc_chttp2_connector_create();
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_connector_unref(exec_ctx, connector);
return s;
}
@@ -273,19 +65,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- grpc_channel *channel =
- grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
- grpc_resolver *resolver = grpc_resolver_create(target, args);
- if (resolver != NULL) {
- grpc_client_channel_finish_initialization(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
- GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
- } else {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
- "client_channel_factory_create_channel");
- channel = NULL;
- }
+ // Add channel arg containing the server URI.
+ grpc_arg arg;
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_SERVER_URI;
+ arg.value.string = (char *)target;
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
+ GRPC_CLIENT_CHANNEL, NULL);
+ grpc_channel_args_destroy(exec_ctx, new_args);
return channel;
}
@@ -294,6 +82,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
+static grpc_client_channel_factory client_channel_factory = {
+ &client_channel_factory_vtable};
+
/* Create a secure client channel:
Asynchronously: - resolve target
- connect to it (trying alternatives as presented)
@@ -320,36 +111,32 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel_security_connector *security_connector;
grpc_channel_args *new_args_from_connector;
if (grpc_channel_credentials_create_security_connector(
- creds, target, args, &security_connector, &new_args_from_connector) !=
- GRPC_SECURITY_OK) {
+ &exec_ctx, creds, target, args, &security_connector,
+ &new_args_from_connector) != GRPC_SECURITY_OK) {
grpc_exec_ctx_finish(&exec_ctx);
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
}
- grpc_arg connector_arg =
- grpc_security_connector_to_arg(&security_connector->base);
+ // Add channel args containing the client channel factory and security
+ // connector.
+ grpc_arg args_to_add[2];
+ args_to_add[0] =
+ grpc_client_channel_factory_create_channel_arg(&client_channel_factory);
+ args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
- &connector_arg, 1);
+ args_to_add, GPR_ARRAY_SIZE(args_to_add));
if (new_args_from_connector != NULL) {
- grpc_channel_args_destroy(new_args_from_connector);
+ grpc_channel_args_destroy(&exec_ctx, new_args_from_connector);
}
- // Create client channel factory.
- client_channel_factory *f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
- GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
- "grpc_secure_channel_create");
- f->security_connector = security_connector;
// Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
// Clean up.
- GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
+ GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base,
"secure_client_channel_factory_create_channel");
- grpc_channel_args_destroy(new_args);
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_exec_ctx_finish(&exec_ctx);
return channel; /* may be NULL */
}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c
new file mode 100644
index 0000000000..574d1a7710
--- /dev/null
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.c
@@ -0,0 +1,342 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
+
+#include <grpc/grpc.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+#include "src/core/lib/channel/handshaker_registry.h"
+#include "src/core/lib/channel/http_server_filter.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/server.h"
+
+typedef struct pending_handshake_manager_node {
+ grpc_handshake_manager *handshake_mgr;
+ struct pending_handshake_manager_node *next;
+} pending_handshake_manager_node;
+
+typedef struct {
+ grpc_server *server;
+ grpc_tcp_server *tcp_server;
+ grpc_channel_args *args;
+ gpr_mu mu;
+ bool shutdown;
+ grpc_closure tcp_server_shutdown_complete;
+ grpc_closure *server_destroy_listener_done;
+ pending_handshake_manager_node *pending_handshake_mgrs;
+} server_state;
+
+typedef struct {
+ server_state *server_state;
+ grpc_pollset *accepting_pollset;
+ grpc_tcp_server_acceptor *acceptor;
+ grpc_handshake_manager *handshake_mgr;
+} server_connection_state;
+
+static void pending_handshake_manager_add_locked(
+ server_state *state, grpc_handshake_manager *handshake_mgr) {
+ pending_handshake_manager_node *node = gpr_malloc(sizeof(*node));
+ node->handshake_mgr = handshake_mgr;
+ node->next = state->pending_handshake_mgrs;
+ state->pending_handshake_mgrs = node;
+}
+
+static void pending_handshake_manager_remove_locked(
+ server_state *state, grpc_handshake_manager *handshake_mgr) {
+ pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs;
+ for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
+ node != NULL; node = node->next) {
+ if (node->handshake_mgr == handshake_mgr) {
+ *prev_node = node->next;
+ gpr_free(node);
+ break;
+ }
+ prev_node = &node->next;
+ }
+}
+
+static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx,
+ server_state *state) {
+ pending_handshake_manager_node *prev_node = NULL;
+ for (pending_handshake_manager_node *node = state->pending_handshake_mgrs;
+ node != NULL; node = node->next) {
+ grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr);
+ gpr_free(prev_node);
+ prev_node = node;
+ }
+ gpr_free(prev_node);
+ state->pending_handshake_mgrs = NULL;
+}
+
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_handshaker_args *args = arg;
+ server_connection_state *connection_state = args->user_data;
+ gpr_mu_lock(&connection_state->server_state->mu);
+ if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) {
+ const char *error_str = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
+ grpc_error_free_string(error_str);
+ if (error == GRPC_ERROR_NONE && args->endpoint != NULL) {
+ // We were shut down after handshaking completed successfully, so
+ // destroy the endpoint here.
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ grpc_slice_buffer_destroy_internal(exec_ctx, args->read_buffer);
+ gpr_free(args->read_buffer);
+ }
+ } else {
+ // If the handshaking succeeded but there is no endpoint, then the
+ // handshaker may have handed off the connection to some external
+ // code, so we can just clean up here without creating a transport.
+ if (args->endpoint != NULL) {
+ grpc_transport *transport =
+ grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
+ grpc_server_setup_transport(
+ exec_ctx, connection_state->server_state->server, transport,
+ connection_state->accepting_pollset, args->args);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport,
+ args->read_buffer);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ }
+ }
+ pending_handshake_manager_remove_locked(connection_state->server_state,
+ connection_state->handshake_mgr);
+ gpr_mu_unlock(&connection_state->server_state->mu);
+ grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
+ grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
+ gpr_free(connection_state->acceptor);
+ gpr_free(connection_state);
+}
+
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
+ grpc_pollset *accepting_pollset,
+ grpc_tcp_server_acceptor *acceptor) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ if (state->shutdown) {
+ gpr_mu_unlock(&state->mu);
+ grpc_endpoint_destroy(exec_ctx, tcp);
+ gpr_free(acceptor);
+ return;
+ }
+ grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create();
+ pending_handshake_manager_add_locked(state, handshake_mgr);
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_ref(state->tcp_server);
+ server_connection_state *connection_state =
+ gpr_malloc(sizeof(*connection_state));
+ connection_state->server_state = state;
+ connection_state->accepting_pollset = accepting_pollset;
+ connection_state->acceptor = acceptor;
+ connection_state->handshake_mgr = handshake_mgr;
+ grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args,
+ connection_state->handshake_mgr);
+ // TODO(roth): We should really get this timeout value from channel
+ // args instead of hard-coding it.
+ const gpr_timespec deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
+ grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
+ tcp, state->args, deadline, acceptor,
+ on_handshake_done, connection_state);
+}
+
+/* Server callback: start listening on our ports */
+static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
+ void *arg, grpc_pollset **pollsets,
+ size_t pollset_count) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ state->shutdown = false;
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count,
+ on_accept, state);
+}
+
+static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ server_state *state = arg;
+ /* ensure all threads have unlocked */
+ gpr_mu_lock(&state->mu);
+ grpc_closure *destroy_done = state->server_destroy_listener_done;
+ GPR_ASSERT(state->shutdown);
+ pending_handshake_manager_shutdown_locked(exec_ctx, state);
+ gpr_mu_unlock(&state->mu);
+ // Flush queued work before destroying handshaker factory, since that
+ // may do a synchronous unref.
+ grpc_exec_ctx_flush(exec_ctx);
+ if (destroy_done != NULL) {
+ destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error));
+ grpc_exec_ctx_flush(exec_ctx);
+ }
+ grpc_channel_args_destroy(exec_ctx, state->args);
+ gpr_mu_destroy(&state->mu);
+ gpr_free(state);
+}
+
+/* Server callback: destroy the tcp listener (so we don't generate further
+ callbacks) */
+static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, void *arg,
+ grpc_closure *destroy_done) {
+ server_state *state = arg;
+ gpr_mu_lock(&state->mu);
+ state->shutdown = true;
+ state->server_destroy_listener_done = destroy_done;
+ grpc_tcp_server *tcp_server = state->tcp_server;
+ gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server);
+ grpc_tcp_server_unref(exec_ctx, tcp_server);
+}
+
+grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, const char *addr,
+ grpc_channel_args *args,
+ int *port_num) {
+ grpc_resolved_addresses *resolved = NULL;
+ grpc_tcp_server *tcp_server = NULL;
+ size_t i;
+ size_t count = 0;
+ int port_temp;
+ grpc_error *err = GRPC_ERROR_NONE;
+ server_state *state = NULL;
+ grpc_error **errors = NULL;
+
+ *port_num = -1;
+
+ /* resolve address */
+ err = grpc_blocking_resolve_address(addr, "https", &resolved);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
+ state = gpr_malloc(sizeof(*state));
+ memset(state, 0, sizeof(*state));
+ grpc_closure_init(&state->tcp_server_shutdown_complete,
+ tcp_server_shutdown_complete, state,
+ grpc_schedule_on_exec_ctx);
+ err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete,
+ args, &tcp_server);
+ if (err != GRPC_ERROR_NONE) {
+ goto error;
+ }
+
+ state->server = server;
+ state->tcp_server = tcp_server;
+ state->args = args;
+ state->shutdown = true;
+ gpr_mu_init(&state->mu);
+
+ const size_t naddrs = resolved->naddrs;
+ errors = gpr_malloc(sizeof(*errors) * naddrs);
+ for (i = 0; i < naddrs; i++) {
+ errors[i] =
+ grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp);
+ if (errors[i] == GRPC_ERROR_NONE) {
+ if (*port_num == -1) {
+ *port_num = port_temp;
+ } else {
+ GPR_ASSERT(*port_num == port_temp);
+ }
+ count++;
+ }
+ }
+ if (count == 0) {
+ char *msg;
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
+ naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
+ goto error;
+ } else if (count != naddrs) {
+ char *msg;
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
+ count, naddrs);
+ err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
+ gpr_free(msg);
+
+ const char *warning_message = grpc_error_string(err);
+ gpr_log(GPR_INFO, "WARNING: %s", warning_message);
+ grpc_error_free_string(warning_message);
+ /* we managed to bind some addresses: continue */
+ }
+ grpc_resolved_addresses_destroy(resolved);
+
+ /* Register with the server only upon success */
+ grpc_server_add_listener(exec_ctx, server, state, server_start_listener,
+ server_destroy_listener);
+ goto done;
+
+/* Error path: cleanup and return */
+error:
+ GPR_ASSERT(err != GRPC_ERROR_NONE);
+ if (resolved) {
+ grpc_resolved_addresses_destroy(resolved);
+ }
+ if (tcp_server) {
+ grpc_tcp_server_unref(exec_ctx, tcp_server);
+ } else {
+ grpc_channel_args_destroy(exec_ctx, args);
+ gpr_free(state);
+ }
+ *port_num = 0;
+
+done:
+ if (errors != NULL) {
+ for (i = 0; i < naddrs; i++) {
+ GRPC_ERROR_UNREF(errors[i]);
+ }
+ gpr_free(errors);
+ }
+ return err;
+}
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h
new file mode 100644
index 0000000000..2581ebaae9
--- /dev/null
+++ b/src/core/ext/transport/chttp2/server/chttp2_server.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+/// Adds a port to \a server. Sets \a port_num to the port number.
+/// Takes ownership of \a args.
+grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
+ grpc_server *server, const char *addr,
+ grpc_channel_args *args, int *port_num);
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index c18d618f96..bf5026bea6 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -33,180 +33,27 @@
#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct server_connect_state {
- grpc_server *server;
- grpc_pollset *accepting_pollset;
- grpc_tcp_server_acceptor *acceptor;
- grpc_handshake_manager *handshake_mgr;
-} server_connect_state;
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- server_connect_state *state = user_data;
- if (error != GRPC_ERROR_NONE) {
- const char *error_str = grpc_error_string(error);
- gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
- grpc_error_free_string(error_str);
- GRPC_ERROR_UNREF(error);
- grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
- gpr_free(read_buffer);
- } else {
- // Beware that the call to grpc_create_chttp2_transport() has to happen
- // before grpc_tcp_server_destroy(). This is fine here, but similar code
- // asynchronously doing a handshake instead of calling
- // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add
- // synchronization to avoid this case.
- grpc_transport *transport =
- grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0);
- grpc_server_setup_transport(exec_ctx, state->server, transport,
- state->accepting_pollset,
- grpc_server_get_channel_args(state->server));
- grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer);
- }
- // Clean up.
- grpc_channel_args_destroy(args);
- grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
- gpr_free(state);
-}
-
-static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp,
- grpc_pollset *accepting_pollset,
- grpc_tcp_server_acceptor *acceptor) {
- server_connect_state *state = gpr_malloc(sizeof(server_connect_state));
- state->server = server;
- state->accepting_pollset = accepting_pollset;
- state->acceptor = acceptor;
- state->handshake_mgr = grpc_handshake_manager_create();
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- const gpr_timespec deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server),
- deadline, acceptor, on_handshake_done, state);
-}
-
-/* Server callback: start listening on our ports */
-static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
- grpc_pollset **pollsets, size_t pollset_count) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept,
- server);
-}
-
-/* Server callback: destroy the tcp listener (so we don't generate further
- callbacks) */
-static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
- grpc_closure *destroy_done) {
- grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
- grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
-}
-
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
- grpc_resolved_addresses *resolved = NULL;
- grpc_tcp_server *tcp = NULL;
- size_t i;
- size_t count = 0;
- int port_num = -1;
- int port_temp;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_error *err = GRPC_ERROR_NONE;
-
+ int port_num = 0;
GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2,
(server, addr));
-
- grpc_error **errors = NULL;
- err = grpc_blocking_resolve_address(addr, "https", &resolved);
- if (err != GRPC_ERROR_NONE) {
- goto error;
- }
-
- err = grpc_tcp_server_create(&exec_ctx, NULL,
- grpc_server_get_channel_args(server), &tcp);
+ grpc_error *err = grpc_chttp2_server_add_port(
+ &exec_ctx, server, addr,
+ grpc_channel_args_copy(grpc_server_get_channel_args(server)), &port_num);
if (err != GRPC_ERROR_NONE) {
- goto error;
- }
-
- const size_t naddrs = resolved->naddrs;
- errors = gpr_malloc(sizeof(*errors) * naddrs);
- for (i = 0; i < naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
- if (errors[i] == GRPC_ERROR_NONE) {
- if (port_num == -1) {
- port_num = port_temp;
- } else {
- GPR_ASSERT(port_num == port_temp);
- }
- count++;
- }
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
}
- if (count == 0) {
- char *msg;
- gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
- naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
- gpr_free(msg);
- goto error;
- } else if (count != naddrs) {
- char *msg;
- gpr_asprintf(&msg, "Only %" PRIuPTR
- " addresses added out of total %" PRIuPTR " resolved",
- count, naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
- gpr_free(msg);
-
- const char *warning_message = grpc_error_string(err);
- gpr_log(GPR_INFO, "WARNING: %s", warning_message);
- grpc_error_free_string(warning_message);
- /* we managed to bind some addresses: continue */
- }
- grpc_resolved_addresses_destroy(resolved);
-
- /* Register with the server only upon success */
- grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy);
- goto done;
-
-/* Error path: cleanup and return */
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- if (tcp) {
- grpc_tcp_server_unref(&exec_ctx, tcp);
- }
- port_num = 0;
-
- const char *msg = grpc_error_string(err);
- gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
- GRPC_ERROR_UNREF(err);
-
-done:
grpc_exec_ctx_finish(&exec_ctx);
- if (errors != NULL) {
- for (i = 0; i < naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- }
- gpr_free(errors);
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index aa2ecf5743..f46e849932 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
grpc_endpoint *server_endpoint =
grpc_tcp_create(grpc_fd_create(fd, name), resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
- grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
gpr_free(name);
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 942638ad7f..395c79a71d 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -38,218 +38,35 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
+
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/transport/auth_filters.h"
-#include "src/core/lib/security/transport/security_connector.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-typedef struct server_secure_state {
- grpc_server *server;
- grpc_tcp_server *tcp;
- grpc_server_security_connector *sc;
- grpc_server_credentials *creds;
- bool is_shutdown;
- gpr_mu mu;
- grpc_closure tcp_server_shutdown_complete;
- grpc_closure *server_destroy_listener_done;
-} server_secure_state;
-
-typedef struct server_secure_connect {
- server_secure_state *server_state;
- grpc_pollset *accepting_pollset;
- grpc_tcp_server_acceptor *acceptor;
- grpc_handshake_manager *handshake_mgr;
- // TODO(roth): Remove the following two fields when we eliminate
- // grpc_server_security_connector_do_handshake().
- gpr_timespec deadline;
- grpc_channel_args *args;
-} server_secure_connect;
-
-static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_security_status status,
- grpc_endpoint *secure_endpoint,
- grpc_auth_context *auth_context) {
- server_secure_connect *connection_state = statep;
- if (status == GRPC_SECURITY_OK) {
- if (secure_endpoint) {
- gpr_mu_lock(&connection_state->server_state->mu);
- if (!connection_state->server_state->is_shutdown) {
- grpc_transport *transport = grpc_create_chttp2_transport(
- exec_ctx, grpc_server_get_channel_args(
- connection_state->server_state->server),
- secure_endpoint, 0);
- grpc_arg args_to_add[2];
- args_to_add[0] = grpc_server_credentials_to_arg(
- connection_state->server_state->creds);
- args_to_add[1] = grpc_auth_context_to_arg(auth_context);
- grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
- connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
- grpc_server_setup_transport(
- exec_ctx, connection_state->server_state->server, transport,
- connection_state->accepting_pollset, args_copy);
- grpc_channel_args_destroy(args_copy);
- grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL);
- } else {
- /* We need to consume this here, because the server may already have
- * gone away. */
- grpc_endpoint_destroy(exec_ctx, secure_endpoint);
- }
- gpr_mu_unlock(&connection_state->server_state->mu);
- }
- } else {
- gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
- }
- grpc_channel_args_destroy(connection_state->args);
- grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp);
- gpr_free(connection_state);
-}
-
-static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args,
- grpc_slice_buffer *read_buffer, void *user_data,
- grpc_error *error) {
- server_secure_connect *connection_state = user_data;
- if (error != GRPC_ERROR_NONE) {
- const char *error_str = grpc_error_string(error);
- gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
- grpc_error_free_string(error_str);
- GRPC_ERROR_UNREF(error);
- grpc_channel_args_destroy(args);
- gpr_free(read_buffer);
- grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr);
- grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp);
- gpr_free(connection_state);
- return;
- }
- grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
- connection_state->handshake_mgr = NULL;
- // TODO(roth, jboeuf): Convert security connector handshaking to use new
- // handshake API, and then move the code from on_secure_handshake_done()
- // into this function.
- connection_state->args = args;
- grpc_server_security_connector_do_handshake(
- exec_ctx, connection_state->server_state->sc, connection_state->acceptor,
- endpoint, read_buffer, connection_state->deadline,
- on_secure_handshake_done, connection_state);
-}
-
-static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
- grpc_pollset *accepting_pollset,
- grpc_tcp_server_acceptor *acceptor) {
- server_secure_state *server_state = statep;
- server_secure_connect *connection_state = NULL;
- gpr_mu_lock(&server_state->mu);
- if (server_state->is_shutdown) {
- gpr_mu_unlock(&server_state->mu);
- grpc_endpoint_destroy(exec_ctx, tcp);
- return;
- }
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_ref(server_state->tcp);
- connection_state = gpr_malloc(sizeof(*connection_state));
- connection_state->server_state = server_state;
- connection_state->accepting_pollset = accepting_pollset;
- connection_state->acceptor = acceptor;
- connection_state->handshake_mgr = grpc_handshake_manager_create();
- // TODO(roth): We should really get this timeout value from channel
- // args instead of hard-coding it.
- connection_state->deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
- grpc_handshake_manager_do_handshake(
- exec_ctx, connection_state->handshake_mgr, tcp,
- grpc_server_get_channel_args(connection_state->server_state->server),
- connection_state->deadline, acceptor, on_handshake_done,
- connection_state);
-}
-
-/* Server callback: start listening on our ports */
-static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
- void *statep, grpc_pollset **pollsets,
- size_t pollset_count) {
- server_secure_state *server_state = statep;
- gpr_mu_lock(&server_state->mu);
- server_state->is_shutdown = false;
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count,
- on_accept, server_state);
-}
-
-static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep,
- grpc_error *error) {
- server_secure_state *server_state = statep;
- /* ensure all threads have unlocked */
- gpr_mu_lock(&server_state->mu);
- grpc_closure *destroy_done = server_state->server_destroy_listener_done;
- GPR_ASSERT(server_state->is_shutdown);
- gpr_mu_unlock(&server_state->mu);
- /* clean up */
- grpc_server_security_connector_shutdown(exec_ctx, server_state->sc);
-
- /* Flush queued work before a synchronous unref. */
- grpc_exec_ctx_flush(exec_ctx);
- GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server");
- grpc_server_credentials_unref(server_state->creds);
-
- if (destroy_done != NULL) {
- destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error));
- grpc_exec_ctx_flush(exec_ctx);
- }
- gpr_free(server_state);
-}
-
-static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
- grpc_server *server, void *statep,
- grpc_closure *callback) {
- server_secure_state *server_state = statep;
- grpc_tcp_server *tcp;
- gpr_mu_lock(&server_state->mu);
- server_state->is_shutdown = true;
- server_state->server_destroy_listener_done = callback;
- tcp = server_state->tcp;
- gpr_mu_unlock(&server_state->mu);
- grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
- grpc_tcp_server_unref(exec_ctx, server_state->tcp);
-}
-
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_credentials *creds) {
- grpc_resolved_addresses *resolved = NULL;
- grpc_tcp_server *tcp = NULL;
- server_secure_state *server_state = NULL;
- size_t i;
- size_t count = 0;
- int port_num = -1;
- int port_temp;
- grpc_security_status status = GRPC_SECURITY_ERROR;
- grpc_server_security_connector *sc = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *err = GRPC_ERROR_NONE;
- grpc_error **errors = NULL;
-
+ grpc_server_security_connector *sc = NULL;
+ int port_num = 0;
GRPC_API_TRACE(
"grpc_server_add_secure_http2_port("
"server=%p, addr=%s, creds=%p)",
3, (server, addr, creds));
-
- /* create security context */
+ // Create security context.
if (creds == NULL) {
err = GRPC_ERROR_CREATE(
"No credentials specified for secure server port (creds==NULL)");
- goto error;
+ goto done;
}
- status = grpc_server_credentials_create_security_connector(creds, &sc);
+ grpc_security_status status =
+ grpc_server_credentials_create_security_connector(&exec_ctx, creds, &sc);
if (status != GRPC_SECURITY_OK) {
char *msg;
gpr_asprintf(&msg,
@@ -258,107 +75,27 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
err = grpc_error_set_int(GRPC_ERROR_CREATE(msg),
GRPC_ERROR_INT_SECURITY_STATUS, status);
gpr_free(msg);
- goto error;
+ goto done;
}
- sc->channel_args = grpc_server_get_channel_args(server);
-
- /* resolve address */
- err = grpc_blocking_resolve_address(addr, "https", &resolved);
- if (err != GRPC_ERROR_NONE) {
- goto error;
+ // Create channel args.
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_server_credentials_to_arg(creds);
+ args_to_add[1] = grpc_security_connector_to_arg(&sc->base);
+ grpc_channel_args *args =
+ grpc_channel_args_copy_and_add(grpc_server_get_channel_args(server),
+ args_to_add, GPR_ARRAY_SIZE(args_to_add));
+ // Add server port.
+ err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, &port_num);
+done:
+ if (sc != NULL) {
+ GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &sc->base, "server");
}
- server_state = gpr_malloc(sizeof(*server_state));
- memset(server_state, 0, sizeof(*server_state));
- grpc_closure_init(&server_state->tcp_server_shutdown_complete,
- tcp_server_shutdown_complete, server_state);
- err = grpc_tcp_server_create(&exec_ctx,
- &server_state->tcp_server_shutdown_complete,
- grpc_server_get_channel_args(server), &tcp);
+ grpc_exec_ctx_finish(&exec_ctx);
if (err != GRPC_ERROR_NONE) {
- goto error;
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
}
-
- server_state->server = server;
- server_state->tcp = tcp;
- server_state->sc = sc;
- server_state->creds = grpc_server_credentials_ref(creds);
- server_state->is_shutdown = true;
- gpr_mu_init(&server_state->mu);
-
- errors = gpr_malloc(sizeof(*errors) * resolved->naddrs);
- for (i = 0; i < resolved->naddrs; i++) {
- errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp);
- if (errors[i] == GRPC_ERROR_NONE) {
- if (port_num == -1) {
- port_num = port_temp;
- } else {
- GPR_ASSERT(port_num == port_temp);
- }
- count++;
- }
- }
- if (count == 0) {
- char *msg;
- gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
- resolved->naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
- gpr_free(msg);
- goto error;
- } else if (count != resolved->naddrs) {
- char *msg;
- gpr_asprintf(&msg, "Only %" PRIuPTR
- " addresses added out of total %" PRIuPTR " resolved",
- count, resolved->naddrs);
- err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
- gpr_free(msg);
-
- const char *warning_message = grpc_error_string(err);
- gpr_log(GPR_INFO, "WARNING: %s", warning_message);
- grpc_error_free_string(warning_message);
- /* we managed to bind some addresses: continue */
- } else {
- for (i = 0; i < resolved->naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- }
- gpr_free(errors);
- errors = NULL;
- grpc_resolved_addresses_destroy(resolved);
-
- /* Register with the server only upon success */
- grpc_server_add_listener(&exec_ctx, server, server_state,
- server_start_listener, server_destroy_listener);
-
- grpc_exec_ctx_finish(&exec_ctx);
return port_num;
-
-/* Error path: cleanup and return */
-error:
- GPR_ASSERT(err != GRPC_ERROR_NONE);
- if (errors != NULL) {
- for (i = 0; i < resolved->naddrs; i++) {
- GRPC_ERROR_UNREF(errors[i]);
- }
- gpr_free(errors);
- }
- if (resolved) {
- grpc_resolved_addresses_destroy(resolved);
- }
- if (tcp) {
- grpc_tcp_server_unref(&exec_ctx, tcp);
- } else {
- if (sc) {
- grpc_exec_ctx_flush(&exec_ctx);
- GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server");
- }
- if (server_state) {
- gpr_free(server_state);
- }
- }
- grpc_exec_ctx_finish(&exec_ctx);
- const char *msg = grpc_error_string(err);
- GRPC_ERROR_UNREF(err);
- gpr_log(GPR_ERROR, "%s", msg);
- grpc_error_free_string(msg);
- return 0;
}
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c
index 3eef80b557..8db36e4a7f 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.c
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c
@@ -34,6 +34,7 @@
#include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -143,7 +144,8 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) {
return true;
}
-grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
+grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx,
+ grpc_slice input) {
size_t input_length = GRPC_SLICE_LENGTH(input);
size_t output_length = input_length / 4 * 3;
struct grpc_base64_decode_context ctx;
@@ -179,7 +181,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
@@ -187,7 +189,8 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input) {
return output;
}
-grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
+grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
+ grpc_slice input,
size_t output_length) {
size_t input_length = GRPC_SLICE_LENGTH(input);
grpc_slice output = grpc_slice_malloc(output_length);
@@ -200,7 +203,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
"grpc_chttp2_base64_decode_with_length has a length of %d, which "
"has a tail of 1 byte.\n",
(int)input_length);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
@@ -210,7 +213,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
"than the max possible output length %d.\n",
(int)output_length,
(int)(input_length / 4 * 3 + tail_xtra[input_length % 4]));
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
@@ -224,7 +227,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
char *s = grpc_dump_slice(input, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
gpr_free(s);
- grpc_slice_unref(output);
+ grpc_slice_unref_internal(exec_ctx, output);
return gpr_empty_slice();
}
GPR_ASSERT(ctx.output_cur == GRPC_SLICE_END_PTR(output));
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h
index 83a90be519..48ffb2ae3b 100644
--- a/src/core/ext/transport/chttp2/transport/bin_decoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h
@@ -55,12 +55,13 @@ bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx);
/* base64 decode a slice with pad chars. Returns a new slice, does not take
ownership of the input. Returns an empty slice if decoding is failed. */
-grpc_slice grpc_chttp2_base64_decode(grpc_slice input);
+grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx, grpc_slice input);
/* base64 decode a slice without pad chars, data length is needed. Returns a new
slice, does not take ownership of the input. Returns an empty slice if
decoding is failed. */
-grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input,
+grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx,
+ grpc_slice input,
size_t output_length);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */
diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h
index 9e143b46e2..477559d0e2 100644
--- a/src/core/ext/transport/chttp2/transport/bin_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h
@@ -47,7 +47,7 @@ grpc_slice grpc_chttp2_huffman_compress(grpc_slice input);
/* equivalent to:
grpc_slice x = grpc_chttp2_base64_encode(input);
grpc_slice y = grpc_chttp2_huffman_compress(x);
- grpc_slice_unref(x);
+ grpc_slice_unref_internal(exec_ctx, x);
return y; */
grpc_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(
grpc_slice input);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 4e3c7ff681..68a6a2155d 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -47,10 +47,12 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
+#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -60,7 +62,7 @@
#define DEFAULT_WINDOW 65535
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
-
+#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
@@ -73,20 +75,14 @@ static const grpc_transport_vtable vtable;
static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void write_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error);
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_error *error);
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id id, uint32_t value);
@@ -111,16 +107,9 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error);
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *t,
- grpc_error *error);
static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@@ -144,13 +133,13 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_endpoint_destroy(exec_ctx, t->ep);
- grpc_slice_buffer_destroy(&t->qbuf);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->qbuf);
- grpc_slice_buffer_destroy(&t->outbuf);
- grpc_chttp2_hpack_compressor_destroy(&t->hpack_compressor);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->outbuf);
+ grpc_chttp2_hpack_compressor_destroy(exec_ctx, &t->hpack_compressor);
- grpc_slice_buffer_destroy(&t->read_buffer);
- grpc_chttp2_hpack_parser_destroy(&t->hpack_parser);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &t->read_buffer);
+ grpc_chttp2_hpack_parser_destroy(exec_ctx, &t->hpack_parser);
grpc_chttp2_goaway_parser_destroy(&t->goaway_parser);
for (i = 0; i < STREAM_LIST_COUNT; i++) {
@@ -169,8 +158,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->pings.next != &t->pings) {
grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"), NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv,
+ GRPC_ERROR_CREATE("Transport closed"));
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -249,21 +238,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init(&t->outbuf);
grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
- grpc_closure_init(&t->write_action_begin_locked, write_action_begin_locked,
- t);
- grpc_closure_init(&t->write_action, write_action, t);
- grpc_closure_init(&t->write_action_end, write_action_end, t);
- grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t);
- grpc_closure_init(&t->read_action_begin, read_action_begin, t);
- grpc_closure_init(&t->read_action_locked, read_action_locked, t);
- grpc_closure_init(&t->benign_reclaimer, benign_reclaimer, t);
- grpc_closure_init(&t->destructive_reclaimer, destructive_reclaimer, t);
- grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t);
+ grpc_closure_init(&t->write_action, write_action, t,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&t->read_action_locked, read_action_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_closure_init(&t->destructive_reclaimer_locked,
- destructive_reclaimer_locked, t);
+ destructive_reclaimer_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
- grpc_chttp2_hpack_parser_init(&t->hpack_parser);
+ grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
grpc_slice_buffer_init(&t->read_buffer);
@@ -285,6 +271,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
window -- this should by rights be 0 */
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
+ t->write_buffer_size = DEFAULT_WINDOW;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -335,6 +322,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_set_max_usable_size(&t->hpack_compressor,
(uint32_t)value);
}
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
+ t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
} else {
static const struct {
const char *channel_arg_name;
@@ -398,9 +390,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
- grpc_combiner_execute(exec_ctx, t->combiner,
- grpc_closure_create(destroy_transport_locked, t),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(
+ destroy_transport_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
static void close_transport_locked(grpc_exec_ctx *exec_ctx,
@@ -474,8 +467,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- grpc_closure_init(&s->complete_fetch, complete_fetch, s);
- grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s);
+ grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
+ grpc_schedule_on_exec_ctx);
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@@ -530,9 +523,11 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(s->recv_message_ready == NULL);
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[0]);
- grpc_chttp2_incoming_metadata_buffer_destroy(&s->metadata_buffer[1]);
- grpc_slice_buffer_destroy(&s->flow_controlled_buffer);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[0]);
+ grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
+ &s->metadata_buffer[1]);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
@@ -550,9 +545,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
s->destroy_stream_arg = and_free_memory;
- grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s);
- grpc_combiner_execute(exec_ctx, t->combiner, &s->destroy_stream,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("destroy_stream", 0);
}
@@ -602,11 +598,13 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name(t->write_state),
write_state_name(st), reason));
t->write_state = st;
- if (st == GRPC_CHTTP2_WRITE_STATE_IDLE &&
- t->close_transport_on_writes_finished != NULL) {
- grpc_error *err = t->close_transport_on_writes_finished;
- t->close_transport_on_writes_finished = NULL;
- close_transport_locked(exec_ctx, t, err);
+ if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
+ grpc_closure_list_sched(exec_ctx, &t->run_after_write);
+ if (t->close_transport_on_writes_finished != NULL) {
+ grpc_error *err = t->close_transport_on_writes_finished;
+ t->close_transport_on_writes_finished = NULL;
+ close_transport_locked(exec_ctx, t, err);
+ }
}
}
@@ -619,9 +617,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_IDLE:
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason);
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, covered_by_poller)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING:
set_write_state(
@@ -663,7 +664,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
if (!t->closed && grpc_chttp2_begin_write(exec_ctx, t)) {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"begin writing");
- grpc_exec_ctx_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &t->write_action, GRPC_ERROR_NONE);
} else {
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE,
"begin writing nothing");
@@ -675,19 +676,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("write_action", 0);
- grpc_endpoint_write(exec_ctx, t->ep, &t->outbuf, &t->write_action_end);
+ grpc_endpoint_write(
+ exec_ctx, t->ep, &t->outbuf,
+ grpc_closure_init(&t->write_action_end_locked, write_action_end_locked, t,
+ grpc_combiner_scheduler(t->combiner, false)));
GPR_TIMER_END("write_action", 0);
}
-static void write_action_end(grpc_exec_ctx *exec_ctx, void *gt,
- grpc_error *error) {
- grpc_chttp2_transport *t = gt;
- GPR_TIMER_BEGIN("write_action_end", 0);
- grpc_combiner_execute(exec_ctx, t->combiner, &t->write_action_end_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("write_action_end", 0);
-}
-
static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *error) {
GPR_TIMER_BEGIN("terminate_writing_with_lock", 0);
@@ -704,8 +699,6 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
}
}
- grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
-
switch (t->write_state) {
case GRPC_CHTTP2_WRITE_STATE_IDLE:
GPR_UNREACHABLE_CODE(break);
@@ -719,21 +712,29 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [!covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, false);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(
+ &t->write_action_begin_locked, write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
break;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER:
GPR_TIMER_MARK("state=writing_stale_with_poller", 0);
set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING,
"continue writing [covered]");
GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
- grpc_combiner_execute_finally(exec_ctx, t->combiner,
- &t->write_action_begin_locked,
- GRPC_ERROR_NONE, true);
+ grpc_closure_run(
+ exec_ctx,
+ grpc_closure_init(&t->write_action_begin_locked,
+ write_action_begin_locked, t,
+ grpc_combiner_finally_scheduler(t->combiner, true)),
+ GRPC_ERROR_NONE);
break;
}
+ grpc_chttp2_end_write(exec_ctx, t, GRPC_ERROR_REF(error));
+
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing");
GPR_TIMER_END("terminate_writing_with_lock", 0);
}
@@ -761,7 +762,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
char *msg = grpc_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
- grpc_slice_unref(goaway_text);
+ grpc_slice_unref_internal(exec_ctx, goaway_text);
t->seen_goaway = 1;
/* lie: use transient failure from the transport to indicate goaway has been
* received */
@@ -823,7 +824,14 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
}
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
+/* Flag that this closure barrier may be covering a write in a pollset, and so
+ we should not complete this closure until we can prove that the write got
+ scheduled */
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
+/* First bit of the reference count, stored in the high order bits (with the low
+ bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
static grpc_closure *add_closure_barrier(grpc_closure *closure) {
@@ -850,6 +858,16 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
+ if (grpc_http_trace) {
+ const char *errstr = grpc_error_string(error);
+ gpr_log(GPR_DEBUG,
+ "complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s",
+ closure,
+ (int)(closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT),
+ (int)(closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT),
+ desc, errstr);
+ grpc_error_free_string(errstr);
+ }
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error =
@@ -866,7 +884,13 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = NULL;
}
- grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
+ !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
+ grpc_closure_run(exec_ctx, closure, closure->error_data.error);
+ } else {
+ grpc_closure_list_append(&t->run_after_write, closure,
+ closure->error_data.error);
+ }
}
}
@@ -881,15 +905,22 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) {
return false;
}
+static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ if (s->id != 0 && (!s->write_buffering ||
+ s->flow_controlled_buffer.length > t->write_buffer_size)) {
+ grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ }
+}
+
static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
s->fetched_send_message_length +=
(uint32_t)GRPC_SLICE_LENGTH(s->fetching_slice);
grpc_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
@@ -943,15 +974,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
}
}
-static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
- grpc_error *error) {
- grpc_chttp2_stream *s = gs;
- grpc_chttp2_transport *t = s->t;
- grpc_combiner_execute(exec_ctx, t->combiner, &s->complete_fetch_locked,
- GRPC_ERROR_REF(error),
- s->complete_fetch_covered_by_poller);
-}
-
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
@@ -987,7 +1009,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure *on_complete = op->on_complete;
if (on_complete == NULL) {
- on_complete = grpc_closure_create(do_nothing, NULL);
+ on_complete =
+ grpc_closure_create(do_nothing, NULL, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@@ -1011,6 +1034,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata = op->send_initial_metadata;
const size_t metadata_size =
@@ -1064,6 +1088,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
grpc_chttp2_complete_closure_step(
@@ -1088,21 +1113,22 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
(int64_t)len;
s->complete_fetch_covered_by_poller = op->covered_by_poller;
if (flags & GRPC_WRITE_BUFFER_HINT) {
- /* allow up to 64kb to be buffered */
- /* TODO(ctiller): make this configurable */
- s->next_message_end_offset -= 65536;
+ s->next_message_end_offset -= t->write_buffer_size;
+ s->write_buffering = true;
+ } else {
+ s->write_buffering = false;
}
continue_fetching_send_locked(exec_ctx, t, s);
- if (s->id != 0) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
- }
+ maybe_become_writable_due_to_send_msg(exec_ctx, t, s);
}
}
if (op->send_trailing_metadata != NULL) {
GPR_ASSERT(s->send_trailing_metadata_finished == NULL);
+ on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata = op->send_trailing_metadata;
+ s->write_buffering = false;
const size_t metadata_size =
grpc_metadata_batch_size(op->send_trailing_metadata);
const size_t metadata_peer_limit =
@@ -1187,13 +1213,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free(str);
}
- grpc_closure_init(&op->transport_private.closure, perform_stream_op_locked,
- op);
op->transport_private.args[0] = gt;
op->transport_private.args[1] = gs;
GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, op->covered_by_poller);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &op->transport_private.closure, perform_stream_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, op->covered_by_poller)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("perform_stream_op", 0);
}
@@ -1222,7 +1250,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_outstanding_ping *ping;
for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -1260,7 +1288,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
if (op->send_goaway) {
send_goaway(exec_ctx, t,
grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
- grpc_slice_ref(*op->goaway_message));
+ grpc_slice_ref_internal(*op->goaway_message));
}
if (op->set_accept_stream) {
@@ -1296,11 +1324,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
char *msg = grpc_transport_op_string(op);
gpr_free(msg);
op->transport_private.args[0] = gt;
- grpc_closure_init(&op->transport_private.closure, perform_transport_op_locked,
- op);
GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op");
- grpc_combiner_execute(exec_ctx, t->combiner, &op->transport_private.closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_init(&op->transport_private.closure,
+ perform_transport_op_locked, op,
+ grpc_combiner_scheduler(t->combiner, false)),
+ GRPC_ERROR_NONE);
}
/*******************************************************************************
@@ -1490,21 +1519,22 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
grpc_chttp2_incoming_metadata_buffer_add(
- &s->metadata_buffer[1],
- grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string)));
+ &s->metadata_buffer[1], grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_GRPC_STATUS,
+ grpc_mdstr_from_string(status_string)));
if (slice) {
grpc_chttp2_incoming_metadata_buffer_add(
&s->metadata_buffer[1],
grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_MESSAGE,
- grpc_mdstr_from_slice(grpc_slice_ref(*slice))));
+ exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_mdstr_from_slice(exec_ctx,
+ grpc_slice_ref_internal(*slice))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (slice) {
- grpc_slice_unref(*slice);
+ grpc_slice_unref_internal(exec_ctx, *slice);
}
}
@@ -1534,9 +1564,9 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error;
}
-static void fail_pending_writes(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t, grpc_chttp2_stream *s,
- grpc_error *error) {
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_error *error) {
error =
removal_error(error, s, "Pending writes failed due to stream closure");
s->send_initial_metadata = NULL;
@@ -1590,13 +1620,16 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
if (close_writes && !s->write_closed) {
s->write_closed_error = GRPC_ERROR_REF(error);
s->write_closed = true;
- fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
+ grpc_chttp2_fail_pending_writes(exec_ctx, t, s, GRPC_ERROR_REF(error));
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
if (s->read_closed && s->write_closed) {
if (s->id != 0) {
remove_stream(exec_ctx, t, s->id,
removal_error(GRPC_ERROR_REF(error), s, "Stream removed"));
+ } else {
+ /* Purge streams waiting on concurrency still waiting for id assignment */
+ grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2");
}
@@ -1657,8 +1690,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if (optional_message != NULL) {
size_t msg_len = strlen(optional_message);
- GPR_ASSERT(msg_len < 127);
- message_pfx = grpc_slice_malloc(15);
+ GPR_ASSERT(msg_len <= UINT32_MAX);
+ uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 0);
+ message_pfx = grpc_slice_malloc(14 + msg_len_len);
p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x40;
*p++ = 12; /* len(grpc-message) */
@@ -1674,7 +1708,9 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
- *p++ = (uint8_t)msg_len;
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)msg_len, 0, 0, p,
+ (uint32_t)msg_len_len);
+ p += msg_len_len;
GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx);
len += (uint32_t)msg_len;
@@ -1773,19 +1809,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
-static void read_action_begin(grpc_exec_ctx *exec_ctx, void *tp,
- grpc_error *error) {
- /* Control flow:
- reading_action_locked ->
- (parse_unlocked -> post_parse_locked)? ->
- post_reading_action_locked */
- GPR_TIMER_BEGIN("reading_action", 0);
- grpc_chttp2_transport *t = tp;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->read_action_locked,
- GRPC_ERROR_REF(error), false);
- GPR_TIMER_END("reading_action", 0);
-}
-
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
@@ -1882,10 +1905,11 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
keep_reading = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "keep_reading");
}
- grpc_slice_buffer_reset_and_unref(&t->read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->read_buffer);
if (keep_reading) {
- grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_begin);
+ grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
+ &t->read_action_locked);
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1936,7 +1960,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
- grpc_slice_buffer_destroy(&bs->slices);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
}
@@ -2022,10 +2046,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs->next_action.slice = slice;
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
- grpc_closure_init(&bs->next_action.closure, incoming_byte_stream_next_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner,
- &bs->next_action.closure, GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
@@ -2047,10 +2073,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
- grpc_closure_init(&bs->destroy_action, incoming_byte_stream_destroy_locked,
- bs);
- grpc_combiner_execute(exec_ctx, bs->transport->combiner, &bs->destroy_action,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx,
+ grpc_closure_init(
+ &bs->destroy_action, incoming_byte_stream_destroy_locked, bs,
+ grpc_combiner_scheduler(bs->transport->combiner, false)),
+ GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_destroy", 0);
}
@@ -2058,7 +2086,7 @@ static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
bs->error = error;
@@ -2075,7 +2103,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (bs->on_next != NULL) {
*bs->next = slice;
- grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
} else {
grpc_slice_buffer_add(&bs->slices, slice);
@@ -2143,7 +2171,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- false, &t->benign_reclaimer);
+ false, &t->benign_reclaimer_locked);
}
}
@@ -2154,24 +2182,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
grpc_resource_user_post_reclaimer(exec_ctx,
grpc_endpoint_get_resource_user(t->ep),
- true, &t->destructive_reclaimer);
+ true, &t->destructive_reclaimer_locked);
}
}
-static void benign_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->benign_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
-static void destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_chttp2_transport *t = arg;
- grpc_combiner_execute(exec_ctx, t->combiner, &t->destructive_reclaimer_locked,
- GRPC_ERROR_REF(error), false);
-}
-
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
@@ -2352,5 +2366,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
- read_action_begin(exec_ctx, t, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index b4c5ed769b..20043f5fbf 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -109,7 +109,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
- if (reason != GRPC_CHTTP2_NO_ERROR) {
+ if (reason != GRPC_CHTTP2_NO_ERROR || s->header_frames_received < 2) {
error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"),
GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason);
grpc_status_code status_code = grpc_chttp2_http2_error_to_grpc_status(
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index eb68fe3138..49a8326f62 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -48,6 +48,7 @@
#include "src/core/ext/transport/chttp2/transport/bin_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_table.h"
#include "src/core/ext/transport/chttp2/transport/varint.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/timeout_encoding.h"
@@ -183,7 +184,8 @@ static void evict_entry(grpc_chttp2_hpack_compressor *c) {
}
/* add an element to the decoder table */
-static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
+static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem) {
uint32_t key_hash = elem->key->hash;
uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
uint32_t new_index = c->tail_remote_index + c->table_elems + 1;
@@ -227,12 +229,12 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
} else if (c->indices_elems[HASH_FRAGMENT_2(elem_hash)] <
c->indices_elems[HASH_FRAGMENT_3(elem_hash)]) {
/* not there: replace oldest */
- GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_2(elem_hash)]);
c->entries_elems[HASH_FRAGMENT_2(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_2(elem_hash)] = new_index;
} else {
/* not there: replace oldest */
- GRPC_MDELEM_UNREF(c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
+ GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[HASH_FRAGMENT_3(elem_hash)]);
c->entries_elems[HASH_FRAGMENT_3(elem_hash)] = GRPC_MDELEM_REF(elem);
c->indices_elems[HASH_FRAGMENT_3(elem_hash)] = new_index;
}
@@ -251,11 +253,11 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) {
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
} else if (c->indices_keys[HASH_FRAGMENT_2(key_hash)] <
c->indices_keys[HASH_FRAGMENT_3(key_hash)]) {
- GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
+ GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_2(key_hash)]);
c->entries_keys[HASH_FRAGMENT_2(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_2(key_hash)] = new_index;
} else {
- GRPC_MDSTR_UNREF(c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
+ GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[HASH_FRAGMENT_3(key_hash)]);
c->entries_keys[HASH_FRAGMENT_3(key_hash)] = GRPC_MDSTR_REF(elem->key);
c->indices_keys[HASH_FRAGMENT_3(key_hash)] = new_index;
}
@@ -294,7 +296,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
@@ -311,7 +313,7 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
add_tiny_header_data(st, len_pfx), len_pfx);
GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
@@ -327,10 +329,10 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x40;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
@@ -346,10 +348,10 @@ static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c,
*add_tiny_header_data(st, 1) = 0x00;
GRPC_CHTTP2_WRITE_VARINT(len_key, 1, 0x00,
add_tiny_header_data(st, len_key_len), len_key_len);
- add_header_data(st, grpc_slice_ref(elem->key->slice));
+ add_header_data(st, grpc_slice_ref_internal(elem->key->slice));
GRPC_CHTTP2_WRITE_VARINT(len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
- add_header_data(st, grpc_slice_ref(value_slice));
+ add_header_data(st, grpc_slice_ref_internal(value_slice));
}
static void emit_advertise_table_size_change(grpc_chttp2_hpack_compressor *c,
@@ -366,8 +368,8 @@ static uint32_t dynidx(grpc_chttp2_hpack_compressor *c, uint32_t elem_index) {
}
/* encode an mdelem */
-static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
- framer_state *st) {
+static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c,
+ grpc_mdelem *elem, framer_state *st) {
uint32_t key_hash = elem->key->hash;
uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash);
size_t decoder_space_usage;
@@ -417,7 +419,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
@@ -432,7 +434,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
/* HIT: key (first cuckoo hash) */
if (should_add_elem) {
emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st);
@@ -445,7 +447,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
if (should_add_elem) {
emit_lithdr_incidx_v(c, elem, st);
- add_elem(c, elem);
+ add_elem(exec_ctx, c, elem);
return;
} else {
emit_lithdr_noidx_v(c, elem, st);
@@ -457,16 +459,17 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
#define STRLEN_LIT(x) (sizeof(x) - 1)
#define TIMEOUT_KEY "grpc-timeout"
-static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
+static void deadline_enc(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_mdelem *mdelem;
grpc_http2_encode_timeout(
gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str);
mdelem = grpc_mdelem_from_metadata_strings(
- GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str));
- hpack_enc(c, mdelem, st);
- GRPC_MDELEM_UNREF(mdelem);
+ exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT, grpc_mdstr_from_string(timeout_str));
+ hpack_enc(exec_ctx, c, mdelem, st);
+ GRPC_MDELEM_UNREF(exec_ctx, mdelem);
}
static uint32_t elems_for_bytes(uint32_t bytes) { return (bytes + 31) / 32; }
@@ -483,11 +486,12 @@ void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c) {
sizeof(*c->table_elem_size) * c->cap_table_elems);
}
-void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c) {
+void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c) {
int i;
for (i = 0; i < GRPC_CHTTP2_HPACKC_NUM_VALUES; i++) {
- if (c->entries_keys[i]) GRPC_MDSTR_UNREF(c->entries_keys[i]);
- if (c->entries_elems[i]) GRPC_MDELEM_UNREF(c->entries_elems[i]);
+ if (c->entries_keys[i]) GRPC_MDSTR_UNREF(exec_ctx, c->entries_keys[i]);
+ if (c->entries_elems[i]) GRPC_MDELEM_UNREF(exec_ctx, c->entries_elems[i]);
}
gpr_free(c->table_elem_size);
}
@@ -542,7 +546,8 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
}
-void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
+void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c,
uint32_t stream_id,
grpc_metadata_batch *metadata, int is_eof,
size_t max_frame_size,
@@ -571,11 +576,11 @@ void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
}
grpc_metadata_batch_assert_ok(metadata);
for (l = metadata->list.head; l; l = l->next) {
- hpack_enc(c, l->md, &st);
+ hpack_enc(exec_ctx, c, l->md, &st);
}
deadline = metadata->deadline;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
- deadline_enc(c, deadline, &st);
+ deadline_enc(exec_ctx, c, deadline, &st);
}
finish_frame(&st, 1, is_eof);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.h b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
index bcbd675ca2..3a35496ec8 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.h
@@ -83,13 +83,15 @@ typedef struct {
} grpc_chttp2_hpack_compressor;
void grpc_chttp2_hpack_compressor_init(grpc_chttp2_hpack_compressor *c);
-void grpc_chttp2_hpack_compressor_destroy(grpc_chttp2_hpack_compressor *c);
+void grpc_chttp2_hpack_compressor_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c);
void grpc_chttp2_hpack_compressor_set_max_table_size(
grpc_chttp2_hpack_compressor *c, uint32_t max_table_size);
void grpc_chttp2_hpack_compressor_set_max_usable_size(
grpc_chttp2_hpack_compressor *c, uint32_t max_table_size);
-void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c, uint32_t id,
+void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_compressor *c, uint32_t id,
grpc_metadata_batch *metadata, int is_eof,
size_t max_frame_size,
grpc_transport_one_way_stats *stats,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 6a9200b8db..8b91cc760b 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -670,11 +670,11 @@ static const uint8_t inverse_base64[256] = {
static grpc_error *on_hdr(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p,
grpc_mdelem *md, int add_to_table) {
if (add_to_table) {
- grpc_error *err = grpc_chttp2_hptbl_add(&p->table, md);
+ grpc_error *err = grpc_chttp2_hptbl_add(exec_ctx, &p->table, md);
if (err != GRPC_ERROR_NONE) return err;
}
if (p->on_header == NULL) {
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
return GRPC_ERROR_CREATE("on_header callback not set");
}
p->on_header(exec_ctx, p->on_header_user_data, md);
@@ -815,10 +815,10 @@ static grpc_error *finish_lithdr_incidx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 1);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -828,10 +828,10 @@ static grpc_error *finish_lithdr_incidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 1);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 1);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -883,10 +883,10 @@ static grpc_error *finish_lithdr_notidx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -896,10 +896,10 @@ static grpc_error *finish_lithdr_notidx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -951,10 +951,10 @@ static grpc_error *finish_lithdr_nvridx(grpc_exec_ctx *exec_ctx,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
GPR_ASSERT(md != NULL); /* handled in string parsing */
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, GRPC_MDSTR_REF(md->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -964,10 +964,10 @@ static grpc_error *finish_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_parser *p,
const uint8_t *cur,
const uint8_t *end) {
- grpc_error *err = on_hdr(
- exec_ctx, p, grpc_mdelem_from_metadata_strings(take_string(p, &p->key),
- take_string(p, &p->value)),
- 0);
+ grpc_error *err = on_hdr(exec_ctx, p, grpc_mdelem_from_metadata_strings(
+ exec_ctx, take_string(p, &p->key),
+ take_string(p, &p->value)),
+ 0);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -1020,7 +1020,7 @@ static grpc_error *finish_max_tbl_size(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index);
}
grpc_error *err =
- grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index);
+ grpc_chttp2_hptbl_set_current_table_size(exec_ctx, &p->table, p->index);
if (err != GRPC_ERROR_NONE) return parse_error(exec_ctx, p, cur, end, err);
return parse_begin(exec_ctx, p, cur, end);
}
@@ -1534,7 +1534,8 @@ static grpc_error *parse_value_string_with_literal_key(
/* PUBLIC INTERFACE */
-void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
+void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p) {
p->on_header = NULL;
p->on_header_user_data = NULL;
p->state = parse_begin;
@@ -1546,7 +1547,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
p->value.length = 0;
p->dynamic_table_update_allowed = 2;
p->last_error = GRPC_ERROR_NONE;
- grpc_chttp2_hptbl_init(&p->table);
+ grpc_chttp2_hptbl_init(exec_ctx, &p->table);
}
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
@@ -1554,8 +1555,9 @@ void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
p->state = parse_stream_dep0;
}
-void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p) {
- grpc_chttp2_hptbl_destroy(&p->table);
+void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p) {
+ grpc_chttp2_hptbl_destroy(exec_ctx, &p->table);
GRPC_ERROR_UNREF(p->last_error);
gpr_free(p->key.str);
gpr_free(p->value.str);
@@ -1634,10 +1636,11 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx,
however -- it might be that we receive a RST_STREAM following this
and can avoid the extra write */
GRPC_CHTTP2_STREAM_REF(s, "final_rst");
- grpc_combiner_execute_finally(
- exec_ctx, t->combiner,
- grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(force_client_rst_stream, s,
+ grpc_combiner_finally_scheduler(
+ t->combiner, false)),
+ GRPC_ERROR_NONE);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.h b/src/core/ext/transport/chttp2/transport/hpack_parser.h
index a39bf466cd..52ccf1e7a7 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.h
@@ -99,8 +99,10 @@ struct grpc_chttp2_hpack_parser {
grpc_chttp2_hptbl table;
};
-void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p);
-void grpc_chttp2_hpack_parser_destroy(grpc_chttp2_hpack_parser *p);
+void grpc_chttp2_hpack_parser_init(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p);
+void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hpack_parser *p);
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 2dc793d304..26d4036d49 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -179,7 +179,7 @@ static uint32_t entries_for_bytes(uint32_t bytes) {
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
}
-void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) {
+void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
size_t i;
memset(tbl, 0, sizeof(*tbl));
@@ -190,18 +190,20 @@ void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl) {
tbl->ents = gpr_malloc(sizeof(*tbl->ents) * tbl->cap_entries);
memset(tbl->ents, 0, sizeof(*tbl->ents) * tbl->cap_entries);
for (i = 1; i <= GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- tbl->static_ents[i - 1] =
- grpc_mdelem_from_strings(static_table[i].key, static_table[i].value);
+ tbl->static_ents[i - 1] = grpc_mdelem_from_strings(
+ exec_ctx, static_table[i].key, static_table[i].value);
}
}
-void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl) {
+void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl) {
size_t i;
for (i = 0; i < GRPC_CHTTP2_LAST_STATIC_ENTRY; i++) {
- GRPC_MDELEM_UNREF(tbl->static_ents[i]);
+ GRPC_MDELEM_UNREF(exec_ctx, tbl->static_ents[i]);
}
for (i = 0; i < tbl->num_ents; i++) {
- GRPC_MDELEM_UNREF(tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]);
+ GRPC_MDELEM_UNREF(exec_ctx,
+ tbl->ents[(tbl->first_ent + i) % tbl->cap_entries]);
}
gpr_free(tbl->ents);
}
@@ -224,7 +226,7 @@ grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
}
/* Evict one element from the table */
-static void evict1(grpc_chttp2_hptbl *tbl) {
+static void evict1(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl) {
grpc_mdelem *first_ent = tbl->ents[tbl->first_ent];
size_t elem_bytes = GRPC_SLICE_LENGTH(first_ent->key->slice) +
GRPC_SLICE_LENGTH(first_ent->value->slice) +
@@ -233,7 +235,7 @@ static void evict1(grpc_chttp2_hptbl *tbl) {
tbl->mem_used -= (uint32_t)elem_bytes;
tbl->first_ent = ((tbl->first_ent + 1) % tbl->cap_entries);
tbl->num_ents--;
- GRPC_MDELEM_UNREF(first_ent);
+ GRPC_MDELEM_UNREF(exec_ctx, first_ent);
}
static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) {
@@ -249,7 +251,8 @@ static void rebuild_ents(grpc_chttp2_hptbl *tbl, uint32_t new_cap) {
tbl->first_ent = 0;
}
-void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
+void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t max_bytes) {
if (tbl->max_bytes == max_bytes) {
return;
@@ -258,12 +261,13 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
}
while (tbl->mem_used > max_bytes) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
tbl->max_bytes = max_bytes;
}
-grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t bytes) {
if (tbl->current_table_bytes == bytes) {
return GRPC_ERROR_NONE;
@@ -281,7 +285,7 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
}
while (tbl->mem_used > bytes) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
tbl->current_table_bytes = bytes;
tbl->max_entries = entries_for_bytes(bytes);
@@ -296,7 +300,8 @@ grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
+grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
/* determine how many bytes of buffer this entry represents */
size_t elem_bytes = GRPC_SLICE_LENGTH(md->key->slice) +
GRPC_SLICE_LENGTH(md->value->slice) +
@@ -326,14 +331,14 @@ grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
* empty table.
*/
while (tbl->num_ents) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
return GRPC_ERROR_NONE;
}
/* evict entries to ensure no overflow */
while (elem_bytes > (size_t)tbl->current_table_bytes - tbl->mem_used) {
- evict1(tbl);
+ evict1(exec_ctx, tbl);
}
/* copy the finalized entry in */
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.h b/src/core/ext/transport/chttp2/transport/hpack_table.h
index 2ca130e64b..144574ef06 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.h
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.h
@@ -84,18 +84,21 @@ typedef struct {
} grpc_chttp2_hptbl;
/* initialize a hpack table */
-void grpc_chttp2_hptbl_init(grpc_chttp2_hptbl *tbl);
-void grpc_chttp2_hptbl_destroy(grpc_chttp2_hptbl *tbl);
-void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
+void grpc_chttp2_hptbl_init(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl);
+void grpc_chttp2_hptbl_destroy(grpc_exec_ctx *exec_ctx, grpc_chttp2_hptbl *tbl);
+void grpc_chttp2_hptbl_set_max_bytes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t max_bytes);
-grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_set_current_table_size(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
uint32_t bytes);
/* lookup a table entry based on its hpack index */
grpc_mdelem *grpc_chttp2_hptbl_lookup(const grpc_chttp2_hptbl *tbl,
uint32_t index);
/* add a table entry to the index */
-grpc_error *grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl,
+grpc_error *grpc_chttp2_hptbl_add(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_hptbl *tbl,
grpc_mdelem *md) GRPC_MUST_USE_RESULT;
/* Find a key/value pair in the table... returns the index in the table of the
most similar entry, or 0 if the value was not found */
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index 3e463a7995..5d1094999c 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -46,11 +46,11 @@ void grpc_chttp2_incoming_metadata_buffer_init(
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
- grpc_chttp2_incoming_metadata_buffer *buffer) {
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
if (!buffer->published) {
for (i = 0; i < buffer->count; i++) {
- GRPC_MDELEM_UNREF(buffer->elems[i].md);
+ GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
}
}
gpr_free(buffer->elems);
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index df4343b93e..7a0c4da15f 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -49,7 +49,7 @@ typedef struct {
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_destroy(
- grpc_chttp2_incoming_metadata_buffer *buffer);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch);
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index b74233d992..ea7beb4c2b 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -212,10 +212,8 @@ struct grpc_chttp2_transport {
grpc_closure write_action_begin_locked;
grpc_closure write_action;
- grpc_closure write_action_end;
grpc_closure write_action_end_locked;
- grpc_closure read_action_begin;
grpc_closure read_action_locked;
/** incoming read bytes */
@@ -251,6 +249,9 @@ struct grpc_chttp2_transport {
int64_t announce_incoming_window;
/** how much window would we like to have for incoming_window */
uint32_t connection_window_target;
+ /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
+ */
+ uint32_t write_buffer_size;
/** have we seen a goaway */
uint8_t seen_goaway;
@@ -327,16 +328,17 @@ struct grpc_chttp2_transport {
*/
grpc_error *close_transport_on_writes_finished;
+ /* a list of closures to run after writes are finished */
+ grpc_closure_list run_after_write;
+
/* buffer pool state */
/** have we scheduled a benign cleanup? */
bool benign_reclaimer_registered;
/** have we scheduled a destructive cleanup? */
bool destructive_reclaimer_registered;
/** benign cleanup closure */
- grpc_closure benign_reclaimer;
grpc_closure benign_reclaimer_locked;
/** destructive cleanup closure */
- grpc_closure destructive_reclaimer;
grpc_closure destructive_reclaimer_locked;
};
@@ -404,6 +406,9 @@ struct grpc_chttp2_stream {
/** Has this stream seen an error.
If true, then pending incoming frames can be thrown away. */
bool seen_error;
+ /** Are we buffering writes on this stream? If yes, we won't become writable
+ until there's enough queued up in the flow_controlled_buffer */
+ bool write_buffering;
/** the error that resulted in this stream being read-closed */
grpc_error *read_closed_error;
@@ -493,6 +498,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream **s);
+void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
@@ -689,4 +696,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s, grpc_error *error);
+
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 5efb49751c..4fb5dc7bd2 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -336,7 +336,7 @@ static grpc_error *skip_parser(grpc_exec_ctx *exec_ctx, void *parser,
}
static void skip_header(grpc_exec_ctx *exec_ctx, void *tp, grpc_mdelem *md) {
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
}
static grpc_error *init_skip_frame_parser(grpc_exec_ctx *exec_ctx,
@@ -477,7 +477,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
grpc_chttp2_incoming_metadata_buffer_set_deadline(
&s->metadata_buffer[0],
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
const size_t metadata_size_limit =
@@ -495,7 +495,7 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
}
@@ -538,7 +538,7 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
- GRPC_MDELEM_UNREF(md);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
}
@@ -712,7 +712,7 @@ static grpc_error *init_settings_frame_parser(grpc_exec_ctx *exec_ctx,
memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS],
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_chttp2_hptbl_set_max_bytes(
- &t->hpack_parser.table,
+ exec_ctx, &t->hpack_parser.table,
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
t->sent_local_settings = 0;
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index 6d25b3ae57..a60264cc51 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -158,6 +158,11 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
+void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
+}
+
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 139e7387c4..ef2010af7b 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -39,6 +39,7 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
static void add_to_write_list(grpc_chttp2_write_cb **list,
grpc_chttp2_write_cb *cb) {
@@ -119,7 +120,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* send initial metadata if it's available */
if (!sent_initial_metadata && s->send_initial_metadata) {
grpc_chttp2_encode_header(
- &t->hpack_compressor, s->id, s->send_initial_metadata, 0,
+ exec_ctx, &t->hpack_compressor, s->id, s->send_initial_metadata, 0,
t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s->stats.outgoing, &t->outbuf);
s->send_initial_metadata = NULL;
@@ -186,9 +187,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
&s->stats.outgoing, &t->outbuf);
} else {
grpc_chttp2_encode_header(
- &t->hpack_compressor, s->id, s->send_trailing_metadata, true,
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ exec_ctx, &t->hpack_compressor, s->id, s->send_trailing_metadata,
+ true, t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s->stats.outgoing, &t->outbuf);
}
s->send_trailing_metadata = NULL;
@@ -254,7 +255,7 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end");
}
- grpc_slice_buffer_reset_and_unref(&t->outbuf);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf);
GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_chttp2_end_write", 0);
}