aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-02-14 09:32:21 -0800
committerGravatar Muxi Yan <mxyan@google.com>2017-02-14 09:32:21 -0800
commitccbfb461d0a6b6b1785e17b4a5b9f92f789af8e3 (patch)
treed5d5dd28a5eedaff378ff31c7d9b7aa14d94ddba /src/core/ext
parente1cf1b691d3e82742214c8772a71e79e3209c2ce (diff)
parent09173c66f446a465aa8bae76926f2807f76fc795 (diff)
Merge remote-tracking branch 'upstream/master' into packet-coalescing-core
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/gen/trace_context.pb.c45
-rw-r--r--src/core/ext/census/gen/trace_context.pb.h48
-rw-r--r--src/core/ext/census/trace_context.c2
-rw-r--r--src/core/ext/census/trace_context.h3
-rw-r--r--src/core/ext/census/trace_label.h61
-rw-r--r--src/core/ext/census/trace_propagation.h63
-rw-r--r--src/core/ext/census/trace_status.h45
-rw-r--r--src/core/ext/census/trace_string.h50
-rw-r--r--src/core/ext/census/tracing.c42
-rw-r--r--src/core/ext/census/tracing.h124
-rw-r--r--src/core/ext/client_channel/client_channel.c42
-rw-r--r--src/core/ext/client_channel/client_channel.h4
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c2
-rw-r--r--src/core/ext/client_channel/http_proxy.c57
-rw-r--r--src/core/ext/client_channel/http_proxy.h4
-rw-r--r--src/core/ext/client_channel/proxy_mapper.c25
-rw-r--r--src/core/ext/client_channel/proxy_mapper.h36
-rw-r--r--src/core/ext/client_channel/proxy_mapper_registry.c52
-rw-r--r--src/core/ext/client_channel/proxy_mapper_registry.h16
-rw-r--r--src/core/ext/client_channel/subchannel.c20
-rw-r--r--src/core/ext/client_channel/subchannel.h3
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c171
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb_channel.c77
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb_channel.h56
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c107
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c7
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c8
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c184
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c400
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.c22
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_ping.h4
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c9
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_settings.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_window_update.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h125
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c30
-rw-r--r--src/core/ext/transport/chttp2/transport/stream_lists.c43
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c120
38 files changed, 1633 insertions, 492 deletions
diff --git a/src/core/ext/census/gen/trace_context.pb.c b/src/core/ext/census/gen/trace_context.pb.c
index c8aea324ce..f4126d4d80 100644
--- a/src/core/ext/census/gen/trace_context.pb.c
+++ b/src/core/ext/census/gen/trace_context.pb.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016, Google Inc.
+ * Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,51 +31,24 @@
*
*/
/* Automatically generated nanopb constant definitions */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev at Fri Jan 20 16:14:22 2017. */
#include "src/core/ext/census/gen/trace_context.pb.h"
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
-const pb_field_t google_trace_TraceId_fields[3] = {
- PB_FIELD( 1, FIXED64 , OPTIONAL, STATIC , FIRST, google_trace_TraceId, hi, hi, 0),
- PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceId, lo, hi, 0),
+const pb_field_t google_trace_TraceContext_fields[5] = {
+ PB_FIELD( 1, FIXED64 , OPTIONAL, STATIC , FIRST, google_trace_TraceContext, trace_id_hi, trace_id_hi, 0),
+ PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, trace_id_lo, trace_id_hi, 0),
+ PB_FIELD( 3, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_id, trace_id_lo, 0),
+ PB_FIELD( 4, FIXED32 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_options, span_id, 0),
PB_LAST_FIELD
};
-const pb_field_t google_trace_TraceContext_fields[4] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, google_trace_TraceContext, trace_id, trace_id, &google_trace_TraceId_fields),
- PB_FIELD( 2, FIXED64 , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, span_id, trace_id, 0),
- PB_FIELD( 3, BOOL , OPTIONAL, STATIC , OTHER, google_trace_TraceContext, is_sampled, span_id, 0),
- PB_LAST_FIELD
-};
-
-
-/* Check that field information fits in pb_field_t */
-#if !defined(PB_FIELD_32BIT)
-/* If you get an error here, it means that you need to define PB_FIELD_32BIT
- * compile-time option. You can do that in pb.h or on compiler command line.
- *
- * The reason you need to do this is that some of your messages contain tag
- * numbers or field sizes that are larger than what can fit in 8 or 16 bit
- * field descriptors.
- */
-PB_STATIC_ASSERT((pb_membersize(google_trace_TraceContext, trace_id) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_google_trace_TraceId_google_trace_TraceContext)
-#endif
-
-#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
-/* If you get an error here, it means that you need to define PB_FIELD_16BIT
- * compile-time option. You can do that in pb.h or on compiler command line.
- *
- * The reason you need to do this is that some of your messages contain tag
- * numbers or field sizes that are larger than what can fit in the default
- * 8 bit descriptors.
- */
-PB_STATIC_ASSERT((pb_membersize(google_trace_TraceContext, trace_id) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_google_trace_TraceId_google_trace_TraceContext)
-#endif
-
+/* @@protoc_insertion_point(eof) */
diff --git a/src/core/ext/census/gen/trace_context.pb.h b/src/core/ext/census/gen/trace_context.pb.h
index cfb2f04ccd..ea127bf70a 100644
--- a/src/core/ext/census/gen/trace_context.pb.h
+++ b/src/core/ext/census/gen/trace_context.pb.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016, Google Inc.
+ * Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,11 +31,13 @@
*
*/
/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
+/* Generated by nanopb-0.3.7-dev at Fri Jan 20 16:14:22 2017. */
#ifndef GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H
#define GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H
#include "third_party/nanopb/pb.h"
+
+/* @@protoc_insertion_point(includes) */
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
#endif
@@ -45,44 +47,35 @@ extern "C" {
#endif
/* Struct definitions */
-typedef struct _google_trace_TraceId {
- bool has_hi;
- uint64_t hi;
- bool has_lo;
- uint64_t lo;
-} google_trace_TraceId;
-
typedef struct _google_trace_TraceContext {
- bool has_trace_id;
- google_trace_TraceId trace_id;
+ bool has_trace_id_hi;
+ uint64_t trace_id_hi;
+ bool has_trace_id_lo;
+ uint64_t trace_id_lo;
bool has_span_id;
uint64_t span_id;
- bool has_is_sampled;
- bool is_sampled;
+ bool has_span_options;
+ uint32_t span_options;
+/* @@protoc_insertion_point(struct:google_trace_TraceContext) */
} google_trace_TraceContext;
/* Default values for struct fields */
/* Initializer values for message structs */
-#define google_trace_TraceId_init_default {false, 0, false, 0}
-#define google_trace_TraceContext_init_default {false, google_trace_TraceId_init_default, false, 0, false, 0}
-#define google_trace_TraceId_init_zero {false, 0, false, 0}
-#define google_trace_TraceContext_init_zero {false, google_trace_TraceId_init_zero, false, 0, false, 0}
+#define google_trace_TraceContext_init_default {false, 0, false, 0, false, 0, false, 0}
+#define google_trace_TraceContext_init_zero {false, 0, false, 0, false, 0, false, 0}
/* Field tags (for use in manual encoding/decoding) */
-#define google_trace_TraceId_hi_tag 1
-#define google_trace_TraceId_lo_tag 2
-#define google_trace_TraceContext_trace_id_tag 1
-#define google_trace_TraceContext_span_id_tag 2
-#define google_trace_TraceContext_is_sampled_tag 3
+#define google_trace_TraceContext_trace_id_hi_tag 1
+#define google_trace_TraceContext_trace_id_lo_tag 2
+#define google_trace_TraceContext_span_id_tag 3
+#define google_trace_TraceContext_span_options_tag 4
/* Struct field encoding specification for nanopb */
-extern const pb_field_t google_trace_TraceId_fields[3];
-extern const pb_field_t google_trace_TraceContext_fields[4];
+extern const pb_field_t google_trace_TraceContext_fields[5];
/* Maximum encoded size of messages (where known) */
-#define google_trace_TraceId_size 18
-#define google_trace_TraceContext_size 31
+#define google_trace_TraceContext_size 32
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
@@ -95,5 +88,6 @@ extern const pb_field_t google_trace_TraceContext_fields[4];
#ifdef __cplusplus
} /* extern "C" */
#endif
+/* @@protoc_insertion_point(eof) */
-#endif /* GRPC_CORE_EXT_CENSUS_GEN_TRACE_CONTEXT_PB_H */
+#endif
diff --git a/src/core/ext/census/trace_context.c b/src/core/ext/census/trace_context.c
index fbb20d3448..47d0de19da 100644
--- a/src/core/ext/census/trace_context.c
+++ b/src/core/ext/census/trace_context.c
@@ -73,7 +73,7 @@ bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
}
// check fields
- if (!ctxt->has_trace_id) {
+ if (!ctxt->has_trace_id_hi || !ctxt->has_trace_id_lo) {
gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id");
return false;
}
diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h
index 1cb5e26ea7..f391a1b7c1 100644
--- a/src/core/ext/census/trace_context.h
+++ b/src/core/ext/census/trace_context.h
@@ -38,6 +38,9 @@
#include "src/core/ext/census/gen/trace_context.pb.h"
+/* Span option flags. */
+#define SPAN_OPTIONS_IS_SAMPLED 0x01
+
/* Maximum number of bytes required to encode a TraceContext (31)
1 byte for trace_id field
1 byte for trace_id length
diff --git a/src/core/ext/census/trace_label.h b/src/core/ext/census/trace_label.h
new file mode 100644
index 0000000000..0e4a8d885f
--- /dev/null
+++ b/src/core/ext/census/trace_label.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_LABEL_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_LABEL_H
+
+#include "src/core/ext/census/trace_string.h"
+
+/* Trace label (key/value pair) stores a label name and the label value. The
+ value can be one of trace_string/int64_t/bool. */
+typedef struct trace_label {
+ trace_string key;
+ enum label_type {
+ /* Unknown value for debugging/error purposes */
+ LABEL_UNKNOWN = 0,
+ /* A string value */
+ LABEL_STRING = 1,
+ /* An integer value. */
+ LABEL_INT = 2,
+ /* A boolean value. */
+ LABEL_BOOL = 3,
+ } value_type;
+
+ union value {
+ trace_string label_str;
+ int64_t label_int;
+ bool label_bool;
+ } value;
+} trace_label;
+
+#endif
diff --git a/src/core/ext/census/trace_propagation.h b/src/core/ext/census/trace_propagation.h
new file mode 100644
index 0000000000..75c4ebaa39
--- /dev/null
+++ b/src/core/ext/census/trace_propagation.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_PROPAGATION_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_PROPAGATION_H
+
+#include "src/core/ext/census/tracing.h"
+
+/* Encoding and decoding functions for receiving and sending trace contexts
+ over the wire. Only RPC libraries should be calling these
+ functions. These functions return the number of bytes encoded/decoded
+ (0 if a failure has occurred). buf_size indicates the size of the
+ input/output buffer. trace_span_context is a struct that includes the
+ trace ID, span ID, and a set of option flags (is_sampled, etc.). */
+
+/* Converts a span context to a binary byte buffer. */
+size_t trace_span_context_to_binary(const trace_span_context *ctxt,
+ uint8_t *buf, size_t buf_size);
+
+/* Reads a binary byte buffer and populates a span context structure. */
+size_t binary_to_trace_span_context(const uint8_t *buf, size_t buf_size,
+ trace_span_context *ctxt);
+
+/* Converts a span context to an http metadata compatible string. */
+size_t trace_span_context_to_http_format(const trace_span_context *ctxt,
+ char *buf, size_t buf_size);
+
+/* Reads an http metadata compatible string and populates a span context
+ structure. */
+size_t http_format_to_trace_span_context(const char *buf, size_t buf_size,
+ trace_span_context *ctxt);
+
+#endif
diff --git a/src/core/ext/census/trace_status.h b/src/core/ext/census/trace_status.h
new file mode 100644
index 0000000000..adc0ebd0c0
--- /dev/null
+++ b/src/core/ext/census/trace_status.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_STATUS_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_STATUS_H
+
+#include "src/core/ext/census/trace_string.h"
+
+/* Stores a status code and status message for a trace. */
+typedef struct trace_status {
+ int64_t errorCode;
+ trace_string errorMessage;
+} trace_status;
+
+#endif
diff --git a/src/core/ext/census/trace_string.h b/src/core/ext/census/trace_string.h
new file mode 100644
index 0000000000..8e77ee9f7e
--- /dev/null
+++ b/src/core/ext/census/trace_string.h
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_STRING_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_STRING_H
+
+#include <grpc/slice.h>
+
+/* String struct for tracing messages. Since this is a C API, we do not have
+ access to a string class. This is intended for use by higher level
+ languages which wrap around the C API, as most of them have a string class.
+ This will also be more efficient when copying, as we have an explicitly
+ specified length. Also, grpc_slice has reference counting which allows for
+ interning. */
+typedef struct trace_string {
+ char *string;
+ size_t length;
+} trace_string;
+
+#endif
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
index 9371fffc8d..8b74628f4f 100644
--- a/src/core/ext/census/tracing.c
+++ b/src/core/ext/census/tracing.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,21 +31,41 @@
*
*/
-//#include "src/core/ext/census/tracing.h"
+#include "src/core/ext/census/tracing.h"
#include <grpc/census.h>
-#include <stdlib.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <openssl/rand.h>
+#include "src/core/ext/census/mlog.h"
-/* TODO(aveitch): These are all placeholder implementations. */
+void trace_start_span(const trace_span_context *span_ctxt,
+ const trace_string name, const start_span_options *opts,
+ trace_span_context *new_span_ctxt,
+ bool has_remote_parent) {
+ // Noop implementation.
+}
+
+void trace_add_span_annotation(const trace_string description,
+ const trace_label *labels, const size_t n_labels,
+ trace_span_context *span_ctxt) {
+ // Noop implementation.
+}
-int census_trace_mask(const census_context *context) {
- abort();
- return CENSUS_TRACE_MASK_NONE;
+void trace_add_span_network_event_annotation(const trace_string description,
+ const trace_label *labels,
+ const size_t n_labels,
+ const gpr_timespec timestamp,
+ bool sent, uint64_t id,
+ trace_span_context *span_ctxt) {
+ // Noop implementation.
}
-void census_set_trace_mask(int trace_mask) { abort(); }
+void trace_add_span_labels(const trace_label *labels, const size_t n_labels,
+ trace_span_context *span_ctxt) {
+ // Noop implementation.
+}
-void census_trace_print(census_context *context, uint32_t type,
- const char *buffer, size_t n) {
- abort();
+void trace_end_span(const trace_status *status, trace_span_context *span_ctxt) {
+ // Noop implementation.
}
diff --git a/src/core/ext/census/tracing.h b/src/core/ext/census/tracing.h
new file mode 100644
index 0000000000..c2b947ae40
--- /dev/null
+++ b/src/core/ext/census/tracing.h
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_CENSUS_TRACING_H
+#define GRPC_CORE_EXT_CENSUS_TRACING_H
+
+#include <grpc/support/time.h>
+#include <stdbool.h>
+#include "src/core/ext/census/trace_context.h"
+#include "src/core/ext/census/trace_label.h"
+#include "src/core/ext/census/trace_status.h"
+
+/* This is the low level tracing API that other languages will interface with.
+ This is not intended to be accessed by the end-user, therefore it has been
+ designed with performance in mind rather than ease of use. */
+
+/* The tracing level. */
+enum TraceLevel {
+ /* Annotations on this context will be silently discarded. */
+ NO_TRACING = 0,
+ /* Annotations will not be saved to a persistent store. They will be
+ available via local APIs only. This setting is not propagated to child
+ spans. */
+ TRANSIENT_TRACING = 1,
+ /* Annotations are recorded for the entire distributed trace and they are
+ saved to a persistent store. This setting is propagated to child spans. */
+ PERSISTENT_TRACING = 2,
+};
+
+typedef struct trace_span_context {
+ /* Trace span context stores Span ID, Trace ID, and option flags. */
+ /* Trace ID is 128 bits split into 2 64-bit chunks (hi and lo). */
+ uint64_t trace_id_hi;
+ uint64_t trace_id_lo;
+ /* Span ID is 64 bits. */
+ uint64_t span_id;
+ /* Span-options is 32-bit value which contains flag options. */
+ uint32_t span_options;
+} trace_span_context;
+
+typedef struct start_span_options {
+ /* If set, this will override the Span.local_start_time for the Span. */
+ gpr_timespec local_start_timestamp;
+
+ /* Linked spans can be used to identify spans that are linked to this span in
+ a different trace. This can be used (for example) in batching operations,
+ where a single batch handler processes multiple requests from different
+ traces. If set, points to a list of Spans are linked to the created Span.*/
+ trace_span_context *linked_spans;
+ /* The number of linked spans. */
+ size_t n_linked_spans;
+} start_span_options;
+
+/* Create a new child Span (or root if parent is NULL), with parent being the
+ designated Span. The child span will have the provided name and starting
+ span options (optional). The bool has_remote_parent marks whether the
+ context refers to a remote parent span or not. */
+void trace_start_span(const trace_span_context *span_ctxt,
+ const trace_string name, const start_span_options *opts,
+ trace_span_context *new_span_ctxt,
+ bool has_remote_parent);
+
+/* Add a new Annotation to the Span. Annotations consist of a description
+ (trace_string) and a set of n labels (trace_label). This can be populated
+ with arbitrary user data. */
+void trace_add_span_annotation(const trace_string description,
+ const trace_label *labels, const size_t n_labels,
+ trace_span_context *span_ctxt);
+
+/* Add a new NetworkEvent annotation to a Span. This function is only intended
+ to be used by RPC systems (either client or server), not by higher level
+ applications. The timestamp type will be system-defined, the sent argument
+ designates whether this is a network send event (client request, server
+ reply)or receive (server request, client reply). The id argument corresponds
+ to Span.Annotation.NetworkEvent.id from the data model, and serves to uniquely
+ identify each network message. */
+void trace_add_span_network_event(const trace_string description,
+ const trace_label *labels,
+ const size_t n_labels,
+ const gpr_timespec timestamp, bool sent,
+ uint64_t id, trace_span_context *span_ctxt);
+
+/* Add a set of labels to the Span. These will correspond to the field
+Span.labels in the data model. */
+void trace_add_span_labels(const trace_label *labels, const size_t n_labels,
+ trace_span_context *span_ctxt);
+
+/* Mark the end of Span Execution with the given status. Only the timing of the
+first EndSpan call for a given Span will be recorded, and implementations are
+free to ignore all further calls using the Span. EndSpanOptions can
+optionally be NULL. */
+void trace_end_span(const trace_status *status, trace_span_context *span_ctxt);
+
+#endif
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 07eb68a3eb..06038bb5ba 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -44,8 +44,8 @@
#include <grpc/support/useful.h>
#include "src/core/ext/client_channel/http_connect_handshaker.h"
-#include "src/core/ext/client_channel/http_proxy.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/ext/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
@@ -153,10 +153,6 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
*/
typedef struct client_channel_channel_data {
- /** server name */
- char *server_name;
- /** HTTP CONNECT proxy to use, if any */
- char *proxy_name;
/** resolver for this channel */
grpc_resolver *resolver;
/** have we started resolving this channel */
@@ -317,17 +313,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
- // If using a proxy, add channel arg for server in HTTP CONNECT request.
- if (chand->proxy_name != NULL) {
- grpc_arg new_arg;
- new_arg.key = GRPC_ARG_HTTP_CONNECT_SERVER;
- new_arg.type = GRPC_ARG_STRING;
- new_arg.value.string = chand->server_name;
- grpc_channel_args *tmp_args = chand->resolver_result;
- chand->resolver_result =
- grpc_channel_args_copy_and_add(chand->resolver_result, &new_arg, 1);
- grpc_channel_args_destroy(exec_ctx, tmp_args);
- }
// Instantiate LB policy.
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
@@ -542,16 +527,21 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
grpc_client_channel_factory_ref(arg->value.pointer.p);
chand->client_channel_factory = arg->value.pointer.p;
- // Instantiate resolver.
+ // Get server name to resolve, using proxy mapper if needed.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- chand->server_name = gpr_strdup(arg->value.string);
- chand->proxy_name = grpc_get_http_proxy_server();
- char *name_to_resolve =
- chand->proxy_name == NULL ? chand->server_name : chand->proxy_name;
+ char *proxy_name = NULL;
+ grpc_channel_args *new_args = NULL;
+ grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args,
+ &proxy_name, &new_args);
+ // Instantiate resolver.
chand->resolver = grpc_resolver_create(
- exec_ctx, name_to_resolve, args->channel_args, chand->interested_parties);
+ exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string,
+ new_args != NULL ? new_args : args->channel_args,
+ chand->interested_parties);
+ if (proxy_name != NULL) gpr_free(proxy_name);
+ if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args);
if (chand->resolver == NULL) {
return GRPC_ERROR_CREATE("resolver creation failed");
}
@@ -562,8 +552,6 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- gpr_free(chand->server_name);
- gpr_free(chand->proxy_name);
if (chand->resolver != NULL) {
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
@@ -644,6 +632,12 @@ typedef struct client_channel_call_data {
grpc_linked_mdelem lb_token_mdelem;
} call_data;
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *call_elem) {
+ grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
+ return scc == CANCELLED_CALL ? NULL : scc;
+}
+
static void add_waiting_locked(call_data *calld, grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("add_waiting_locked", 0);
if (calld->waiting_ops_count == calld->waiting_ops_capacity) {
diff --git a/src/core/ext/client_channel/client_channel.h b/src/core/ext/client_channel/client_channel.h
index f02587d0c1..5e6e64e58b 100644
--- a/src/core/ext/client_channel/client_channel.h
+++ b/src/core/ext/client_channel/client_channel.h
@@ -57,4 +57,8 @@ void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete);
+/* Debug helper: pull the subchannel call from a call stack element */
+grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
+ grpc_call_element *elem);
+
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_H */
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
index 7f75233727..6f9df3e386 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -39,6 +39,7 @@
#include "src/core/ext/client_channel/client_channel.h"
#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/http_proxy.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
@@ -82,6 +83,7 @@ void grpc_client_channel_init(void) {
grpc_lb_policy_registry_init();
grpc_resolver_registry_init();
grpc_proxy_mapper_registry_init();
+ grpc_register_http_proxy_mapper();
grpc_subchannel_index_init();
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
set_default_host_if_unset, NULL);
diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c
index 9a6c818c4e..bbe4ff550c 100644
--- a/src/core/ext/client_channel/http_proxy.c
+++ b/src/core/ext/client_channel/http_proxy.c
@@ -40,10 +40,13 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/client_channel/http_connect_handshaker.h"
+#include "src/core/ext/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/support/env.h"
-char* grpc_get_http_proxy_server() {
+static char* grpc_get_http_proxy_server() {
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
@@ -66,3 +69,55 @@ done:
grpc_uri_destroy(uri);
return proxy_name;
}
+
+static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args) {
+ *name_to_resolve = grpc_get_http_proxy_server();
+ if (*name_to_resolve == NULL) return false;
+ grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */);
+ if (uri == NULL || uri->path[0] == '\0') {
+ gpr_log(GPR_ERROR,
+ "'http_proxy' environment variable set, but cannot "
+ "parse server URI '%s' -- not using proxy",
+ server_uri);
+ if (uri != NULL) grpc_uri_destroy(uri);
+ return false;
+ }
+ if (strcmp(uri->scheme, "unix") == 0) {
+ gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'",
+ server_uri);
+ grpc_uri_destroy(uri);
+ return false;
+ }
+ grpc_arg new_arg;
+ new_arg.key = GRPC_ARG_HTTP_CONNECT_SERVER;
+ new_arg.type = GRPC_ARG_STRING;
+ new_arg.value.string = uri->path[0] == '/' ? uri->path + 1 : uri->path;
+ *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1);
+ grpc_uri_destroy(uri);
+ return true;
+}
+
+static bool proxy_mapper_map_address(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args) {
+ return false;
+}
+
+static void proxy_mapper_destroy(grpc_proxy_mapper* mapper) {}
+
+static const grpc_proxy_mapper_vtable proxy_mapper_vtable = {
+ proxy_mapper_map_name, proxy_mapper_map_address, proxy_mapper_destroy};
+
+static grpc_proxy_mapper proxy_mapper = {&proxy_mapper_vtable};
+
+void grpc_register_http_proxy_mapper() {
+ grpc_proxy_mapper_register(true /* at_start */, &proxy_mapper);
+}
diff --git a/src/core/ext/client_channel/http_proxy.h b/src/core/ext/client_channel/http_proxy.h
index 0d77ae253b..c8882b1ef1 100644
--- a/src/core/ext/client_channel/http_proxy.h
+++ b/src/core/ext/client_channel/http_proxy.h
@@ -34,8 +34,6 @@
#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H
#define GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H
-/// Returns the name of the proxy to use, or NULL if no proxy is configured.
-/// Caller takes ownership of result.
-char* grpc_get_http_proxy_server();
+void grpc_register_http_proxy_mapper();
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_HTTP_PROXY_H */
diff --git a/src/core/ext/client_channel/proxy_mapper.c b/src/core/ext/client_channel/proxy_mapper.c
index 6b6f328d3c..f92afe847b 100644
--- a/src/core/ext/client_channel/proxy_mapper.c
+++ b/src/core/ext/client_channel/proxy_mapper.c
@@ -38,13 +38,24 @@ void grpc_proxy_mapper_init(const grpc_proxy_mapper_vtable* vtable,
mapper->vtable = vtable;
}
-bool grpc_proxy_mapper_map(grpc_exec_ctx* exec_ctx, grpc_proxy_mapper* mapper,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args) {
- return mapper->vtable->map(exec_ctx, mapper, address, args, new_address,
- new_args);
+bool grpc_proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args) {
+ return mapper->vtable->map_name(exec_ctx, mapper, server_uri, args,
+ name_to_resolve, new_args);
+}
+
+bool grpc_proxy_mapper_map_address(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args) {
+ return mapper->vtable->map_address(exec_ctx, mapper, address, args,
+ new_address, new_args);
}
void grpc_proxy_mapper_destroy(grpc_proxy_mapper* mapper) {
diff --git a/src/core/ext/client_channel/proxy_mapper.h b/src/core/ext/client_channel/proxy_mapper.h
index fa930379e7..6e4607fe4d 100644
--- a/src/core/ext/client_channel/proxy_mapper.h
+++ b/src/core/ext/client_channel/proxy_mapper.h
@@ -43,15 +43,22 @@
typedef struct grpc_proxy_mapper grpc_proxy_mapper;
typedef struct {
+ /// Determines the proxy name to resolve for \a server_uri.
+ /// If no proxy is needed, returns false.
+ /// Otherwise, sets \a name_to_resolve, optionally sets \a new_args,
+ /// and returns true.
+ bool (*map_name)(grpc_exec_ctx* exec_ctx, grpc_proxy_mapper* mapper,
+ const char* server_uri, const grpc_channel_args* args,
+ char** name_to_resolve, grpc_channel_args** new_args);
/// Determines the proxy address to use to contact \a address.
/// If no proxy is needed, returns false.
/// Otherwise, sets \a new_address, optionally sets \a new_args, and
/// returns true.
- bool (*map)(grpc_exec_ctx* exec_ctx, grpc_proxy_mapper* mapper,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args);
+ bool (*map_address)(grpc_exec_ctx* exec_ctx, grpc_proxy_mapper* mapper,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args);
/// Destroys \a mapper.
void (*destroy)(grpc_proxy_mapper* mapper);
} grpc_proxy_mapper_vtable;
@@ -63,11 +70,20 @@ struct grpc_proxy_mapper {
void grpc_proxy_mapper_init(const grpc_proxy_mapper_vtable* vtable,
grpc_proxy_mapper* mapper);
-bool grpc_proxy_mapper_map(grpc_exec_ctx* exec_ctx, grpc_proxy_mapper* mapper,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args);
+bool grpc_proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args);
+
+bool grpc_proxy_mapper_map_address(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper* mapper,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args);
+
void grpc_proxy_mapper_destroy(grpc_proxy_mapper* mapper);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_H */
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c
index 0a156c8b1c..2c44b9d490 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.c
+++ b/src/core/ext/client_channel/proxy_mapper_registry.c
@@ -61,15 +61,28 @@ static void grpc_proxy_mapper_list_register(grpc_proxy_mapper_list* list,
++list->num_mappers;
}
-static bool grpc_proxy_mapper_list_map(grpc_exec_ctx* exec_ctx,
- grpc_proxy_mapper_list* list,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args) {
+static bool grpc_proxy_mapper_list_map_name(grpc_exec_ctx* exec_ctx,
+ grpc_proxy_mapper_list* list,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args) {
for (size_t i = 0; i < list->num_mappers; ++i) {
- if (grpc_proxy_mapper_map(exec_ctx, list->list[i], address, args,
- new_address, new_args)) {
+ if (grpc_proxy_mapper_map_name(exec_ctx, list->list[i], server_uri, args,
+ name_to_resolve, new_args)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static bool grpc_proxy_mapper_list_map_address(
+ grpc_exec_ctx* exec_ctx, grpc_proxy_mapper_list* list,
+ const grpc_resolved_address* address, const grpc_channel_args* args,
+ grpc_resolved_address** new_address, grpc_channel_args** new_args) {
+ for (size_t i = 0; i < list->num_mappers; ++i) {
+ if (grpc_proxy_mapper_map_address(exec_ctx, list->list[i], address, args,
+ new_address, new_args)) {
return true;
}
}
@@ -101,11 +114,20 @@ void grpc_proxy_mapper_register(bool at_start, grpc_proxy_mapper* mapper) {
grpc_proxy_mapper_list_register(&g_proxy_mapper_list, at_start, mapper);
}
-bool grpc_proxy_mappers_map(grpc_exec_ctx* exec_ctx,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args) {
- return grpc_proxy_mapper_list_map(exec_ctx, &g_proxy_mapper_list, address,
- args, new_address, new_args);
+bool grpc_proxy_mappers_map_name(grpc_exec_ctx* exec_ctx,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args) {
+ return grpc_proxy_mapper_list_map_name(exec_ctx, &g_proxy_mapper_list,
+ server_uri, args, name_to_resolve,
+ new_args);
+}
+bool grpc_proxy_mappers_map_address(grpc_exec_ctx* exec_ctx,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args) {
+ return grpc_proxy_mapper_list_map_address(
+ exec_ctx, &g_proxy_mapper_list, address, args, new_address, new_args);
}
diff --git a/src/core/ext/client_channel/proxy_mapper_registry.h b/src/core/ext/client_channel/proxy_mapper_registry.h
index b76af8d456..742b57a2d4 100644
--- a/src/core/ext/client_channel/proxy_mapper_registry.h
+++ b/src/core/ext/client_channel/proxy_mapper_registry.h
@@ -44,10 +44,16 @@ void grpc_proxy_mapper_registry_shutdown();
/// the list. Otherwise, it will be added to the end.
void grpc_proxy_mapper_register(bool at_start, grpc_proxy_mapper* mapper);
-bool grpc_proxy_mappers_map(grpc_exec_ctx* exec_ctx,
- const grpc_resolved_address* address,
- const grpc_channel_args* args,
- grpc_resolved_address** new_address,
- grpc_channel_args** new_args);
+bool grpc_proxy_mappers_map_name(grpc_exec_ctx* exec_ctx,
+ const char* server_uri,
+ const grpc_channel_args* args,
+ char** name_to_resolve,
+ grpc_channel_args** new_args);
+
+bool grpc_proxy_mappers_map_address(grpc_exec_ctx* exec_ctx,
+ const grpc_resolved_address* address,
+ const grpc_channel_args* args,
+ grpc_resolved_address** new_address,
+ grpc_channel_args** new_args);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PROXY_MAPPER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index f1e4e079e2..aa036e883b 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -336,8 +336,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
grpc_resolved_address *new_address = NULL;
grpc_channel_args *new_args = NULL;
- if (grpc_proxy_mappers_map(exec_ctx, addr, args->args, &new_address,
- &new_args)) {
+ if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address,
+ &new_args)) {
GPR_ASSERT(new_address != NULL);
gpr_free(addr);
addr = new_address;
@@ -788,7 +788,8 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
-static void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) {
+static void grpc_uri_to_sockaddr(const char *uri_str,
+ grpc_resolved_address *addr) {
grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (strcmp(uri->scheme, "ipv4") == 0) {
@@ -803,14 +804,19 @@ static void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) {
void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
grpc_resolved_address *addr) {
+ const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
+ memset(addr, 0, sizeof(*addr));
+ if (*addr_uri_str != '\0') {
+ grpc_uri_to_sockaddr(addr_uri_str, addr);
+ }
+}
+
+const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args) {
const grpc_arg *addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
- memset(addr, 0, sizeof(*addr));
- if (*addr_arg->value.string != '\0') {
- grpc_uri_to_sockaddr(addr_arg->value.string, addr);
- }
+ return addr_arg->value.string;
}
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) {
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 9bd35a7704..26ce954487 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -178,6 +178,9 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
grpc_resolved_address *addr);
+/// Returns the URI string for the address to connect to.
+const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args);
+
/// Returns a new channel arg encoding the subchannel address as a string.
/// Caller is responsible for freeing the string.
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr);
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 308facb7e7..ab62e5ed6a 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -112,11 +112,13 @@
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
+#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/backoff.h"
@@ -751,6 +753,96 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_UNREF(error);
}
+static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
+ void *balancer_name) {
+ gpr_free(balancer_name);
+}
+
+static void *copy_balancer_name(void *balancer_name) {
+ return gpr_strdup(balancer_name);
+}
+
+static grpc_slice_hash_table_entry targets_info_entry_create(
+ const char *address, const char *balancer_name) {
+ static const grpc_slice_hash_table_vtable vtable = {destroy_balancer_name,
+ copy_balancer_name};
+ grpc_slice_hash_table_entry entry;
+ entry.key = grpc_slice_from_copied_string(address);
+ entry.value = (void *)balancer_name;
+ entry.vtable = &vtable;
+ return entry;
+}
+
+/* Returns the target URI for the LB service whose addresses are in \a
+ * addresses. Using this URI, a bidirectional streaming channel will be created
+ * for the reception of load balancing updates.
+ *
+ * The output argument \a targets_info will be updated to contain a mapping of
+ * "LB server address" to "balancer name", as reported by the naming system.
+ * This mapping will be propagated via the channel arguments of the
+ * aforementioned LB streaming channel, to be used by the security connector for
+ * secure naming checks. The user is responsible for freeing \a targets_info. */
+static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
+ const grpc_lb_addresses *addresses,
+ grpc_slice_hash_table **targets_info) {
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ /* All input addresses come from a resolver that claims they are LB services.
+ * It's the resolver's responsibility to make sure this policy is only
+ * instantiated and used in that case. Otherwise, something has gone wrong. */
+ GPR_ASSERT(num_grpclb_addrs > 0);
+
+ grpc_slice_hash_table_entry *targets_info_entries =
+ gpr_malloc(sizeof(*targets_info_entries) * num_grpclb_addrs);
+
+ /* construct a target ipvX://ip1:port1,ip2:port2,... from the addresses in \a
+ * addresses */
+ /* TODO(dgq): support mixed ip version */
+ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
+ size_t addr_index = 0;
+
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (addresses->addresses[i].user_data != NULL) {
+ gpr_log(GPR_ERROR,
+ "This LB policy doesn't support user data. It will be ignored");
+ }
+ if (addresses->addresses[i].is_balancer) {
+ char *addr_str;
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_str, &addresses->addresses[i].address, true) > 0);
+ targets_info_entries[addr_index] = targets_info_entry_create(
+ addr_str, addresses->addresses[i].balancer_name);
+ addr_strs[addr_index++] = addr_str;
+ }
+ }
+ GPR_ASSERT(addr_index == num_grpclb_addrs);
+
+ size_t uri_path_len;
+ char *uri_path = gpr_strjoin_sep((const char **)addr_strs, num_grpclb_addrs,
+ ",", &uri_path_len);
+ for (size_t i = 0; i < num_grpclb_addrs; i++) gpr_free(addr_strs[i]);
+ gpr_free(addr_strs);
+
+ char *target_uri_str = NULL;
+ /* TODO(dgq): Don't assume all addresses will share the scheme of the first
+ * one */
+ gpr_asprintf(&target_uri_str, "%s:%s",
+ grpc_sockaddr_get_uri_scheme(&addresses->addresses[0].address),
+ uri_path);
+ gpr_free(uri_path);
+
+ *targets_info =
+ grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries);
+ for (size_t i = 0; i < num_grpclb_addrs; i++) {
+ grpc_slice_unref_internal(exec_ctx, targets_info_entries[i].key);
+ }
+ gpr_free(targets_info_entries);
+
+ return target_uri_str;
+}
+
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
@@ -788,85 +880,30 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
}
grpc_uri_destroy(uri);
- /* All input addresses in addresses come from a resolver that claims
- * they are LB services. It's the resolver's responsibility to make sure
- * this policy is only instantiated and used in that case.
- *
- * Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
- /* construct a target from the addresses in args, given in the form
- * ipvX://ip1:port1,ip2:port2,...
- * TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
- size_t addr_index = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (addresses->addresses[i].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
- if (addresses->addresses[i].is_balancer) {
- if (addr_index == 0) {
- addr_strs[addr_index++] =
- grpc_sockaddr_to_uri(&addresses->addresses[i].address);
- } else {
- GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
- &addresses->addresses[i].address,
- true) > 0);
- }
- }
- }
- size_t uri_path_len;
- char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
- num_grpclb_addrs, ",", &uri_path_len);
-
- /* Create a channel to talk to the LBs.
- *
- * We strip out the channel arg for the LB policy name, since we want
- * to use the default (pick_first) in this case.
- *
- * We also strip out the channel arg for the resolved addresses, since
- * that will be generated by the name resolver used in the LB channel.
- * Note that the LB channel will use the sockaddr resolver, so this
- * won't actually generate a query to DNS (or some other name service).
- * However, the addresses returned by the sockaddr resolver will have
- * is_balancer=false, whereas our own addresses have is_balancer=true.
- * We need the LB channel to return addresses with is_balancer=false
- * so that it does not wind up recursively using the grpclb LB policy,
- * as per the special case logic in client_channel.c.
- *
- * Finally, we also strip out the channel arg for the server URI,
- * since that will be different for the LB channel than for the parent
- * channel. (The client channel factory will re-add this arg with
- * the right value.)
- */
- static const char *keys_to_remove[] = {
- GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
- grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
- args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
- glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
- exec_ctx, glb_policy->cc_factory, target_uri_str,
- GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
- grpc_channel_args_destroy(exec_ctx, new_args);
-
- gpr_free(target_uri_str);
- for (size_t i = 0; i < num_grpclb_addrs; i++) {
- gpr_free(addr_strs[i]);
- }
- gpr_free(addr_strs);
-
+ grpc_slice_hash_table *targets_info = NULL;
+ /* Create a client channel over them to communicate with a LB service */
+ char *lb_service_target_addresses =
+ get_lb_uri_target_addresses(exec_ctx, addresses, &targets_info);
+ grpc_channel_args *lb_channel_args =
+ get_lb_channel_args(exec_ctx, targets_info, args->args);
+ glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
+ exec_ctx, lb_service_target_addresses, args->client_channel_factory,
+ lb_channel_args);
+ grpc_slice_hash_table_unref(exec_ctx, targets_info);
+ grpc_channel_args_destroy(exec_ctx, lb_channel_args);
+ gpr_free(lb_service_target_addresses);
if (glb_policy->lb_channel == NULL) {
gpr_free(glb_policy);
return NULL;
}
-
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable);
gpr_mu_init(&glb_policy->mu);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
-
return &glb_policy->base;
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel.c b/src/core/ext/lb_policy/grpclb/grpclb_channel.c
new file mode 100644
index 0000000000..1b8bbab1b6
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/grpclb_channel.c
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/support/string.h"
+
+grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
+ grpc_exec_ctx *exec_ctx, const char *lb_service_target_addresses,
+ grpc_client_channel_factory *client_channel_factory,
+ grpc_channel_args *args) {
+ grpc_channel *lb_channel = grpc_client_channel_factory_create_channel(
+ exec_ctx, client_channel_factory, lb_service_target_addresses,
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, args);
+ return lb_channel;
+}
+
+grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
+ grpc_slice_hash_table *targets_info,
+ const grpc_channel_args *args) {
+ /* We strip out the channel arg for the LB policy name, since we want
+ * to use the default (pick_first) in this case.
+ *
+ * We also strip out the channel arg for the resolved addresses, since
+ * that will be generated by the name resolver used in the LB channel.
+ * Note that the LB channel will use the sockaddr resolver, so this
+ * won't actually generate a query to DNS (or some other name service).
+ * However, the addresses returned by the sockaddr resolver will have
+ * is_balancer=false, whereas our own addresses have is_balancer=true.
+ * We need the LB channel to return addresses with is_balancer=false
+ * so that it does not wind up recursively using the grpclb LB policy,
+ * as per the special case logic in client_channel.c.
+ *
+ * Lastly, we also strip out the channel arg for the server URI,
+ * since that will be different for the LB channel than for the parent
+ * channel (the client channel factory will re-add this arg with
+ * the right value). */
+ static const char *keys_to_remove[] = {
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
+ return grpc_channel_args_copy_and_remove(args, keys_to_remove,
+ GPR_ARRAY_SIZE(keys_to_remove));
+}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel.h b/src/core/ext/lb_policy/grpclb/grpclb_channel.h
new file mode 100644
index 0000000000..f66082d78e
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/grpclb_channel.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
+#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H
+
+#include "src/core/ext/client_channel/lb_policy_factory.h"
+#include "src/core/lib/slice/slice_hash_table.h"
+
+/** Create the channel used for communicating with an LB service.
+ * Note that an LB *service* may be comprised of several LB *servers*.
+ *
+ * \a lb_service_target_addresses is the target URI containing the addresses
+ * from resolving the LB service's name (eg, ipv4:10.0.0.1:1234,10.2.3.4:9876).
+ * \a client_channel_factory will be used for the creation of the LB channel,
+ * alongside the channel args passed in \a args. */
+grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
+ grpc_exec_ctx *exec_ctx, const char *lb_service_target_addresses,
+ grpc_client_channel_factory *client_channel_factory,
+ grpc_channel_args *args);
+
+grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
+ grpc_slice_hash_table *targets_info,
+ const grpc_channel_args *args);
+
+#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_CHANNEL_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c b/src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c
new file mode 100644
index 0000000000..2fee5f1b8e
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/grpclb_channel_secure.c
@@ -0,0 +1,107 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/lb_policy/grpclb/grpclb_channel.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/transport/lb_targets_info.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/string.h"
+
+grpc_channel *grpc_lb_policy_grpclb_create_lb_channel(
+ grpc_exec_ctx *exec_ctx, const char *lb_service_target_addresses,
+ grpc_client_channel_factory *client_channel_factory,
+ grpc_channel_args *args) {
+ grpc_channel_args *new_args = args;
+ grpc_channel_credentials *channel_credentials =
+ grpc_channel_credentials_find_in_args(args);
+ if (channel_credentials != NULL) {
+ /* Substitute the channel credentials with a version without call
+ * credentials: the load balancer is not necessarily trusted to handle
+ * bearer token credentials */
+ static const char *keys_to_remove[] = {GRPC_ARG_CHANNEL_CREDENTIALS};
+ grpc_channel_credentials *creds_sans_call_creds =
+ grpc_channel_credentials_duplicate_without_call_credentials(
+ channel_credentials);
+ GPR_ASSERT(creds_sans_call_creds != NULL);
+ grpc_arg args_to_add[] = {
+ grpc_channel_credentials_to_arg(creds_sans_call_creds)};
+ /* Create the new set of channel args */
+ new_args = grpc_channel_args_copy_and_add_and_remove(
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
+ GPR_ARRAY_SIZE(args_to_add));
+ grpc_channel_credentials_unref(exec_ctx, creds_sans_call_creds);
+ }
+ grpc_channel *lb_channel = grpc_client_channel_factory_create_channel(
+ exec_ctx, client_channel_factory, lb_service_target_addresses,
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
+ if (channel_credentials != NULL) {
+ grpc_channel_args_destroy(exec_ctx, new_args);
+ }
+ return lb_channel;
+}
+
+grpc_channel_args *get_lb_channel_args(grpc_exec_ctx *exec_ctx,
+ grpc_slice_hash_table *targets_info,
+ const grpc_channel_args *args) {
+ const grpc_arg targets_info_arg =
+ grpc_lb_targets_info_create_channel_arg(targets_info);
+ /* We strip out the channel arg for the LB policy name, since we want
+ * to use the default (pick_first) in this case.
+ *
+ * We also strip out the channel arg for the resolved addresses, since
+ * that will be generated by the name resolver used in the LB channel.
+ * Note that the LB channel will use the sockaddr resolver, so this
+ * won't actually generate a query to DNS (or some other name service).
+ * However, the addresses returned by the sockaddr resolver will have
+ * is_balancer=false, whereas our own addresses have is_balancer=true.
+ * We need the LB channel to return addresses with is_balancer=false
+ * so that it does not wind up recursively using the grpclb LB policy,
+ * as per the special case logic in client_channel.c.
+ *
+ * Lastly, we also strip out the channel arg for the server URI,
+ * since that will be different for the LB channel than for the parent
+ * channel (the client channel factory will re-add this arg with
+ * the right value). */
+ static const char *keys_to_remove[] = {
+ GRPC_ARG_LB_POLICY_NAME, GRPC_ARG_LB_ADDRESSES, GRPC_ARG_SERVER_URI};
+ /* Add the targets info table to be used for secure naming */
+ return grpc_channel_args_copy_and_add_and_remove(
+ args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &targets_info_arg,
+ 1);
+}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index d17d8fa057..3e060d189a 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -739,6 +739,13 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sc_args.args = new_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
+ if (grpc_lb_round_robin_trace) {
+ char *address_uri =
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
+ gpr_log(GPR_DEBUG, "Created subchannel %p for address uri %s",
+ (void *)subchannel, address_uri);
+ gpr_free(address_uri);
+ }
grpc_channel_args_destroy(exec_ctx, new_args);
if (subchannel != NULL) {
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index c9f4021216..490a0c560e 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -39,6 +39,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/api_trace.h"
@@ -63,12 +64,17 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
+ if (target == NULL) {
+ gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
+ return NULL;
+ }
// Add channel arg containing the server URI.
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = (char *)target;
+ arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(exec_ctx, new_args);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index f979d9bad5..d8c18eb122 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -39,10 +39,16 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/ext/transport/chttp2/client/chttp2_connector.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/transport/lb_targets_info.h"
#include "src/core/lib/security/transport/security_connector.h"
+#include "src/core/lib/slice/slice_hash_table.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -52,12 +58,114 @@ static void client_channel_factory_ref(
static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
+static grpc_subchannel_args *get_secure_naming_subchannel_args(
+ grpc_exec_ctx *exec_ctx, const grpc_subchannel_args *args) {
+ grpc_channel_credentials *channel_credentials =
+ grpc_channel_credentials_find_in_args(args->args);
+ if (channel_credentials == NULL) {
+ gpr_log(GPR_ERROR,
+ "Can't create subchannel: channel credentials missing for secure "
+ "channel.");
+ return NULL;
+ }
+ // Make sure security connector does not already exist in args.
+ if (grpc_security_connector_find_in_args(args->args) != NULL) {
+ gpr_log(GPR_ERROR,
+ "Can't create subchannel: security connector already present in "
+ "channel args.");
+ return NULL;
+ }
+ // To which address are we connecting? By default, use the server URI.
+ const grpc_arg *server_uri_arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
+ GPR_ASSERT(server_uri_arg != NULL);
+ GPR_ASSERT(server_uri_arg->type == GRPC_ARG_STRING);
+ const char *server_uri_str = server_uri_arg->value.string;
+ GPR_ASSERT(server_uri_str != NULL);
+ grpc_uri *server_uri =
+ grpc_uri_parse(server_uri_str, true /* supress errors */);
+ GPR_ASSERT(server_uri != NULL);
+ const char *server_uri_path;
+ server_uri_path =
+ server_uri->path[0] == '/' ? server_uri->path + 1 : server_uri->path;
+ const grpc_slice_hash_table *targets_info =
+ grpc_lb_targets_info_find_in_args(args->args);
+ char *target_name_to_check = NULL;
+ if (targets_info != NULL) { // LB channel
+ // Find the balancer name for the target.
+ const char *target_uri_str =
+ grpc_get_subchannel_address_uri_arg(args->args);
+ grpc_uri *target_uri =
+ grpc_uri_parse(target_uri_str, false /* suppress errors */);
+ GPR_ASSERT(target_uri != NULL);
+ if (target_uri->path[0] != '\0') { // "path" may be empty
+ const grpc_slice key = grpc_slice_from_static_string(
+ target_uri->path[0] == '/' ? target_uri->path + 1 : target_uri->path);
+ const char *value = grpc_slice_hash_table_get(targets_info, key);
+ if (value != NULL) target_name_to_check = gpr_strdup(value);
+ grpc_slice_unref_internal(exec_ctx, key);
+ }
+ if (target_name_to_check == NULL) {
+ // If the target name to check hasn't already been set, fall back to using
+ // SERVER_URI
+ target_name_to_check = gpr_strdup(server_uri_path);
+ }
+ grpc_uri_destroy(target_uri);
+ } else { // regular channel: the secure name is the original server URI.
+ target_name_to_check = gpr_strdup(server_uri_path);
+ }
+ grpc_uri_destroy(server_uri);
+ GPR_ASSERT(target_name_to_check != NULL);
+ grpc_channel_security_connector *subchannel_security_connector = NULL;
+ // Create the security connector using the credentials and target name.
+ grpc_channel_args *new_args_from_connector = NULL;
+ const grpc_security_status security_status =
+ grpc_channel_credentials_create_security_connector(
+ exec_ctx, channel_credentials, target_name_to_check, args->args,
+ &subchannel_security_connector, &new_args_from_connector);
+ if (security_status != GRPC_SECURITY_OK) {
+ gpr_log(GPR_ERROR,
+ "Failed to create secure subchannel for secure name '%s'",
+ target_name_to_check);
+ gpr_free(target_name_to_check);
+ return NULL;
+ }
+ gpr_free(target_name_to_check);
+ grpc_arg new_security_connector_arg =
+ grpc_security_connector_to_arg(&subchannel_security_connector->base);
+
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
+ new_args_from_connector != NULL ? new_args_from_connector : args->args,
+ &new_security_connector_arg, 1);
+ GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &subchannel_security_connector->base,
+ "lb_channel_create");
+ if (new_args_from_connector != NULL) {
+ grpc_channel_args_destroy(exec_ctx, new_args_from_connector);
+ }
+ grpc_subchannel_args *final_sc_args = gpr_malloc(sizeof(*final_sc_args));
+ memcpy(final_sc_args, args, sizeof(*args));
+ final_sc_args->args = new_args;
+ return final_sc_args;
+}
+
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) {
+ grpc_subchannel_args *subchannel_args =
+ get_secure_naming_subchannel_args(exec_ctx, args);
+ if (subchannel_args == NULL) {
+ gpr_log(
+ GPR_ERROR,
+ "Failed to create subchannel arguments during subchannel creation.");
+ return NULL;
+ }
grpc_connector *connector = grpc_chttp2_connector_create();
- grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
+ grpc_subchannel *s =
+ grpc_subchannel_create(exec_ctx, connector, subchannel_args);
grpc_connector_unref(exec_ctx, connector);
+ grpc_channel_args_destroy(exec_ctx,
+ (grpc_channel_args *)subchannel_args->args);
+ gpr_free(subchannel_args);
return s;
}
@@ -65,12 +173,17 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
const grpc_channel_args *args) {
+ if (target == NULL) {
+ gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
+ return NULL;
+ }
// Add channel arg containing the server URI.
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = (char *)target;
+ arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(exec_ctx, new_args);
@@ -85,10 +198,10 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
static grpc_client_channel_factory client_channel_factory = {
&client_channel_factory_vtable};
-/* Create a secure client channel:
- Asynchronously: - resolve target
- - connect to it (trying alternatives as presented)
- - perform handshakes */
+// Create a secure client channel:
+// Asynchronously: - resolve target
+// - connect to it (trying alternatives as presented)
+// - perform handshakes
grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
@@ -97,46 +210,27 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
"reserved=%p)",
- 4, (creds, target, args, reserved));
+ 4, ((void *)creds, target, (void *)args, (void *)reserved));
GPR_ASSERT(reserved == NULL);
- // Make sure security connector does not already exist in args.
- if (grpc_find_security_connector_in_args(args) != NULL) {
- gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
+ grpc_channel *channel = NULL;
+ if (creds != NULL) {
+ // Add channel args containing the client channel factory and channel
+ // credentials.
+ grpc_arg args_to_add[] = {
+ grpc_client_channel_factory_create_channel_arg(&client_channel_factory),
+ grpc_channel_credentials_to_arg(creds)};
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
+ args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
+ // Create channel.
+ channel = client_channel_factory_create_channel(
+ &exec_ctx, &client_channel_factory, target,
+ GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ // Clean up.
+ grpc_channel_args_destroy(&exec_ctx, new_args);
grpc_exec_ctx_finish(&exec_ctx);
- return grpc_lame_client_channel_create(
- target, GRPC_STATUS_INTERNAL,
- "Security connector exists in channel args.");
- }
- // Create security connector and construct new channel args.
- grpc_channel_security_connector *security_connector;
- grpc_channel_args *new_args_from_connector;
- if (grpc_channel_credentials_create_security_connector(
- &exec_ctx, creds, target, args, &security_connector,
- &new_args_from_connector) != GRPC_SECURITY_OK) {
- grpc_exec_ctx_finish(&exec_ctx);
- return grpc_lame_client_channel_create(
- target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
- }
- // Add channel args containing the client channel factory and security
- // connector.
- grpc_arg args_to_add[2];
- args_to_add[0] =
- grpc_client_channel_factory_create_channel_arg(&client_channel_factory);
- args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
- new_args_from_connector != NULL ? new_args_from_connector : args,
- args_to_add, GPR_ARRAY_SIZE(args_to_add));
- if (new_args_from_connector != NULL) {
- grpc_channel_args_destroy(&exec_ctx, new_args_from_connector);
}
- // Create channel.
- grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &client_channel_factory, target,
- GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
- // Clean up.
- GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base,
- "secure_client_channel_factory_create_channel");
- grpc_channel_args_destroy(&exec_ctx, new_args);
- grpc_exec_ctx_finish(&exec_ctx);
- return channel; /* may be NULL */
+ return channel != NULL ? channel
+ : grpc_lame_client_channel_create(
+ target, GRPC_STATUS_INTERNAL,
+ "Failed to create secure client channel");
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 15f486d676..3ee5e976f8 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -124,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error);
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error);
+
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error);
+static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate,
+ grpc_closure *on_complete);
+
+#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
+#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
+
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/
@@ -153,18 +168,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_destroy(&t->stream_map);
grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
- grpc_combiner_destroy(exec_ctx, t->combiner);
+ GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport");
- /* callback remaining pings: they're not allowed to call into the transpot,
- and maybe they hold resources that need to be freed */
- while (t->pings.next != &t->pings) {
- grpc_chttp2_outstanding_ping *ping = t->pings.next;
- grpc_closure_sched(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"));
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- }
+ cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed"));
while (t->write_cb_pool) {
grpc_chttp2_write_cb *next = t->write_cb_pool->next;
@@ -172,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t->write_cb_pool = next;
}
+ gpr_free(t->ping_acks);
gpr_free(t->peer_string);
gpr_free(t);
}
@@ -224,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->is_client = is_client;
t->outgoing_window = DEFAULT_WINDOW;
t->incoming_window = DEFAULT_WINDOW;
- t->stream_lookahead = DEFAULT_WINDOW;
- t->connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
- t->ping_counter = 1;
- t->pings.next = t->pings.prev = &t->pings;
t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->is_first_frame = true;
grpc_connectivity_state_init(
@@ -248,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init(&t->destructive_reclaimer_locked,
destructive_reclaimer_locked, t,
grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->start_bdp_ping_locked, start_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+ grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t,
+ grpc_combiner_scheduler(t->combiner, false));
+
+ grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string);
+ t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC);
+ grpc_pid_controller_init(
+ &t->pid_controller,
+ (grpc_pid_controller_args){.gain_p = 4,
+ .gain_i = 8,
+ .gain_d = 0,
+ .initial_control_value = log2(DEFAULT_WINDOW),
+ .min_control_value = -1,
+ .max_control_value = 22,
+ .integral_range = 10});
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser);
@@ -273,6 +292,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
t->sent_local_settings = 0;
t->write_buffer_size = DEFAULT_WINDOW;
+ t->enable_bdp_probe = true;
if (is_client) {
grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string(
@@ -290,6 +310,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
+ t->ping_policy = (grpc_chttp2_repeated_ping_policy){
+ .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA,
+ .min_time_between_pings =
+ gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN),
+ };
+
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key,
@@ -307,14 +333,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
} else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
- const grpc_integer_options options = {-1, 5, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- if (value >= 0) {
- t->stream_lookahead = (uint32_t)value;
- }
- } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
const grpc_integer_options options = {-1, 0, INT_MAX};
const int value =
@@ -324,34 +342,53 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
(uint32_t)value);
}
} else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) {
+ t->ping_policy.min_time_between_pings = gpr_time_from_millis(
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0,
+ INT_MAX}),
+ GPR_TIMESPAN);
+ } else if (0 == strcmp(channel_args->args[i].key,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer(
&channel_args->args[i],
(grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE});
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
+ t->enable_bdp_probe = grpc_channel_arg_get_integer(
+ &channel_args->args[i], (grpc_integer_options){1, 0, 1});
} else {
static const struct {
const char *channel_arg_name;
grpc_chttp2_setting_id setting_id;
grpc_integer_options integer_options;
bool availability[2] /* server, client */;
- } settings_map[] = {
- {GRPC_ARG_MAX_CONCURRENT_STREAMS,
- GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
- {-1, 0, INT_MAX},
- {true, false}},
- {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
- GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_MAX_METADATA_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- {-1, 0, INT_MAX},
- {true, true}},
- {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- {-1, 16384, 16777215},
- {true, true}},
- };
+ } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ {-1, 0, INT32_MAX},
+ {true, false}},
+ {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
+ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_MAX_METADATA_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ {-1, 16384, 16777215},
+ {true, true}},
+ {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ {-1, 5, INT32_MAX},
+ {true, true}}};
for (j = 0; j < (int)GPR_ARRAY_SIZE(settings_map); j++) {
if (0 == strcmp(channel_args->args[i].key,
settings_map[j].channel_arg_name)) {
@@ -374,6 +411,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+
grpc_chttp2_initiate_write(exec_ctx, t, false, "init");
post_benign_reclaimer(exec_ctx, t);
}
@@ -425,6 +465,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:close");
}
end_all_the_calls(exec_ctx, t, GRPC_ERROR_REF(error));
+ cancel_pings(exec_ctx, t, GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
}
@@ -475,11 +516,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (server_data) {
s->id = (uint32_t)(uintptr_t)server_data;
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = s->max_recv_bytes =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
@@ -508,6 +544,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
grpc_chttp2_list_remove_stalled_by_transport(t, s);
+ grpc_chttp2_list_remove_stalled_by_stream(t, s);
for (int i = 0; i < STREAM_LIST_COUNT; i++) {
if (s->included[i]) {
@@ -647,13 +684,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("grpc_chttp2_initiate_write", 0);
}
-void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
- const char *reason) {
+void grpc_chttp2_become_writable(
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type stream_write_type, const char *reason) {
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) {
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
- grpc_chttp2_initiate_write(exec_ctx, t, covered_by_poller, reason);
+ }
+ switch (stream_write_type) {
+ case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK:
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, true, reason);
+ break;
+ case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED:
+ grpc_chttp2_initiate_write(exec_ctx, t, false, reason);
+ break;
}
}
@@ -781,7 +826,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
- uint32_t stream_incoming_window;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
@@ -804,15 +848,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
"no_more_stream_ids");
}
- s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->incoming_window = stream_incoming_window =
- t->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->max_recv_bytes = GPR_MAX(stream_incoming_window, s->max_recv_bytes);
grpc_chttp2_stream_map_add(&t->stream_map, s->id, s);
post_destructive_reclaimer(exec_ctx, t);
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "new_stream");
}
/* cancel out streams that will never be started */
while (t->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -907,7 +947,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream *s) {
if (s->id != 0 && (!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
- grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message");
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ "op.send_message");
}
}
@@ -1069,7 +1111,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT(s->id != 0);
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_initial_metadata");
}
} else {
@@ -1160,7 +1203,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if (s->id != 0) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
- grpc_chttp2_become_writable(exec_ctx, t, s, true,
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
"op.send_trailing_metadata");
}
}
@@ -1179,8 +1223,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s->recv_message = op->recv_message;
if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
- incoming_byte_stream_update_flow_control(exec_ctx, t, s,
- t->stream_lookahead, 0);
+ incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
}
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@@ -1224,43 +1267,46 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_TIMER_END("perform_stream_op", 0);
}
+static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ /* callback remaining pings: they're not allowed to call into the transpot,
+ and maybe they hold resources that need to be freed */
+ for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[i];
+ for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) {
+ grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error));
+ grpc_closure_list_sched(exec_ctx, &pq->lists[j]);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- grpc_closure *on_recv) {
- grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
- p->next = &t->pings;
- p->prev = p->next->prev;
- p->prev->next = p->next->prev = p;
- p->id[0] = (uint8_t)((t->ping_counter >> 56) & 0xff);
- p->id[1] = (uint8_t)((t->ping_counter >> 48) & 0xff);
- p->id[2] = (uint8_t)((t->ping_counter >> 40) & 0xff);
- p->id[3] = (uint8_t)((t->ping_counter >> 32) & 0xff);
- p->id[4] = (uint8_t)((t->ping_counter >> 24) & 0xff);
- p->id[5] = (uint8_t)((t->ping_counter >> 16) & 0xff);
- p->id[6] = (uint8_t)((t->ping_counter >> 8) & 0xff);
- p->id[7] = (uint8_t)(t->ping_counter & 0xff);
- t->ping_counter++;
- p->on_recv = on_recv;
- grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_ping_create(0, p->id));
- grpc_chttp2_initiate_write(exec_ctx, t, true, "send_ping");
+ grpc_chttp2_ping_type ping_type,
+ grpc_closure *on_initiate, grpc_closure *on_ack) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate,
+ GRPC_ERROR_NONE);
+ if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack,
+ GRPC_ERROR_NONE)) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "send_ping");
+ }
}
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes) {
- grpc_chttp2_outstanding_ping *ping;
- for (ping = t->pings.next; ping != &t->pings; ping = ping->next) {
- if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_closure_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE);
- ping->next->prev = ping->prev;
- ping->prev->next = ping->next;
- gpr_free(ping);
- return;
- }
+ uint64_t id) {
+ grpc_chttp2_ping_queue *pq =
+ &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT];
+ if (pq->inflight_id != id) {
+ char *from = grpc_endpoint_get_peer(t->ep);
+ gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id);
+ gpr_free(from);
+ return;
+ }
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "continue_pings");
}
- char *msg = gpr_dump((const char *)opaque_8bytes, 8, GPR_DUMP_HEX);
- char *from = grpc_endpoint_get_peer(t->ep);
- gpr_log(GPR_DEBUG, "Unknown ping response from %s: %s", from, msg);
- gpr_free(from);
- gpr_free(msg);
}
static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
@@ -1308,7 +1354,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if (op->send_ping) {
- send_ping_locked(exec_ctx, t, op->send_ping);
+ send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL,
+ op->send_ping);
}
if (close_transport != GRPC_ERROR_NONE) {
@@ -1733,34 +1780,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
-/** update window from a settings change */
-typedef struct {
- grpc_chttp2_transport *t;
- grpc_exec_ctx *exec_ctx;
-} update_global_window_args;
+/*******************************************************************************
+ * INPUT PROCESSING - PARSING
+ */
-static void update_global_window(void *args, uint32_t id, void *stream) {
- update_global_window_args *a = args;
- grpc_chttp2_transport *t = a->t;
- grpc_chttp2_stream *s = stream;
- int was_zero;
- int is_zero;
- int64_t initial_window_update = t->initial_window_update;
-
- if (initial_window_update > 0) {
- was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window,
- initial_window_update);
- is_zero = s->outgoing_window <= 0;
-
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(a->exec_ctx, t, s, true,
- "update_global_window");
- }
+static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ double bdp_dbl) {
+ uint32_t bdp;
+ if (bdp_dbl <= 0) {
+ bdp = 0;
+ } else if (bdp_dbl > UINT32_MAX) {
+ bdp = UINT32_MAX;
} else {
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window,
- -initial_window_update);
+ bdp = (uint32_t)(bdp_dbl);
+ }
+ int64_t delta =
+ (int64_t)bdp -
+ (int64_t)t->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) {
+ return;
}
+ push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp);
}
/*******************************************************************************
@@ -1802,6 +1843,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("reading_action_locked", 0);
grpc_chttp2_transport *t = tp;
+ bool need_bdp_ping = false;
GRPC_ERROR_REF(error);
@@ -1819,9 +1861,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
+ if (grpc_bdp_estimator_add_incoming_bytes(
+ &t->bdp_estimator,
+ (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) {
+ need_bdp_ping = true;
+ }
errors[1] =
grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]);
- };
+ }
if (errors[1] != GRPC_ERROR_NONE) {
errors[2] = try_http_parsing(exec_ctx, t);
GRPC_ERROR_UNREF(error);
@@ -1835,21 +1882,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN("post_parse_locked", 0);
if (t->initial_window_update != 0) {
- update_global_window_args args = {t, exec_ctx};
- grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window,
- &args);
+ if (t->initial_window_update > 0) {
+ grpc_chttp2_stream *s;
+ while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "unstalled");
+ }
+ }
t->initial_window_update = 0;
}
- /* handle higher level things */
- if (t->incoming_window < t->connection_window_target * 3 / 4) {
- int64_t announce_bytes = t->connection_window_target - t->incoming_window;
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, announce_incoming_window,
- announce_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", t, incoming_window,
- announce_bytes);
- grpc_chttp2_initiate_write(exec_ctx, t, false, "global incoming window");
- }
-
GPR_TIMER_END("post_parse_locked", 0);
}
@@ -1870,6 +1912,38 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer,
&t->read_action_locked);
+
+ if (t->enable_bdp_probe) {
+ if (need_bdp_ping) {
+ GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
+ grpc_bdp_estimator_schedule_ping(&t->bdp_estimator);
+ send_ping_locked(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked);
+ }
+
+ int64_t estimate = -1;
+ if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) {
+ double target = 1 + log2((double)estimate);
+ double memory_pressure = grpc_resource_quota_get_memory_pressure(
+ grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep)));
+ if (memory_pressure > 0.8) {
+ target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1);
+ }
+ double bdp_error =
+ target - grpc_pid_controller_last(&t->pid_controller);
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update);
+ double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9;
+ if (dt > 0.1) {
+ dt = 0.1;
+ }
+ double log2_bdp_guess =
+ grpc_pid_controller_update(&t->pid_controller, bdp_error, dt);
+ update_bdp(exec_ctx, t, pow(2, log2_bdp_guess));
+ t->last_pid_update = now;
+ }
+ }
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
} else {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action");
@@ -1882,6 +1956,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END("reading_action_locked", 0);
}
+static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Start BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_start_ping(&t->bdp_estimator);
+}
+
+static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
+ grpc_error *error) {
+ grpc_chttp2_transport *t = tp;
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string);
+ }
+ grpc_bdp_estimator_complete_ping(&t->bdp_estimator);
+
+ GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping");
+}
+
/*******************************************************************************
* CALLBACK LOOP
*/
@@ -1932,10 +2026,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
size_t max_size_hint,
size_t have_already) {
uint32_t max_recv_bytes;
+ uint32_t initial_window_size =
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
/* clamp max recv hint to an allowable size */
- if (max_size_hint >= UINT32_MAX - t->stream_lookahead) {
- max_recv_bytes = UINT32_MAX - t->stream_lookahead;
+ if (max_size_hint >= UINT32_MAX - initial_window_size) {
+ max_recv_bytes = UINT32_MAX - initial_window_size;
} else {
max_recv_bytes = (uint32_t)max_size_hint;
}
@@ -1948,20 +2044,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
}
/* add some small lookahead to keep pipelines flowing */
- GPR_ASSERT(max_recv_bytes <= UINT32_MAX - t->stream_lookahead);
- max_recv_bytes += t->stream_lookahead;
- if (s->max_recv_bytes < max_recv_bytes) {
- uint32_t add_max_recv_bytes = max_recv_bytes - s->max_recv_bytes;
- bool new_window_write_is_covered_by_poller =
- s->max_recv_bytes < have_already;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, max_recv_bytes,
- add_max_recv_bytes);
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window,
+ GPR_ASSERT(max_recv_bytes <= UINT32_MAX - initial_window_size);
+ if (s->incoming_window_delta < max_recv_bytes && !s->read_closed) {
+ uint32_t add_max_recv_bytes =
+ (uint32_t)(max_recv_bytes - s->incoming_window_delta);
+ grpc_chttp2_stream_write_type write_type =
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED;
+ if (s->incoming_window_delta + initial_window_size <
+ (int64_t)have_already) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED;
+ }
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window,
add_max_recv_bytes);
- grpc_chttp2_become_writable(exec_ctx, t, s,
- new_window_write_is_covered_by_poller,
+ if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size -
+ (int64_t)s->announce_window >
+ (int64_t)initial_window_size / 2) {
+ write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK;
+ }
+ grpc_chttp2_become_writable(exec_ctx, t, s, write_type,
"read_incoming_stream");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c
index 7de5f6362d..f487533c41 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.c
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.c
@@ -40,7 +40,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) {
grpc_slice slice = grpc_slice_malloc(9 + 8);
uint8_t *p = GRPC_SLICE_START_PTR(slice);
@@ -53,7 +53,14 @@ grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes) {
*p++ = 0;
*p++ = 0;
*p++ = 0;
- memcpy(p, opaque_8bytes, 8);
+ *p++ = (uint8_t)(opaque_8bytes >> 56);
+ *p++ = (uint8_t)(opaque_8bytes >> 48);
+ *p++ = (uint8_t)(opaque_8bytes >> 40);
+ *p++ = (uint8_t)(opaque_8bytes >> 32);
+ *p++ = (uint8_t)(opaque_8bytes >> 24);
+ *p++ = (uint8_t)(opaque_8bytes >> 16);
+ *p++ = (uint8_t)(opaque_8bytes >> 8);
+ *p++ = (uint8_t)(opaque_8bytes);
return slice;
}
@@ -70,6 +77,7 @@ grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
}
parser->byte = 0;
parser->is_ack = flags;
+ parser->opaque_8bytes = 0;
return GRPC_ERROR_NONE;
}
@@ -83,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_ping_parser *p = parser;
while (p->byte != 8 && cur != end) {
- p->opaque_8bytes[p->byte] = *cur;
+ p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte));
cur++;
p->byte++;
}
@@ -93,8 +101,12 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
if (p->is_ack) {
grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes);
} else {
- grpc_slice_buffer_add(&t->qbuf,
- grpc_chttp2_ping_create(1, p->opaque_8bytes));
+ if (t->ping_ack_count == t->ping_ack_capacity) {
+ t->ping_ack_capacity = GPR_MAX(t->ping_ack_capacity * 3 / 2, 3);
+ t->ping_acks = gpr_realloc(
+ t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks));
+ }
+ t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes;
grpc_chttp2_initiate_write(exec_ctx, t, false, "ping response");
}
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.h b/src/core/ext/transport/chttp2/transport/frame_ping.h
index b9889e2d11..ef642465d7 100644
--- a/src/core/ext/transport/chttp2/transport/frame_ping.h
+++ b/src/core/ext/transport/chttp2/transport/frame_ping.h
@@ -41,10 +41,10 @@
typedef struct {
uint8_t byte;
uint8_t is_ack;
- uint8_t opaque_8bytes[8];
+ uint64_t opaque_8bytes;
} grpc_chttp2_ping_parser;
-grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint8_t *opaque_8bytes);
+grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes);
grpc_error *grpc_chttp2_ping_parser_begin_frame(grpc_chttp2_ping_parser *parser,
uint32_t length, uint8_t flags);
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index 7d5beed09d..cb017e75f0 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -109,8 +109,13 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
if (reason != GRPC_HTTP2_NO_ERROR || s->header_frames_received < 2) {
- error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"),
- GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason);
+ char *message;
+ gpr_asprintf(&message, "Received RST_STREAM with error code %d", reason);
+ error = grpc_error_set_int(
+ grpc_error_set_str(GRPC_ERROR_CREATE("RST_STREAM"),
+ GRPC_ERROR_STR_GRPC_MESSAGE, message),
+ GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason);
+ gpr_free(message);
}
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, error);
}
diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c
index be9b663ac1..82290e34cd 100644
--- a/src/core/ext/transport/chttp2/transport/frame_settings.c
+++ b/src/core/ext/transport/chttp2/transport/frame_settings.c
@@ -236,7 +236,7 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
if (parser->id == GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE &&
parser->incoming_settings[parser->id] != parser->value) {
- t->initial_window_update =
+ t->initial_window_update +=
(int64_t)parser->value - parser->incoming_settings[parser->id];
if (grpc_http_trace) {
gpr_log(GPR_DEBUG, "adding %d for initial_window change",
@@ -245,8 +245,9 @@ grpc_error *grpc_chttp2_settings_parser_parse(grpc_exec_ctx *exec_ctx, void *p,
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
- gpr_log(GPR_DEBUG, "CHTTP2:%s: got setting %d = %d",
- t->is_client ? "CLI" : "SVR", parser->id, parser->value);
+ gpr_log(GPR_DEBUG, "CHTTP2:%s:%s: got setting %d = %d",
+ t->is_client ? "CLI" : "SVR", t->peer_string, parser->id,
+ parser->value);
}
} else if (grpc_http_trace) {
gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)",
diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c
index 31a31c2871..8fa0bb471a 100644
--- a/src/core/ext/transport/chttp2/transport/frame_window_update.c
+++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c
@@ -110,13 +110,12 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
if (t->incoming_stream_id != 0) {
if (s != NULL) {
- bool was_zero = s->outgoing_window <= 0;
- GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
received_update);
- bool is_zero = s->outgoing_window <= 0;
- if (was_zero && !is_zero) {
- grpc_chttp2_become_writable(exec_ctx, t, s, false,
- "stream.read_flow_control");
+ if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
+ grpc_chttp2_become_writable(
+ exec_ctx, t, s, GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "stream.read_flow_control");
}
}
} else {
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ee5edc92df..075d421dd4 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -50,7 +50,9 @@
#include "src/core/ext/transport/chttp2/transport/stream_map.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/pid_controller.h"
#include "src/core/lib/transport/transport_impl.h"
/* streams are kept in various linked lists depending on what things need to
@@ -59,6 +61,7 @@ typedef enum {
GRPC_CHTTP2_LIST_WRITABLE,
GRPC_CHTTP2_LIST_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
+ GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -72,6 +75,34 @@ typedef enum {
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER,
} grpc_chttp2_write_state;
+typedef enum {
+ GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE,
+ GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */
+} grpc_chttp2_ping_type;
+
+typedef enum {
+ GRPC_CHTTP2_PCL_INITIATE = 0,
+ GRPC_CHTTP2_PCL_NEXT,
+ GRPC_CHTTP2_PCL_INFLIGHT,
+ GRPC_CHTTP2_PCL_COUNT /* must be last */
+} grpc_chttp2_ping_closure_list;
+
+typedef struct {
+ grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT];
+ uint64_t inflight_id;
+} grpc_chttp2_ping_queue;
+
+typedef struct {
+ gpr_timespec min_time_between_pings;
+ int max_pings_without_data;
+} grpc_chttp2_repeated_ping_policy;
+
+typedef struct {
+ gpr_timespec last_ping_sent_time;
+ int pings_before_data_required;
+} grpc_chttp2_repeated_ping_state;
+
/* deframer state for the overall http2 stream of bytes */
typedef enum {
/* prefix: one entry per http2 connection prefix byte */
@@ -144,14 +175,6 @@ typedef enum {
GRPC_CHTTP2_GOAWAY_SENT,
} grpc_chttp2_sent_goaway_state;
-/* Outstanding ping request data */
-typedef struct grpc_chttp2_outstanding_ping {
- uint8_t id[8];
- grpc_closure *on_recv;
- struct grpc_chttp2_outstanding_ping *next;
- struct grpc_chttp2_outstanding_ping *prev;
-} grpc_chttp2_outstanding_ping;
-
typedef struct grpc_chttp2_write_cb {
int64_t call_at_byte;
grpc_closure *closure;
@@ -204,6 +227,9 @@ struct grpc_chttp2_transport {
/** is there a read request to the endpoint outstanding? */
uint8_t endpoint_reading;
+ /** should we probe bdp? */
+ bool enable_bdp_probe;
+
/** various lists of streams */
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT];
@@ -271,16 +297,19 @@ struct grpc_chttp2_transport {
copied to next_stream_id in parsing when parsing commences */
uint32_t next_stream_id;
- /** how far to lookahead in a stream? */
- uint32_t stream_lookahead;
-
/** last new stream id */
uint32_t last_new_stream_id;
- /** pings awaiting responses */
- grpc_chttp2_outstanding_ping pings;
- /** next payload for an outgoing ping */
- uint64_t ping_counter;
+ /** ping queues for various ping insertion points */
+ grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT];
+ grpc_chttp2_repeated_ping_policy ping_policy;
+ grpc_chttp2_repeated_ping_state ping_state;
+ uint64_t ping_ctr; /* unique id for pings */
+
+ /** ping acks */
+ size_t ping_ack_count;
+ size_t ping_ack_capacity;
+ uint64_t *ping_acks;
/** parser for headers */
grpc_chttp2_hpack_parser hpack_parser;
@@ -324,6 +353,13 @@ struct grpc_chttp2_transport {
grpc_chttp2_write_cb *write_cb_pool;
+ /* bdp estimator */
+ grpc_bdp_estimator bdp_estimator;
+ grpc_pid_controller pid_controller;
+ grpc_closure start_bdp_ping_locked;
+ grpc_closure finish_bdp_ping_locked;
+ gpr_timespec last_pid_update;
+
/* if non-NULL, close the transport with this error when writes are finished
*/
grpc_error *close_transport_on_writes_finished;
@@ -362,12 +398,10 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
- /** window available for us to send to peer */
- int64_t outgoing_window;
- /** The number of bytes the upper layers have offered to receive.
- As the upper layer offers more bytes, this value increases.
- As bytes are read, this value decreases. */
- uint32_t max_recv_bytes;
+ /** window available for us to send to peer, over or under the initial window
+ * size of the transport... ie:
+ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+ int64_t outgoing_window_delta;
/** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata;
grpc_closure *send_initial_metadata_finished;
@@ -428,8 +462,10 @@ struct grpc_chttp2_stream {
grpc_error *forced_close_error;
/** how many header frames have we received? */
uint8_t header_frames_received;
- /** window available for peer to send to us */
- int64_t incoming_window;
+ /** window available for peer to send to us (as a delta on
+ * transport.initial_window_size)
+ * incoming_window = incoming_window_delta + transport.initial_window_size */
+ int64_t incoming_window_delta;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */
@@ -478,36 +514,43 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
/** Get a writable stream
returns non-zero if there was a stream available */
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
bool grpc_chttp2_list_remove_writable_stream(
grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s);
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s);
+
grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
uint32_t id);
grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
@@ -672,13 +715,23 @@ void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *error);
void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
- const uint8_t *opaque_8bytes);
+ uint64_t id);
+
+typedef enum {
+ /* don't initiate a transport write, but piggyback on the next one */
+ GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK,
+ /* initiate a covered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED,
+ /* initiate an uncovered write */
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED
+} grpc_chttp2_stream_write_type;
/** add a ref to the stream and add it to the writable list;
ref will be dropped in writing.c */
void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
- grpc_chttp2_stream *s, bool covered_by_poller,
+ grpc_chttp2_stream *s,
+ grpc_chttp2_stream_write_type type,
const char *reason);
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index f58cd696f9..24bd93067b 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -376,25 +376,45 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
return err;
}
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window,
incoming_frame_size);
+ if (t->incoming_window <= target_incoming_window / 2) {
+ grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control");
+ }
if (s != NULL) {
- if (incoming_frame_size > s->incoming_window) {
+ if (incoming_frame_size >
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
char *msg;
gpr_asprintf(&msg,
"frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size, s->incoming_window);
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta,
incoming_frame_size);
+ if ((int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] +
+ (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <=
+ (int64_t)t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] /
+ 2) {
+ grpc_chttp2_become_writable(exec_ctx, t, s,
+ GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED,
+ "window-update-required");
+ }
s->received_bytes += incoming_frame_size;
- s->max_recv_bytes -=
- (uint32_t)GPR_MIN(s->max_recv_bytes, incoming_frame_size);
}
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c
index a60264cc51..078818fb18 100644
--- a/src/core/ext/transport/chttp2/transport/stream_lists.c
+++ b/src/core/ext/transport/chttp2/transport/stream_lists.c
@@ -37,14 +37,14 @@
/* core list management */
-static int stream_list_empty(grpc_chttp2_transport *t,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_empty(grpc_chttp2_transport *t,
+ grpc_chttp2_stream_list_id id) {
return t->lists[id].head == NULL;
}
-static int stream_list_pop(grpc_chttp2_transport *t,
- grpc_chttp2_stream **stream,
- grpc_chttp2_stream_list_id id) {
+static bool stream_list_pop(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **stream,
+ grpc_chttp2_stream_list_id id) {
grpc_chttp2_stream *s = t->lists[id].head;
if (s) {
grpc_chttp2_stream *new_head = s->links[id].next;
@@ -124,8 +124,8 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -139,12 +139,12 @@ bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
}
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
}
@@ -153,8 +153,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
}
@@ -168,8 +168,8 @@ void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
- grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
@@ -177,3 +177,18 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
}
+
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream **s) {
+ return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+ grpc_chttp2_stream *s) {
+ return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 148e3844b5..05e6f59947 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -56,6 +56,75 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->write_cb_pool = cb;
}
+static void collapse_pings_from_into(grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type,
+ grpc_chttp2_ping_queue *pq) {
+ for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) {
+ grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]);
+ }
+}
+
+static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t,
+ grpc_chttp2_ping_type ping_type) {
+ grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type];
+ if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) {
+ /* no ping needed: wait */
+ return;
+ }
+ if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
+ /* ping already in-flight: wait */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string);
+ }
+ return;
+ }
+ if (t->ping_state.pings_before_data_required == 0 &&
+ t->ping_policy.max_pings_without_data != 0) {
+ /* need to send something of substance before sending a ping again */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d",
+ t->peer_string, t->ping_state.pings_before_data_required,
+ t->ping_policy.max_pings_without_data);
+ }
+ return;
+ }
+ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
+ gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time);
+ /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec,
+ elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec,
+ (int)t->ping_policy.min_time_between_pings.tv_nsec);*/
+ if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) {
+ /* not enough elapsed time between successive pings */
+ if (grpc_http_trace || grpc_bdp_estimator_trace) {
+ gpr_log(GPR_DEBUG,
+ "Ping delayed [%p]: not enough time elapsed since last ping",
+ t->peer_string);
+ }
+ return;
+ }
+ /* coalesce equivalent pings into this one */
+ switch (ping_type) {
+ case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE:
+ collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq);
+ break;
+ case GRPC_CHTTP2_PING_ON_NEXT_WRITE:
+ break;
+ case GRPC_CHTTP2_PING_TYPE_COUNT:
+ GPR_UNREACHABLE_CODE(break);
+ }
+ pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type;
+ t->ping_ctr++;
+ grpc_closure_list_sched(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
+ grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
+ &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(false, pq->inflight_id));
+ t->ping_state.last_ping_sent_time = now;
+ t->ping_state.pings_before_data_required -=
+ (t->ping_state.pings_before_data_required != 0);
+}
+
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, int64_t send_bytes,
grpc_chttp2_write_cb **list, grpc_error *error) {
@@ -139,6 +208,8 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
s->sent_initial_metadata = true;
sent_initial_metadata = true;
now_writing = true;
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
}
/* send any window updates */
if (s->announce_window > 0) {
@@ -146,15 +217,22 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add(&t->outbuf,
grpc_chttp2_window_update_create(
s->id, s->announce_window, &s->stats.outgoing));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, announce_window, announce);
}
if (sent_initial_metadata) {
/* send any body bytes, if allowed by flow control */
if (s->flow_controlled_buffer.length > 0) {
- uint32_t max_outgoing =
- (uint32_t)GPR_MIN(t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
- GPR_MIN(s->outgoing_window, t->outgoing_window));
+ uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
+ 0,
+ s->outgoing_window_delta +
+ (int64_t)t->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ uint32_t max_outgoing = (uint32_t)GPR_MIN(
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+ GPR_MIN(stream_outgoing_window, t->outgoing_window));
if (max_outgoing > 0) {
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
@@ -167,10 +245,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing,
&t->outbuf);
- GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
+ GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
@@ -189,6 +269,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
} else if (t->outgoing_window == 0) {
grpc_chttp2_list_add_stalled_by_transport(t, s);
now_writing = true;
+ } else if (stream_outgoing_window == 0) {
+ grpc_chttp2_list_add_stalled_by_stream(t, s);
+ now_writing = true;
}
}
if (s->send_trailing_metadata != NULL &&
@@ -227,15 +310,32 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
- if (t->announce_incoming_window > 0) {
- uint32_t announced =
- (uint32_t)GPR_MIN(t->announce_incoming_window, UINT32_MAX);
- GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, announce_incoming_window,
- announced);
+ uint32_t target_incoming_window = GPR_MAX(
+ t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ 1024);
+ uint32_t threshold_to_send_transport_window_update =
+ t->outbuf.count > 0 ? 3 * target_incoming_window / 4
+ : target_incoming_window / 2;
+ if (t->incoming_window <= threshold_to_send_transport_window_update) {
+ maybe_initiate_ping(exec_ctx, t,
+ GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE);
+ uint32_t announced = (uint32_t)GPR_CLAMP(
+ target_incoming_window - t->incoming_window, 0, UINT32_MAX);
+ GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("write", t, incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_window_update_create(
0, announced, &throwaway_stats));
+ t->ping_state.pings_before_data_required =
+ t->ping_policy.max_pings_without_data;
+ }
+
+ for (size_t i = 0; i < t->ping_ack_count; i++) {
+ grpc_slice_buffer_add(&t->outbuf,
+ grpc_chttp2_ping_create(1, t->ping_acks[i]));
}
+ t->ping_ack_count = 0;
+
+ maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE);
GPR_TIMER_END("grpc_chttp2_begin_write", 0);