aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD12
-rw-r--r--CMakeLists.txt5
-rw-r--r--Makefile6
-rw-r--r--binding.gyp1
-rw-r--r--build.yaml2
-rw-r--r--config.m41
-rw-r--r--doc/core/pending_api_cleanups.md2
-rw-r--r--gRPC-Core.podspec3
-rwxr-xr-xgrpc.gemspec2
-rw-r--r--include/grpc++/ext/reflection.grpc.pb.h2
-rw-r--r--include/grpc++/ext/reflection.pb.h2
-rw-r--r--include/grpc/impl/codegen/grpc_types.h6
-rw-r--r--package.xml2
-rw-r--r--src/core/ext/client_config/client_channel.c65
-rw-r--r--src/core/ext/client_config/lb_policy.c10
-rw-r--r--src/core/ext/client_config/lb_policy.h10
-rw-r--r--src/core/ext/client_config/subchannel.c9
-rw-r--r--src/core/ext/client_config/subchannel.h3
-rw-r--r--src/core/ext/client_config/subchannel_factory.c49
-rw-r--r--src/core/ext/client_config/subchannel_factory.h66
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c20
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c20
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c20
-rw-r--r--src/core/lib/channel/channel_stack.c32
-rw-r--r--src/core/lib/channel/channel_stack.h18
-rw-r--r--src/core/lib/channel/deadline_filter.c302
-rw-r--r--src/core/lib/channel/deadline_filter.h79
-rw-r--r--src/core/lib/channel/http_client_filter.c4
-rw-r--r--src/core/lib/channel/message_size_filter.c46
-rw-r--r--src/core/lib/iomgr/error.c58
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c96
-rw-r--r--src/core/lib/surface/call.c143
-rw-r--r--src/core/lib/surface/completion_queue.h2
-rw-r--r--src/core/lib/surface/init.c7
-rw-r--r--src/core/lib/transport/transport.c2
-rw-r--r--src/cpp/ext/reflection.grpc.pb.cc2
-rw-r--r--src/cpp/ext/reflection.pb.cc2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--test/core/channel/channel_stack_test.c6
-rw-r--r--test/core/nanopb/fuzzer_response.c3
-rw-r--r--test/core/nanopb/fuzzer_serverlist.c3
-rwxr-xr-xtools/codegen/extensions/gen_reflection_proto.sh32
-rw-r--r--tools/doxygen/Doxyfile.c++.internal2
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rw-r--r--tools/run_tests/sources_and_headers.json3
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters6
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj3
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters6
58 files changed, 855 insertions, 365 deletions
diff --git a/BUILD b/BUILD
index 0325d3c08a..30c2b9ff6b 100644
--- a/BUILD
+++ b/BUILD
@@ -168,6 +168,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -326,6 +327,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -565,6 +567,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -708,6 +711,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -917,6 +921,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -1052,6 +1057,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -1266,6 +1272,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -1379,6 +1386,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -1672,6 +1680,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -1780,6 +1789,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -2167,6 +2177,7 @@ objc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
+ "src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@@ -2385,6 +2396,7 @@ objc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8fd0d39b74..88fcf5752b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -295,6 +295,7 @@ add_library(grpc
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
+ src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@@ -552,6 +553,7 @@ add_library(grpc_cronet
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
+ src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@@ -781,6 +783,7 @@ add_library(grpc_unsecure
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
+ src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@@ -1037,6 +1040,7 @@ add_library(grpc++
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
+ src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@@ -1389,6 +1393,7 @@ add_library(grpc++_unsecure
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
+ src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
diff --git a/Makefile b/Makefile
index 978d99177c..639e7f5ed8 100644
--- a/Makefile
+++ b/Makefile
@@ -2532,6 +2532,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@@ -2807,6 +2808,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@@ -3071,6 +3073,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@@ -3262,6 +3265,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@@ -3601,6 +3605,7 @@ LIBGRPC++_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@@ -4228,6 +4233,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
diff --git a/binding.gyp b/binding.gyp
index 45d04c8e67..dc215cf5ff 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -570,6 +570,7 @@
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
+ 'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
diff --git a/build.yaml b/build.yaml
index a0a3dc53ac..c95bec4169 100644
--- a/build.yaml
+++ b/build.yaml
@@ -172,6 +172,7 @@ filegroups:
- src/core/lib/channel/compress_filter.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
+ - src/core/lib/channel/deadline_filter.h
- src/core/lib/channel/handshaker.h
- src/core/lib/channel/http_client_filter.h
- src/core/lib/channel/http_server_filter.h
@@ -253,6 +254,7 @@ filegroups:
- src/core/lib/channel/channel_stack_builder.c
- src/core/lib/channel/compress_filter.c
- src/core/lib/channel/connected_channel.c
+ - src/core/lib/channel/deadline_filter.c
- src/core/lib/channel/handshaker.c
- src/core/lib/channel/http_client_filter.c
- src/core/lib/channel/http_server_filter.c
diff --git a/config.m4 b/config.m4
index 6f8b0ce48c..1c52caae48 100644
--- a/config.m4
+++ b/config.m4
@@ -89,6 +89,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+ src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
diff --git a/doc/core/pending_api_cleanups.md b/doc/core/pending_api_cleanups.md
index 6d30e0b0e6..a12e8a9dfb 100644
--- a/doc/core/pending_api_cleanups.md
+++ b/doc/core/pending_api_cleanups.md
@@ -13,3 +13,5 @@ number:
- remove `GRPC_ARG_MAX_MESSAGE_LENGTH` channel arg from
`include/grpc/impl/codegen/grpc_types.h` (commit `af00d8b`)
+- remove `ServerBuilder::SetMaxMessageSize()` method from
+ `include/grpc++/server_builder.h` (commit `6980362`)
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 0214880ff9..4d8186190a 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -255,6 +255,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/compress_filter.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
+ 'src/core/lib/channel/deadline_filter.h',
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',
@@ -417,6 +418,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
+ 'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
@@ -624,6 +626,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/compress_filter.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
+ 'src/core/lib/channel/deadline_filter.h',
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 71763aa6fe..bc28444a0d 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -175,6 +175,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/compress_filter.h )
s.files += %w( src/core/lib/channel/connected_channel.h )
s.files += %w( src/core/lib/channel/context.h )
+ s.files += %w( src/core/lib/channel/deadline_filter.h )
s.files += %w( src/core/lib/channel/handshaker.h )
s.files += %w( src/core/lib/channel/http_client_filter.h )
s.files += %w( src/core/lib/channel/http_server_filter.h )
@@ -337,6 +338,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/channel_stack_builder.c )
s.files += %w( src/core/lib/channel/compress_filter.c )
s.files += %w( src/core/lib/channel/connected_channel.c )
+ s.files += %w( src/core/lib/channel/deadline_filter.c )
s.files += %w( src/core/lib/channel/handshaker.c )
s.files += %w( src/core/lib/channel/http_client_filter.c )
s.files += %w( src/core/lib/channel/http_server_filter.c )
diff --git a/include/grpc++/ext/reflection.grpc.pb.h b/include/grpc++/ext/reflection.grpc.pb.h
index 064117e303..6e56088497 100644
--- a/include/grpc++/ext/reflection.grpc.pb.h
+++ b/include/grpc++/ext/reflection.grpc.pb.h
@@ -32,7 +32,7 @@
*/
-// Generated by the gRPC protobuf plugin.
+// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// If you make any local change, they will be lost.
// source: reflection.proto
// Original file comments:
diff --git a/include/grpc++/ext/reflection.pb.h b/include/grpc++/ext/reflection.pb.h
index bdb86197d0..caa1592424 100644
--- a/include/grpc++/ext/reflection.pb.h
+++ b/include/grpc++/ext/reflection.pb.h
@@ -32,7 +32,7 @@
*/
-// Generated by the protocol buffer compiler. DO NOT EDIT!
+// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// source: reflection.proto
#ifndef PROTOBUF_reflection_2eproto__INCLUDED
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index a8e8ebe6ea..191cdd0e5f 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -148,11 +148,13 @@ typedef struct {
/** Maximum number of concurrent incoming streams to allow on a http2
connection. Int valued. */
#define GRPC_ARG_MAX_CONCURRENT_STREAMS "grpc.max_concurrent_streams"
-/** Maximum message length that the channel can receive. Int valued, bytes. */
+/** Maximum message length that the channel can receive. Int valued, bytes.
+ -1 means unlimited. */
#define GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH "grpc.max_receive_message_length"
/** \deprecated For backward compatibility. */
#define GRPC_ARG_MAX_MESSAGE_LENGTH GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
-/** Maximum message length that the channel can send. Int valued, bytes. */
+/** Maximum message length that the channel can send. Int valued, bytes.
+ -1 means unlimited. */
#define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length"
/** Initial sequence number for http2 transports. Int valued. */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \
diff --git a/package.xml b/package.xml
index 140d116175..3e54e1a599 100644
--- a/package.xml
+++ b/package.xml
@@ -182,6 +182,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/compress_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/context.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/handshaker.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.h" role="src" />
@@ -344,6 +345,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/compress_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.c" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/handshaker.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" />
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index b2b4fea83c..feb4cbde7b 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -46,6 +46,7 @@
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
@@ -114,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
- /* check= */ 0);
+ /* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@@ -391,6 +392,17 @@ typedef enum {
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
typedef struct client_channel_call_data {
+ // State for handling deadlines.
+ // The code in deadline_filter.c requires this to be the first field.
+ // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
+ // and this struct both independently store a pointer to the call
+ // stack and each has its own mutex. If/when we have time, find a way
+ // to avoid this without breaking the grpc_deadline_state abstraction.
+ grpc_deadline_state deadline_state;
+ gpr_timespec deadline;
+
+ grpc_error *cancel_error;
+
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
@@ -485,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1));
- } else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
+ } else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING(
@@ -493,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
} else {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
@@ -531,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready);
+ grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@@ -542,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
- cpa->connected_subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready,
+ GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
@@ -552,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
- grpc_closure *on_ready) {
+ grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@@ -566,21 +579,24 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
- connected_subchannel);
+ connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
- GRPC_ERROR_CREATE("Pick cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, cpa->on_ready,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
}
}
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
+ GRPC_ERROR_UNREF(error);
return true;
}
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
int r;
@@ -631,12 +647,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -652,8 +669,8 @@ retry:
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -669,18 +686,24 @@ retry:
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
+ // Stash a copy of cancel_error in our call data, so that we can use
+ // it for subsequent operations. This ensures that if the call is
+ // cancelled before any ops are passed down (e.g., if the deadline
+ // is in the past when the call starts), we can return the right
+ // error to the caller when the first op does get passed down.
+ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
- NULL);
+ NULL, GRPC_ERROR_REF(op->cancel_error));
break;
}
gpr_mu_unlock(&calld->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
- GRPC_ERROR_CANCELLED);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@@ -694,7 +717,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
- &calld->connected_subchannel, &calld->next_step)) {
+ &calld->connected_subchannel, &calld->next_step,
+ GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@@ -704,7 +728,7 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent,
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
@@ -727,6 +751,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ calld->deadline = args->deadline;
+ calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
@@ -745,6 +772,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
+ grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index 903563ef6b..46391272a6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -108,16 +108,18 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target) {
- policy->vtable->cancel_pick(exec_ctx, policy, target);
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
+ policy->vtable->cancel_pick(exec_ctx, policy, target, error);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
- initial_metadata_flags_eq);
+ initial_metadata_flags_eq, error);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 37c93d707c..7fb3e08cb3 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -77,12 +77,12 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target, grpc_error *error);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq, grpc_error *error);
/** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -161,7 +161,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_connected_subchannel **target);
+ grpc_connected_subchannel **target,
+ grpc_error *error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
@@ -169,7 +170,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq);
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error);
/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 96595a38dd..f26f9d6ab7 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -629,7 +629,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
- GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
const char *errmsg = grpc_error_string(error);
@@ -698,14 +700,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, grpc_subchannel_call **call) {
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, callstk);
+ NULL, NULL, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 218bb43e0a..3330621071 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
+ grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_factory.c b/src/core/ext/client_config/subchannel_factory.c
deleted file mode 100644
index d1e4d75a02..0000000000
--- a/src/core/ext/client_config/subchannel_factory.c
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/ext/client_config/subchannel_factory.h"
-
-void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
- factory->vtable->ref(factory);
-}
-
-void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
- grpc_subchannel_factory* factory) {
- factory->vtable->unref(exec_ctx, factory);
-}
-
-grpc_subchannel* grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
- grpc_subchannel_args* args) {
- return factory->vtable->create_subchannel(exec_ctx, factory, args);
-}
diff --git a/src/core/ext/client_config/subchannel_factory.h b/src/core/ext/client_config/subchannel_factory.h
deleted file mode 100644
index 0fb806d081..0000000000
--- a/src/core/ext/client_config/subchannel_factory.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
-
-#include "src/core/ext/client_config/subchannel.h"
-#include "src/core/lib/channel/channel_stack.h"
-
-typedef struct grpc_subchannel_factory grpc_subchannel_factory;
-typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
-
-/** Constructor for new configured channels.
- Creating decorators around this type is encouraged to adapt behavior. */
-struct grpc_subchannel_factory {
- const grpc_subchannel_factory_vtable *vtable;
-};
-
-struct grpc_subchannel_factory_vtable {
- void (*ref)(grpc_subchannel_factory *factory);
- void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
- grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-};
-
-void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
-void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_factory *factory);
-
-/** Create a new grpc_subchannel */
-grpc_subchannel *grpc_subchannel_factory_create_subchannel(
- grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
-
-#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index f45cff1a17..0050489425 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -688,7 +688,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
@@ -699,8 +700,10 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -708,12 +711,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_client != NULL) {
@@ -728,8 +733,10 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
- grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, &pp->wrapped_on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
+ gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@@ -737,6 +744,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static void query_for_backends(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 466a0fdede..961a0c9b19 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
- GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
@@ -466,6 +472,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ /* server_name will be copied as part of the subchannel creation. This makes
+ * the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 037f180a9e..930fa86aca 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -308,7 +308,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_connected_subchannel **target) {
+ grpc_connected_subchannel **target,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -320,8 +321,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
- NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -330,11 +332,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
- uint32_t initial_metadata_flags_eq) {
+ uint32_t initial_metadata_flags_eq,
+ grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -347,8 +351,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*pp->target = NULL;
- grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
- NULL);
+ grpc_exec_ctx_sched(
+ exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -357,6 +362,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
+ GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
@@ -629,6 +635,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (args->addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ /* server_name will be copied as part of the subchannel creation. This makes
+ * the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0655b9353f..57d34d9e9a 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -158,13 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -185,6 +183,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
+ args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
@@ -276,16 +275,16 @@ static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
}
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem) {
+ grpc_call_element *elem) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
op->on_complete = grpc_closure_create(destroy_op, op);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
+ grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
@@ -293,5 +292,16 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
- grpc_call_next_op(exec_ctx, cur_elem, op);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
+}
+
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_status_code status,
+ gpr_slice *optional_message) {
+ grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
+ memset(op, 0, sizeof(*op));
+ op->on_complete = grpc_closure_create(destroy_op, op);
+ grpc_transport_stream_op_add_close(op, status, optional_message);
+ elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 6b73cce380..1cfe2885d8 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -74,6 +74,7 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
+ gpr_timespec deadline;
} grpc_call_element_args;
typedef struct {
@@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- int initial_refs, grpc_iomgr_cb_func destroy,
- void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+ grpc_call_context_element *context, const void *transport_server_data,
+ gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -290,6 +289,11 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_status_code status,
gpr_slice *optional_message);
+void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *cur_elem,
+ grpc_status_code status,
+ gpr_slice *optional_message);
+
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
new file mode 100644
index 0000000000..079b98a2f8
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.c
@@ -0,0 +1,302 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/channel/deadline_filter.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/timer.h"
+
+//
+// grpc_deadline_state
+//
+
+// Timer callback.
+static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ deadline_state->timer_pending = false;
+ gpr_mu_unlock(&deadline_state->timer_mu);
+ if (error != GRPC_ERROR_CANCELLED) {
+ gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded");
+ grpc_call_element_send_cancel_with_message(
+ exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg);
+ gpr_slice_unref(msg);
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
+}
+
+// Starts the deadline timer.
+static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // Take a reference to the call stack, to be owned by the timer.
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ gpr_mu_lock(&deadline_state->timer_mu);
+ deadline_state->timer_pending = true;
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
+ elem, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&deadline_state->timer_mu);
+ }
+}
+
+// Cancels the deadline timer.
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ gpr_mu_lock(&deadline_state->timer_mu);
+ if (deadline_state->timer_pending) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ deadline_state->timer_pending = false;
+ }
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
+
+// Callback run when the call is complete.
+static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_deadline_state* deadline_state = arg;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ // Invoke the next callback.
+ deadline_state->next_on_complete->cb(
+ exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+}
+
+// Inject our own on_complete callback into op.
+static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
+ grpc_transport_stream_op* op) {
+ deadline_state->next_on_complete = op->on_complete;
+ grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
+ op->on_complete = &deadline_state->on_complete;
+}
+
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+ grpc_call_element* elem;
+ gpr_timespec deadline;
+ grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ struct start_timer_after_init_state* state = arg;
+ start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+ gpr_free(state);
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ memset(deadline_state, 0, sizeof(*deadline_state));
+ deadline_state->call_stack = args->call_stack;
+ gpr_mu_init(&deadline_state->timer_mu);
+ // Deadline will always be infinite on servers, so the timer will only be
+ // set on clients with a finite deadline.
+ const gpr_timespec deadline =
+ gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // When the deadline passes, we indicate the failure by sending down
+ // an op with cancel_error set. However, we can't send down any ops
+ // until after the call stack is fully initialized. If we start the
+ // timer here, we have no guarantee that the timer won't pop before
+ // call stack initialization is finished. To avoid that problem, we
+ // create a closure to start the timer, and we schedule that closure
+ // to be run after call stack initialization is done.
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ state->elem = elem;
+ state->deadline = deadline;
+ grpc_closure_init(&state->closure, start_timer_after_init, state);
+ grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ }
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ } else {
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(deadline_state, op);
+ }
+ }
+}
+
+//
+// filter code
+//
+
+// Constructor for channel_data. Used for both client and server filters.
+static void init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ GPR_ASSERT(!args->is_last);
+}
+
+// Destructor for channel_data. Used for both client and server filters.
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+// Call data used for both client and server filter.
+typedef struct base_call_data {
+ grpc_deadline_state deadline_state;
+} base_call_data;
+
+// Additional call data used only for the server filter.
+typedef struct server_call_data {
+ base_call_data base; // Must be first.
+ // The closure for receiving initial metadata.
+ grpc_closure recv_initial_metadata_ready;
+ // Received initial metadata batch.
+ grpc_metadata_batch* recv_initial_metadata;
+ // The original recv_initial_metadata_ready closure, which we chain to
+ // after our own closure is invoked.
+ grpc_closure* next_recv_initial_metadata_ready;
+} server_call_data;
+
+// Constructor for call_data. Used for both client and server filters.
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ // Note: size of call data is different between client and server.
+ memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+ grpc_deadline_state_init(exec_ctx, elem, args);
+ return GRPC_ERROR_NONE;
+}
+
+// Destructor for call_data. Used for both client and server filters.
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ void* and_free_memory) {
+ grpc_deadline_state_destroy(exec_ctx, elem);
+}
+
+// Method for starting a call op for client filter.
+static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+// Callback for receiving initial metadata on the server.
+static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = arg;
+ server_call_data* calld = elem->call_data;
+ // Get deadline from metadata and start the timer if needed.
+ start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
+ // Invoke the next callback.
+ calld->next_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
+}
+
+// Method for starting a call op for server filter.
+static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ grpc_transport_stream_op* op) {
+ server_call_data* calld = elem->call_data;
+ if (op->cancel_error != GRPC_ERROR_NONE ||
+ op->close_error != GRPC_ERROR_NONE) {
+ cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
+ } else {
+ // If we're receiving initial metadata, we need to get the deadline
+ // from the recv_initial_metadata_ready callback. So we inject our
+ // own callback into that hook.
+ if (op->recv_initial_metadata_ready != NULL) {
+ calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ grpc_closure_init(&calld->recv_initial_metadata_ready,
+ recv_initial_metadata_ready, elem);
+ op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
+ }
+ // Make sure we know when the call is complete, so that we can cancel
+ // the timer.
+ // Note that we trigger this on recv_trailing_metadata, even though
+ // the client never sends trailing metadata, because this is the
+ // hook that tells us when the call is complete on the server side.
+ if (op->recv_trailing_metadata != NULL) {
+ inject_on_complete_cb(&calld->base.deadline_state, op);
+ }
+ }
+ // Chain to next filter.
+ grpc_call_next_op(exec_ctx, elem, op);
+}
+
+const grpc_channel_filter grpc_client_deadline_filter = {
+ client_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(base_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
+
+const grpc_channel_filter grpc_server_deadline_filter = {
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
+ sizeof(server_call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0, // sizeof(channel_data)
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "deadline",
+};
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
new file mode 100644
index 0000000000..685df87761
--- /dev/null
+++ b/src/core/lib/channel/deadline_filter.h
@@ -0,0 +1,79 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/timer.h"
+
+// State used for filters that enforce call deadlines.
+// Must be the first field in the filter's call_data.
+typedef struct grpc_deadline_state {
+ // We take a reference to the call stack for the timer callback.
+ grpc_call_stack* call_stack;
+ // Guards access to timer_pending and timer.
+ gpr_mu timer_mu;
+ // True if the timer callback is currently pending.
+ bool timer_pending;
+ // The deadline timer.
+ grpc_timer timer;
+ // Closure to invoke when the call is complete.
+ // We use this to cancel the timer.
+ grpc_closure on_complete;
+ // The original on_complete closure, which we chain to after our own
+ // closure is invoked.
+ grpc_closure* next_on_complete;
+} grpc_deadline_state;
+
+// To be used in a filter's init_call_elem(), destroy_call_elem(), and
+// start_transport_stream_op() methods to enforce call deadlines.
+//
+// REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
+//
+// For grpc_deadline_state_client_start_transport_stream_op(), it is the
+// caller's responsibility to chain to the next filter if necessary
+// after the function returns.
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_element_args* args);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem);
+void grpc_deadline_state_client_start_transport_stream_op(
+ grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_transport_stream_op* op);
+
+// Deadline filters for direct client channels and server channels.
+// Note: Deadlines for non-direct client channels are handled by the
+// client_channel filter.
+extern const grpc_channel_filter grpc_client_deadline_filter;
+extern const grpc_channel_filter grpc_server_deadline_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index edcc741ff6..1dc05fb20d 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -103,8 +103,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(a->exec_ctx, a->elem,
- GRPC_STATUS_CANCELLED, &message);
+ grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
+ GRPC_STATUS_CANCELLED, &message);
return NULL;
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 6613785dea..02fc68fc3a 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -40,8 +40,9 @@
#include "src/core/lib/channel/channel_args.h"
+#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
// The protobuf library will (by default) start warning at 100 megs.
-#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
+#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
typedef struct call_data {
// Receive closures are chained: we inject this closure as the
@@ -55,8 +56,8 @@ typedef struct call_data {
} call_data;
typedef struct channel_data {
- size_t max_send_size;
- size_t max_recv_size;
+ int max_send_size;
+ int max_recv_size;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@@ -66,15 +67,15 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
- if (*calld->recv_message != NULL &&
- (*calld->recv_message)->length > chand->max_recv_size) {
+ if (*calld->recv_message != NULL && chand->max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)chand->max_recv_size) {
char* message_string;
- gpr_asprintf(
- &message_string, "Received message larger than max (%u vs. %lu)",
- (*calld->recv_message)->length, (unsigned long)chand->max_recv_size);
+ gpr_asprintf(&message_string,
+ "Received message larger than max (%u vs. %d)",
+ (*calld->recv_message)->length, chand->max_recv_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(
+ grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
@@ -88,14 +89,14 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
// Check max send message size.
- if (op->send_message != NULL &&
- op->send_message->length > chand->max_send_size) {
+ if (op->send_message != NULL && chand->max_send_size >= 0 &&
+ op->send_message->length > (size_t)chand->max_send_size) {
char* message_string;
- gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)",
- op->send_message->length, (unsigned long)chand->max_send_size);
+ gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
+ op->send_message->length, chand->max_send_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
- grpc_call_element_send_cancel_with_message(
+ grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Inject callback for receiving a message.
@@ -130,19 +131,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
- chand->max_send_size = DEFAULT_MAX_MESSAGE_LENGTH;
- chand->max_recv_size = DEFAULT_MAX_MESSAGE_LENGTH;
- const grpc_integer_options options = {DEFAULT_MAX_MESSAGE_LENGTH, 0, INT_MAX};
+ chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
+ chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH;
for (size_t i = 0; i < args->channel_args->num_args; ++i) {
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
- chand->max_send_size = (size_t)grpc_channel_arg_get_integer(
- &args->channel_args->args[i], options);
+ const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_send_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
- chand->max_recv_size = (size_t)grpc_channel_arg_get_integer(
- &args->channel_args->args[i], options);
+ const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0,
+ INT_MAX};
+ chand->max_recv_size =
+ grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
}
}
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index 45ef75e04d..38fd1e0960 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -324,6 +324,64 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
}
+typedef struct {
+ grpc_error *error;
+ grpc_status_code code;
+ const char *msg;
+} special_error_status_map;
+static special_error_status_map error_status_map[] = {
+ {GRPC_ERROR_NONE, GRPC_STATUS_OK, ""},
+ {GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "RPC cancelled"},
+ {GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"},
+};
+
+static grpc_error *recursively_find_error_with_status(grpc_error *error,
+ intptr_t *status) {
+ // If the error itself has a status code, return it.
+ if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) {
+ return error;
+ }
+ // Otherwise, search through its children.
+ intptr_t key = 0;
+ while (true) {
+ grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
+ if (child_error == NULL) break;
+ grpc_error *result =
+ recursively_find_error_with_status(child_error, status);
+ if (result != NULL) return result;
+ }
+ return NULL;
+}
+
+void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
+ const char **msg) {
+ // Handle special errors via the static map.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); ++i) {
+ if (error == error_status_map[i].error) {
+ *code = error_status_map[i].code;
+ *msg = error_status_map[i].msg;
+ return;
+ }
+ }
+ // Populate code.
+ // Start with the parent error and recurse through the tree of children
+ // until we find the first one that has a status code.
+ intptr_t status = GRPC_STATUS_UNKNOWN; // Default in case we don't find one.
+ grpc_error *found_error = recursively_find_error_with_status(error, &status);
+ *code = (grpc_status_code)status;
+ // Now populate msg.
+ // If we found an error with a status code above, use that; otherwise,
+ // fall back to using the parent error.
+ if (found_error == NULL) found_error = error;
+ // If the error has a status message, use it. Otherwise, fall back to
+ // the error description.
+ *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ if (*msg == NULL) {
+ *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION);
+ if (*msg == NULL) *msg = "uknown error"; // Just in case.
+ }
+}
+
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 2ab3ef9f40..2d715500d1 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -37,6 +37,7 @@
#include <stdbool.h>
#include <stdint.h>
+#include <grpc/status.h>
#include <grpc/support/time.h>
/// Opaque representation of an error.
@@ -179,6 +180,13 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
/// Returns NULL if the specified string is not set.
/// Caller does NOT own return value.
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
+
+/// A utility function to get the status code and message to be returned
+/// to the application. If not set in the top-level message, looks
+/// through child errors until it finds the first one with these attributes.
+void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
+ const char **msg);
+
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them.
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 80c7a3f128..3496b6094f 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -146,61 +146,57 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (error == GRPC_ERROR_NONE) {
- do {
- so_error_size = sizeof(so_error);
- err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
- &so_error_size);
- } while (err < 0 && errno == EINTR);
- if (err < 0) {
- error = GRPC_OS_ERROR(errno, "getsockopt");
- goto finish;
- } else if (so_error != 0) {
- if (so_error == ENOBUFS) {
- /* We will get one of these errors if we have run out of
- memory in the kernel for the data structures allocated
- when you connect a socket. If this happens it is very
- likely that if we wait a little bit then try again the
- connection will work (since other programs or this
- program will close their network connections and free up
- memory). This does _not_ indicate that there is anything
- wrong with the server we are connecting to, this is a
- local problem.
-
- If you are looking at this code, then chances are that
- your program or another program on the same computer
- opened too many network connections. The "easy" fix:
- don't do that! */
- gpr_log(GPR_ERROR, "kernel out of buffers");
- gpr_mu_unlock(&ac->mu);
- grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
- return;
- } else {
- switch (so_error) {
- case ECONNREFUSED:
- error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno);
- error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- "Connection refused");
- break;
- default:
- error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)");
- break;
- }
- goto finish;
- }
- } else {
- grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
- fd = NULL;
- goto finish;
- }
- } else {
+ if (error != GRPC_ERROR_NONE) {
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
- GPR_UNREACHABLE_CODE(return );
+ do {
+ so_error_size = sizeof(so_error);
+ err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
+ &so_error_size);
+ } while (err < 0 && errno == EINTR);
+ if (err < 0) {
+ error = GRPC_OS_ERROR(errno, "getsockopt");
+ goto finish;
+ }
+
+ switch (so_error) {
+ case 0:
+ grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
+ *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
+ fd = NULL;
+ break;
+ case ENOBUFS:
+ /* We will get one of these errors if we have run out of
+ memory in the kernel for the data structures allocated
+ when you connect a socket. If this happens it is very
+ likely that if we wait a little bit then try again the
+ connection will work (since other programs or this
+ program will close their network connections and free up
+ memory). This does _not_ indicate that there is anything
+ wrong with the server we are connecting to, this is a
+ local problem.
+
+ If you are looking at this code, then chances are that
+ your program or another program on the same computer
+ opened too many network connections. The "easy" fix:
+ don't do that! */
+ gpr_log(GPR_ERROR, "kernel out of buffers");
+ gpr_mu_unlock(&ac->mu);
+ grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
+ return;
+ case ECONNREFUSED:
+ /* This error shouldn't happen for anything other than connect(). */
+ error = GRPC_OS_ERROR(so_error, "connect");
+ break;
+ default:
+ /* We don't really know which syscall triggered the problem here,
+ so punt by reporting getsockopt(). */
+ error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
+ break;
+ }
finish:
if (fd != NULL) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 6b2badf71b..601b0c8580 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -126,8 +126,6 @@ struct grpc_call {
/* client or server call */
bool is_client;
- /* is the alarm set */
- bool have_alarm;
/** has grpc_call_destroy been called */
bool destroy_called;
/** flag indicating that cancellation is inherited */
@@ -170,9 +168,6 @@ struct grpc_call {
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
- /* Deadline alarm - if have_alarm is non-zero */
- grpc_timer alarm;
-
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
@@ -215,8 +210,6 @@ struct grpc_call {
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline);
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
@@ -264,40 +257,9 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
- call->send_deadline =
+ gpr_timespec send_deadline =
gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
- GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
- /* initial refcount dropped by grpc_call_destroy */
- grpc_error *error = grpc_call_stack_init(
- &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
- args->server_transport_data, CALL_STACK_FROM_CALL(call));
- if (error != GRPC_ERROR_NONE) {
- intptr_t status;
- if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
- status = GRPC_STATUS_UNKNOWN;
- }
- const char *error_str =
- grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
- close_with_status(&exec_ctx, call, (grpc_status_code)status,
- error_str == NULL ? "unknown error" : error_str);
- }
- if (args->cq != NULL) {
- GPR_ASSERT(
- args->pollset_set_alternative == NULL &&
- "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
- GRPC_CQ_INTERNAL_REF(args->cq, "bind");
- call->pollent =
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
- }
- if (args->pollset_set_alternative != NULL) {
- call->pollent = grpc_polling_entity_create_from_pollset_set(
- args->pollset_set_alternative);
- }
- if (!grpc_polling_entity_is_empty(&call->pollent)) {
- grpc_call_stack_set_pollset_or_pollset_set(
- &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
- }
- gpr_timespec send_deadline = args->send_deadline;
+
if (args->parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GPR_ASSERT(call->is_client);
@@ -339,10 +301,38 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
gpr_mu_unlock(&args->parent_call->mu);
}
- if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
- 0) {
- set_deadline_alarm(&exec_ctx, call, send_deadline);
+
+ call->send_deadline = send_deadline;
+
+ GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
+ /* initial refcount dropped by grpc_call_destroy */
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ args->server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *error_str;
+ grpc_error_get_status(error, &status, &error_str);
+ close_with_status(&exec_ctx, call, status, error_str);
+ GRPC_ERROR_UNREF(error);
}
+ if (args->cq != NULL) {
+ GPR_ASSERT(
+ args->pollset_set_alternative == NULL &&
+ "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
+ GRPC_CQ_INTERNAL_REF(args->cq, "bind");
+ call->pollent =
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
+ }
+ if (args->pollset_set_alternative != NULL) {
+ call->pollent = grpc_polling_entity_create_from_pollset_set(
+ args->pollset_set_alternative);
+ }
+ if (!grpc_polling_entity_is_empty(&call->pollent)) {
+ grpc_call_stack_set_pollset_or_pollset_set(
+ &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+ }
+
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return error;
@@ -455,20 +445,11 @@ static void set_status_details(grpc_call *call, status_source source,
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
- intptr_t status;
- if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
- set_status_code(call, source, (uint32_t)status);
- } else {
- set_status_code(call, source, GRPC_STATUS_INTERNAL);
- }
- const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
- bool free_msg = false;
- if (msg == NULL) {
- free_msg = true;
- msg = grpc_error_string(error);
- }
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ set_status_code(call, source, (uint32_t)status);
set_status_details(call, source, grpc_mdstr_from_string(msg));
- if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@@ -741,9 +722,6 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- if (c->have_alarm) {
- grpc_timer_cancel(&exec_ctx, &c->alarm);
- }
cancel = !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) grpc_call_cancel(c, NULL);
@@ -781,7 +759,6 @@ typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
- grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
grpc_transport_stream_op op;
} termination_closure;
@@ -798,7 +775,6 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
break;
}
GRPC_ERROR_UNREF(tc->error);
- grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@@ -818,7 +794,6 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
- tc->op_closure = tc->op.on_complete;
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@@ -901,32 +876,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_call *call = arg;
- gpr_mu_lock(&call->mu);
- call->have_alarm = 0;
- if (error != GRPC_ERROR_CANCELLED) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
- "Deadline Exceeded");
- }
- gpr_mu_unlock(&call->mu);
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
-}
-
-static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
- gpr_timespec deadline) {
- if (call->have_alarm) {
- gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
- assert(0);
- return;
- }
- GRPC_CALL_INTERNAL_REF(call, "alarm");
- call->have_alarm = 1;
- call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
- gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@@ -1280,9 +1229,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
+ call->send_deadline =
+ gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
}
}
@@ -1311,9 +1259,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
GRPC_ERROR_REF(error);
gpr_mu_lock(&call->mu);
+
+ // If the error has an associated status code, set the call's status.
+ intptr_t status;
+ if (error != GRPC_ERROR_NONE &&
+ grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+ set_status_from_error(call, STATUS_FROM_CORE, error);
+ }
+
if (bctl->send_initial_metadata) {
if (error != GRPC_ERROR_NONE) {
- set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
+ set_status_from_error(call, STATUS_FROM_CORE, error);
}
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
@@ -1331,9 +1287,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
call->received_final_op = true;
- if (call->have_alarm) {
- grpc_timer_cancel(exec_ctx, &call->alarm);
- }
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index e9d840df77..c1cafba5f2 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -60,6 +60,8 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
+//#define GRPC_CQ_REF_COUNT_DEBUG
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 3cbbaa7b0c..8ca0643ba7 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -43,6 +43,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
+#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
@@ -100,6 +101,12 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(
+ GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+ prepend_filter, (void *)&grpc_client_deadline_filter);
+ grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_server_deadline_filter);
+ grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);
grpc_channel_init_register_stage(
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index b951218130..75aec7a5b4 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -226,7 +226,7 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = GRPC_ERROR_CREATE("Call cancelled");
}
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
- add_error(op, &op->close_error, error);
+ add_error(op, &op->cancel_error, error);
}
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
diff --git a/src/cpp/ext/reflection.grpc.pb.cc b/src/cpp/ext/reflection.grpc.pb.cc
index b046dfc1b8..8139c8ea16 100644
--- a/src/cpp/ext/reflection.grpc.pb.cc
+++ b/src/cpp/ext/reflection.grpc.pb.cc
@@ -32,7 +32,7 @@
*/
-// Generated by the gRPC protobuf plugin.
+// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// If you make any local change, they will be lost.
// source: reflection.proto
diff --git a/src/cpp/ext/reflection.pb.cc b/src/cpp/ext/reflection.pb.cc
index a84494f9a9..b50465b9b5 100644
--- a/src/cpp/ext/reflection.pb.cc
+++ b/src/cpp/ext/reflection.pb.cc
@@ -32,7 +32,7 @@
*/
-// Generated by the protocol buffer compiler. DO NOT EDIT!
+// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// source: reflection.proto
#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index dae28e15f2..ad20c94de9 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -136,6 +136,7 @@ cdef extern from "grpc/grpc.h":
const char *GRPC_ARG_ENABLE_CENSUS
const char *GRPC_ARG_MAX_CONCURRENT_STREAMS
const char *GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
+ const char *GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
const char *GRPC_ARG_DEFAULT_AUTHORITY
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index f444e33cf0..5a11a08f54 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -39,7 +39,8 @@ class ConnectivityState:
class ChannelArgKey:
enable_census = GRPC_ARG_ENABLE_CENSUS
max_concurrent_streams = GRPC_ARG_MAX_CONCURRENT_STREAMS
- max_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
+ max_receive_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
+ max_send_message_length = GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
http2_initial_sequence_number = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
default_authority = GRPC_ARG_DEFAULT_AUTHORITY
primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 6b63581932..0c6a66e9c8 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -83,6 +83,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
+ 'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index 569b3f7cd2..b1c1ed9039 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -135,9 +135,9 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
- grpc_error *error =
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack,
- NULL, NULL, call_stack);
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, free_call, call_stack, NULL, NULL,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
diff --git a/test/core/nanopb/fuzzer_response.c b/test/core/nanopb/fuzzer_response.c
index 75a99faf3f..a82f20df83 100644
--- a/test/core/nanopb/fuzzer_response.c
+++ b/test/core/nanopb/fuzzer_response.c
@@ -41,7 +41,10 @@
bool squelch = true;
bool leak_check = true;
+static void dont_log(gpr_log_func_args *args) {}
+
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
+ if (squelch) gpr_set_log_function(dont_log);
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
grpc_grpclb_initial_response *response;
if ((response = grpc_grpclb_initial_response_parse(slice))) {
diff --git a/test/core/nanopb/fuzzer_serverlist.c b/test/core/nanopb/fuzzer_serverlist.c
index df2044d907..9700bf1cda 100644
--- a/test/core/nanopb/fuzzer_serverlist.c
+++ b/test/core/nanopb/fuzzer_serverlist.c
@@ -41,7 +41,10 @@
bool squelch = true;
bool leak_check = true;
+static void dont_log(gpr_log_func_args *args) {}
+
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
+ if (squelch) gpr_set_log_function(dont_log);
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
grpc_grpclb_serverlist *serverlist;
if ((serverlist = grpc_grpclb_response_parse_serverlist(slice))) {
diff --git a/tools/codegen/extensions/gen_reflection_proto.sh b/tools/codegen/extensions/gen_reflection_proto.sh
index bd8aac6a7b..ea7689f7e8 100755
--- a/tools/codegen/extensions/gen_reflection_proto.sh
+++ b/tools/codegen/extensions/gen_reflection_proto.sh
@@ -29,20 +29,39 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+set -e
+cd $(dirname $0)/../../..
+
PROTO_DIR="src/proto/grpc/reflection/v1alpha"
PROTO_FILE="reflection"
HEADER_DIR="include/grpc++/ext"
SRC_DIR="src/cpp/ext"
INCLUDE_DIR="grpc++/ext"
TMP_DIR="tmp"
-GRPC_PLUGIN="bins/opt/grpc_cpp_plugin"
-PROTOC="bins/opt/protobuf/protoc"
-set -e
+if hash grpc_cpp_plugin 2>/dev/null; then
+ GRPC_PLUGIN=$(which grpc_cpp_plugin)
+else
+ if [ -f bins/opt/grpc_cpp_plugin ]; then
+ GRPC_PLUGIN="bins/opt/grpc_cpp_plugin"
+ else
+ echo "gRPC protoc plugin not found"
+ exit 1
+ fi
+fi
-TMP_DIR=${TMP_DIR}_${PROTO_FILE}
+if hash protoc 2>/dev/null; then
+ PROTOC=$(which protoc)
+else
+ if [ -f bins/opt/protobuf/protoc ]; then
+ PROTOC="bins/opt/protobuf/protoc"
+ else
+ echo "protoc not found"
+ exit 1
+ fi
+fi
-cd $(dirname $0)/../../..
+TMP_DIR=${TMP_DIR}_${PROTO_FILE}
[ ! -d $HEADER_DIR ] && mkdir -p $HEADER_DIR || :
[ ! -d $SRC_DIR ] && mkdir -p $SRC_DIR || :
@@ -56,6 +75,9 @@ sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g"
sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc
sed -i "s/\"${PROTO_FILE}.grpc.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.grpc.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc
+sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.h
+sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.cc
+
/bin/cp LICENSE ${TMP_DIR}/TMP_LICENSE
sed -i -e "s/./ &/" -e "s/.*/ \*&/" ${TMP_DIR}/TMP_LICENSE
sed -i -r "\$a\ *\n *\/\n\n" ${TMP_DIR}/TMP_LICENSE
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index b34989937b..239dad51fd 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -876,6 +876,7 @@ src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/compress_filter.h \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
+src/core/lib/channel/deadline_filter.h \
src/core/lib/channel/handshaker.h \
src/core/lib/channel/http_client_filter.h \
src/core/lib/channel/http_server_filter.h \
@@ -989,6 +990,7 @@ src/core/lib/channel/channel_stack.c \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index d8e73c5860..2c63f36729 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -792,6 +792,7 @@ src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/compress_filter.h \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
+src/core/lib/channel/deadline_filter.h \
src/core/lib/channel/handshaker.h \
src/core/lib/channel/http_client_filter.h \
src/core/lib/channel/http_server_filter.h \
@@ -954,6 +955,7 @@ src/core/lib/channel/channel_stack.c \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
+src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 94615a8130..761ba5ac68 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -5978,6 +5978,7 @@
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@@ -6075,6 +6076,8 @@
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
+ "src/core/lib/channel/deadline_filter.c",
+ "src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.c",
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
index 0b1784f1f6..7f9fdb2346 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
@@ -376,6 +376,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@@ -529,6 +530,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
index 31930fc62f..9b6cfb4000 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
@@ -115,6 +115,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ <Filter>src\core\lib\channel</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -725,6 +728,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
+ <Filter>src\core\lib\channel</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
index 6ccb7a7902..b0bbccabb3 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -372,6 +372,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@@ -515,6 +516,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index 54acbe2e47..9a48ebf78a 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -100,6 +100,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ <Filter>src\core\lib\channel</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -698,6 +701,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
+ <Filter>src\core\lib\channel</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 1b5891d6c9..21561f0057 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -301,6 +301,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@@ -471,6 +472,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index 1499618240..ea61c43859 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -19,6 +19,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ <Filter>src\core\lib\channel</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -680,6 +683,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
+ <Filter>src\core\lib\channel</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
index 0abce83802..bb5a56271f 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
@@ -193,6 +193,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@@ -316,6 +317,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
index 582fb1447e..39188a409b 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
@@ -70,6 +70,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ <Filter>src\core\lib\channel</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -461,6 +464,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
+ <Filter>src\core\lib\channel</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index 66824be530..efe285640a 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -291,6 +291,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@@ -439,6 +440,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index db77774d00..911cf112cf 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -22,6 +22,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
+ <Filter>src\core\lib\channel</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@@ -590,6 +593,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
+ <Filter>src\core\lib\channel</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>