diff options
121 files changed, 1812 insertions, 717 deletions
@@ -168,7 +168,7 @@ cc_library( "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -246,6 +246,7 @@ cc_library( "src/core/transport/transport.h", "src/core/transport/transport_impl.h", "src/core/census/context.h", + "src/core/census/rpc_stat_id.h", "src/core/httpcli/format_request.c", "src/core/httpcli/httpcli.c", "src/core/httpcli/httpcli_security_connector.c", @@ -287,7 +288,7 @@ cc_library( "src/core/client_config/resolver_factory.c", "src/core/client_config/resolver_registry.c", "src/core/client_config/resolvers/dns_resolver.c", - "src/core/client_config/resolvers/unix_resolver_posix.c", + "src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel_factory.c", "src/core/client_config/uri_parser.c", @@ -381,6 +382,7 @@ cc_library( "src/core/transport/transport_op_string.c", "src/core/census/context.c", "src/core/census/initialize.c", + "src/core/census/record_stat.c", ], hdrs = [ "include/grpc/grpc_security.h", @@ -424,7 +426,7 @@ cc_library( "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -502,6 +504,7 @@ cc_library( "src/core/transport/transport.h", "src/core/transport/transport_impl.h", "src/core/census/context.h", + "src/core/census/rpc_stat_id.h", "src/core/surface/init_unsecure.c", "src/core/census/grpc_context.c", "src/core/channel/channel_args.c", @@ -520,7 +523,7 @@ cc_library( "src/core/client_config/resolver_factory.c", "src/core/client_config/resolver_registry.c", "src/core/client_config/resolvers/dns_resolver.c", - "src/core/client_config/resolvers/unix_resolver_posix.c", + "src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel_factory.c", "src/core/client_config/uri_parser.c", @@ -614,6 +617,7 @@ cc_library( "src/core/transport/transport_op_string.c", "src/core/census/context.c", "src/core/census/initialize.c", + "src/core/census/record_stat.c", ], hdrs = [ "include/grpc/byte_buffer.h", @@ -998,7 +1002,7 @@ objc_library( "src/core/client_config/resolver_factory.c", "src/core/client_config/resolver_registry.c", "src/core/client_config/resolvers/dns_resolver.c", - "src/core/client_config/resolvers/unix_resolver_posix.c", + "src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel_factory.c", "src/core/client_config/uri_parser.c", @@ -1092,6 +1096,7 @@ objc_library( "src/core/transport/transport_op_string.c", "src/core/census/context.c", "src/core/census/initialize.c", + "src/core/census/record_stat.c", ], hdrs = [ "include/grpc/grpc_security.h", @@ -1137,7 +1142,7 @@ objc_library( "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -1215,6 +1220,7 @@ objc_library( "src/core/transport/transport.h", "src/core/transport/transport_impl.h", "src/core/census/context.h", + "src/core/census/rpc_stat_id.h", ], includes = [ "include", @@ -117,7 +117,7 @@ most Mac installations. Do the "git submodule" command listed above. Then execute the following for all the needed build dependencies $ sudo /opt/local/bin/port install autoconf automake libtool gflags cmake - $ mkdir ~/gtest + $ mkdir ~/gtest-svn $ svn checkout http://googletest.googlecode.com/svn/trunk/ gtest-svn $ mkdir mybuild $ cd mybuild @@ -3517,7 +3517,7 @@ LIBGRPC_SRC = \ src/core/client_config/resolver_factory.c \ src/core/client_config/resolver_registry.c \ src/core/client_config/resolvers/dns_resolver.c \ - src/core/client_config/resolvers/unix_resolver_posix.c \ + src/core/client_config/resolvers/sockaddr_resolver.c \ src/core/client_config/subchannel.c \ src/core/client_config/subchannel_factory.c \ src/core/client_config/uri_parser.c \ @@ -3611,6 +3611,7 @@ LIBGRPC_SRC = \ src/core/transport/transport_op_string.c \ src/core/census/context.c \ src/core/census/initialize.c \ + src/core/census/record_stat.c \ PUBLIC_HEADERS_C += \ include/grpc/grpc_security.h \ @@ -3781,7 +3782,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/client_config/resolver_factory.c \ src/core/client_config/resolver_registry.c \ src/core/client_config/resolvers/dns_resolver.c \ - src/core/client_config/resolvers/unix_resolver_posix.c \ + src/core/client_config/resolvers/sockaddr_resolver.c \ src/core/client_config/subchannel.c \ src/core/client_config/subchannel_factory.c \ src/core/client_config/uri_parser.c \ @@ -3875,6 +3876,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/transport/transport_op_string.c \ src/core/census/context.c \ src/core/census/initialize.c \ + src/core/census/record_stat.c \ PUBLIC_HEADERS_C += \ include/grpc/byte_buffer.h \ diff --git a/build.json b/build.json index 2755703e1c..f06c6cb8c5 100644 --- a/build.json +++ b/build.json @@ -18,11 +18,13 @@ "include/grpc/census.h" ], "headers": [ - "src/core/census/context.h" + "src/core/census/context.h", + "src/core/census/rpc_stat_id.h" ], "src": [ "src/core/census/context.c", - "src/core/census/initialize.c" + "src/core/census/initialize.c", + "src/core/census/record_stat.c" ] }, { @@ -129,7 +131,7 @@ "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -225,7 +227,7 @@ "src/core/client_config/resolver_factory.c", "src/core/client_config/resolver_registry.c", "src/core/client_config/resolvers/dns_resolver.c", - "src/core/client_config/resolvers/unix_resolver_posix.c", + "src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel_factory.c", "src/core/client_config/uri_parser.c", diff --git a/composer.json b/composer.json index 875ec55d8a..1d78a2ce21 100644 --- a/composer.json +++ b/composer.json @@ -2,6 +2,7 @@ "name": "grpc/grpc", "type": "library", "description": "gRPC library for PHP", + "version": "0.5.1", "keywords": ["rpc"], "homepage": "http://grpc.io", "license": "BSD-3-Clause", diff --git a/gRPC.podspec b/gRPC.podspec index 28d9ac4c53..f47b44fe9d 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -170,7 +170,7 @@ Pod::Spec.new do |s| 'src/core/client_config/resolver_factory.h', 'src/core/client_config/resolver_registry.h', 'src/core/client_config/resolvers/dns_resolver.h', - 'src/core/client_config/resolvers/unix_resolver_posix.h', + 'src/core/client_config/resolvers/sockaddr_resolver.h', 'src/core/client_config/subchannel.h', 'src/core/client_config/subchannel_factory.h', 'src/core/client_config/uri_parser.h', @@ -248,6 +248,7 @@ Pod::Spec.new do |s| 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', 'src/core/census/context.h', + 'src/core/census/rpc_stat_id.h', 'grpc/grpc_security.h', 'grpc/byte_buffer.h', 'grpc/byte_buffer_reader.h', @@ -296,7 +297,7 @@ Pod::Spec.new do |s| 'src/core/client_config/resolver_factory.c', 'src/core/client_config/resolver_registry.c', 'src/core/client_config/resolvers/dns_resolver.c', - 'src/core/client_config/resolvers/unix_resolver_posix.c', + 'src/core/client_config/resolvers/sockaddr_resolver.c', 'src/core/client_config/subchannel.c', 'src/core/client_config/subchannel_factory.c', 'src/core/client_config/uri_parser.c', @@ -389,7 +390,8 @@ Pod::Spec.new do |s| 'src/core/transport/transport.c', 'src/core/transport/transport_op_string.c', 'src/core/census/context.c', - 'src/core/census/initialize.c' + 'src/core/census/initialize.c', + 'src/core/census/record_stat.c' ss.private_header_files = 'src/core/support/env.h', 'src/core/support/file.h', @@ -434,7 +436,7 @@ Pod::Spec.new do |s| 'src/core/client_config/resolver_factory.h', 'src/core/client_config/resolver_registry.h', 'src/core/client_config/resolvers/dns_resolver.h', - 'src/core/client_config/resolvers/unix_resolver_posix.h', + 'src/core/client_config/resolvers/sockaddr_resolver.h', 'src/core/client_config/subchannel.h', 'src/core/client_config/subchannel_factory.h', 'src/core/client_config/uri_parser.h', @@ -511,7 +513,8 @@ Pod::Spec.new do |s| 'src/core/transport/stream_op.h', 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', - 'src/core/census/context.h' + 'src/core/census/context.h', + 'src/core/census/rpc_stat_id.h' ss.header_mappings_dir = '.' diff --git a/include/grpc++/credentials.h b/include/grpc++/credentials.h index 0eaaefcbca..a4f1e73118 100644 --- a/include/grpc++/credentials.h +++ b/include/grpc++/credentials.h @@ -106,13 +106,13 @@ std::shared_ptr<Credentials> ServiceAccountCredentials( const grpc::string& json_key, const grpc::string& scope, long token_lifetime_seconds); -// Builds JWT credentials. +// Builds Service Account JWT Access credentials. // json_key is the JSON key string containing the client's private key. // token_lifetime_seconds is the lifetime in seconds of each Json Web Token // (JWT) created with this credentials. It should not exceed // grpc_max_auth_token_lifetime or will be cropped to this value. -std::shared_ptr<Credentials> JWTCredentials(const grpc::string& json_key, - long token_lifetime_seconds); +std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials( + const grpc::string& json_key, long token_lifetime_seconds); // Builds refresh token credentials. // json_refresh_token is the JSON string containing the refresh token along diff --git a/include/grpc++/impl/sync_no_cxx11.h b/include/grpc++/impl/sync_no_cxx11.h index dda939bf71..fda668957e 100644 --- a/include/grpc++/impl/sync_no_cxx11.h +++ b/include/grpc++/impl/sync_no_cxx11.h @@ -87,7 +87,7 @@ class condition_variable { ~condition_variable() { gpr_cv_destroy(&cv_); } void wait(lock_guard<mutex> &mu) { mu.locked = false; - gpr_cv_wait(&cv_, &mu.mu_.mu_, gpr_inf_future); + gpr_cv_wait(&cv_, &mu.mu_.mu_, gpr_inf_future(GPR_CLOCK_REALTIME); mu.locked = true; } void notify_one() { gpr_cv_signal(&cv_); } diff --git a/include/grpc/census.h b/include/grpc/census.h index 3fc07affc8..379783905a 100644 --- a/include/grpc/census.h +++ b/include/grpc/census.h @@ -100,6 +100,17 @@ int census_context_deserialize(const char *buffer, census_context **context); * future census calls will result in undefined behavior. */ void census_context_destroy(census_context *context); +/* A census statistic to be recorded comprises two parts: an ID for the + * particular statistic and the value to be recorded against it. */ +typedef struct { + int id; + double value; +} census_stat; + +/* Record new stats against the given context. */ +void census_record_stat(census_context *context, census_stat *stats, + size_t nstats); + #ifdef __cplusplus } #endif diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 504f0cce03..b05e4d65de 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -433,6 +433,20 @@ grpc_call *grpc_channel_create_registered_call( grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag); +/** Returns a newly allocated string representing the endpoint to which this + call is communicating with. The string is in the uri format accepted by + grpc_channel_create. + The returned string should be disposed of with gpr_free(). + + WARNING: this value is never authenticated or subject to any security + related code. It must not be used for any authentication related + functionality. Instead, use grpc_auth_context. */ +char *grpc_call_get_peer(grpc_call *call); + +/** Return a newly allocated string representing the target a channel was + created for. */ +char *grpc_channel_get_target(grpc_channel *channel); + /** Create a client channel to 'target'. Additional channel level configuration MAY be provided by grpc_channel_args, though the expectation is that most clients will want to simply pass NULL. See grpc_channel_args definition for @@ -442,7 +456,7 @@ grpc_channel *grpc_channel_create(const char *target, const grpc_channel_args *args); /** Create a lame client: this client fails every operation attempted on it. */ -grpc_channel *grpc_lame_client_channel_create(void); +grpc_channel *grpc_lame_client_channel_create(const char *target); /** Close and destroy a grpc channel */ void grpc_channel_destroy(grpc_channel *channel); diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h index 37d66c04ae..4dd058063d 100644 --- a/include/grpc/grpc_security.h +++ b/include/grpc/grpc_security.h @@ -119,8 +119,8 @@ grpc_credentials *grpc_service_account_credentials_create( - token_lifetime is the lifetime of each Json Web Token (JWT) created with this credentials. It should not exceed grpc_max_auth_token_lifetime or will be cropped to this value. */ -grpc_credentials *grpc_jwt_credentials_create(const char *json_key, - gpr_timespec token_lifetime); +grpc_credentials *grpc_service_account_jwt_access_credentials_create( + const char *json_key, gpr_timespec token_lifetime); /* Creates an Oauth2 Refresh Token credentials object. May return NULL if the input is invalid. diff --git a/include/grpc/support/host_port.h b/include/grpc/support/host_port.h index 3cc2f498e8..30267ab1df 100644 --- a/include/grpc/support/host_port.h +++ b/include/grpc/support/host_port.h @@ -52,8 +52,10 @@ int gpr_join_host_port(char **out, const char *host, int port); /* Given a name in the form "host:port" or "[ho:st]:port", split into hostname and port number, into newly allocated strings, which must later be - destroyed using gpr_free(). */ -void gpr_split_host_port(const char *name, char **host, char **port); + destroyed using gpr_free(). + Return 1 on success, 0 on failure. Guarantees *host and *port == NULL on + failure. */ +int gpr_split_host_port(const char *name, char **host, char **port); #ifdef __cplusplus } diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 64371047e0..586d6a7994 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -269,7 +269,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { if (method_type == METHODTYPE_NO_STREAMING) { // unary calls have an extra synchronous stub method out->Print( - "$response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n", + "$response$ $methodname$($request$ request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n", "methodname", method->name(), "request", GetClassName(method->input_type()), "response", GetClassName(method->output_type())); @@ -280,7 +280,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { method_name += "Async"; // prevent name clash with synchronous method. } out->Print( - "$returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n", + "$returntype$ $methodname$($request_maybe$Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n", "methodname", method_name, "request_maybe", GetMethodRequestParamMaybe(method), "returntype", GetMethodReturnTypeClient(method)); @@ -332,13 +332,13 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { if (method_type == METHODTYPE_NO_STREAMING) { // unary calls have an extra synchronous stub method out->Print( - "public $response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n", + "public $response$ $methodname$($request$ request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))\n", "methodname", method->name(), "request", GetClassName(method->input_type()), "response", GetClassName(method->output_type())); out->Print("{\n"); out->Indent(); - out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n", + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n", "servicenamefield", GetServiceNameFieldName(), "methodfield", GetMethodFieldName(method)); out->Print("return Calls.BlockingUnaryCall(call, request, cancellationToken);\n"); @@ -351,13 +351,13 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { method_name += "Async"; // prevent name clash with synchronous method. } out->Print( - "public $returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n", + "public $returntype$ $methodname$($request_maybe$Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))\n", "methodname", method_name, "request_maybe", GetMethodRequestParamMaybe(method), "returntype", GetMethodReturnTypeClient(method)); out->Print("{\n"); out->Indent(); - out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n", + out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers, deadline);\n", "servicenamefield", GetServiceNameFieldName(), "methodfield", GetMethodFieldName(method)); switch (GetMethodType(method)) { diff --git a/src/core/census/record_stat.c b/src/core/census/record_stat.c new file mode 100644 index 0000000000..3dd918618b --- /dev/null +++ b/src/core/census/record_stat.c @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/census.h> +#include "src/core/census/rpc_stat_id.h" + +void census_record_stat(census_context *context, census_stat *stats, + size_t nstats) {} diff --git a/src/core/census/rpc_stat_id.h b/src/core/census/rpc_stat_id.h new file mode 100644 index 0000000000..fc0aa6f43f --- /dev/null +++ b/src/core/census/rpc_stat_id.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CENSUS_RPC_STAT_ID_H +#define CENSUS_RPC_STAT_ID_H + +/* Stats ID's used for RPC measurements. */ +#define CENSUS_INVALID_STAT_ID 0 /* ID 0 is always invalid */ +#define CENSUS_RPC_CLIENT_REQUESTS 1 /* Count of client requests sent. */ +#define CENSUS_RPC_SERVER_REQUESTS 2 /* Count of server requests sent. */ +#define CENSUS_RPC_CLIENT_ERRORS 3 /* Client error counts. */ +#define CENSUS_RPC_SERVER_ERRORS 4 /* Server error counts. */ +#define CENSUS_RPC_CLIENT_LATENCY 5 /* Client side request latency. */ +#define CENSUS_RPC_SERVER_LATENCY 6 /* Server side request latency. */ + +#endif /* CENSUS_RPC_STAT_ID_H */ diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index e38dcb58b7..cd7c182ef2 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -191,6 +191,11 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { next_elem->filter->start_transport_stream_op(next_elem, op); } +char *grpc_call_next_get_peer(grpc_call_element *elem) { + grpc_call_element *next_elem = elem + 1; + return next_elem->filter->get_peer(next_elem); +} + void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; next_elem->filter->start_transport_op(next_elem, op); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 785be8925b..4a608b956e 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -104,6 +104,9 @@ typedef struct { The filter does not need to do any chaining */ void (*destroy_channel_elem)(grpc_channel_element *elem); + /* Implement grpc_call_get_peer() */ + char *(*get_peer)(grpc_call_element *elem); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -173,6 +176,8 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op); +/* Pass through a request to get_peer to the next child element */ +char *grpc_call_next_get_peer(grpc_call_element *elem); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index c1aa580b2d..ec6ca42889 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -265,6 +265,26 @@ static grpc_iomgr_closure *merge_into_waiting_op( return consumed_op; } +static char *cc_get_peer(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; + char *result; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } +} + static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { @@ -440,7 +460,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { while (wakeup_closures) { grpc_iomgr_closure *next = wakeup_closures->next; - grpc_iomgr_add_callback(wakeup_closures); + wakeup_closures->cb(wakeup_closures->cb_arg, 1); wakeup_closures = next; } @@ -590,6 +610,7 @@ const grpc_channel_filter grpc_client_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + cc_get_peer, "client-channel", }; diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 14cb3da62d..4bf24e7db3 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -174,6 +174,8 @@ static void process_send_ops(grpc_call_element *elem, size_t i; int did_compress = 0; + /* In streaming calls, we need to reset the previously accumulated slices */ + gpr_slice_buffer_reset_and_unref(&calld->slices); for (i = 0; i < send_ops->nops; ++i) { grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { @@ -200,7 +202,7 @@ static void process_send_ops(grpc_call_element *elem, channeld->default_compression_algorithm; calld->has_compression_algorithm = 1; /* GPR_TRUE */ } - grpc_metadata_batch_add_head( + grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->compression_algorithm_storage, grpc_mdelem_ref(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); @@ -322,4 +324,5 @@ const grpc_channel_filter grpc_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "compress"}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 34d07de519..b95ed06f2b 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -119,6 +119,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_transport_destroy(cd->transport); } +static char *con_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(chand->transport); +} + const grpc_channel_filter grpc_connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -128,6 +133,7 @@ const grpc_channel_filter grpc_connected_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + con_get_peer, "connected", }; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index bc821b16fa..6ae8488070 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -98,6 +98,18 @@ static void hc_on_recv(void *user_data, int success) { calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } +static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *channeld = elem->channel_data; + /* eat the things we'd like to set ourselves */ + if (md->key == channeld->method->key) return NULL; + if (md->key == channeld->scheme->key) return NULL; + if (md->key == channeld->te_trailers->key) return NULL; + if (md->key == channeld->content_type->key) return NULL; + if (md->key == channeld->user_agent->key) return NULL; + return md; +} + static void hc_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ @@ -111,6 +123,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; calld->sent_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, @@ -267,4 +280,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_http_client_filter = { hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-client"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index a6cbb5a7f4..7c798d2fb4 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -280,4 +280,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-server"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 5117723617..d631885aaf 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -127,4 +127,5 @@ const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "no-op"}; diff --git a/src/core/client_config/README.md b/src/core/client_config/README.md index d0700cfb13..fff7a5af5b 100644 --- a/src/core/client_config/README.md +++ b/src/core/client_config/README.md @@ -60,3 +60,7 @@ unix:path - the unix scheme is used to create and connect to unix domain sockets - the authority must be empty, and the path represents the absolute or relative path to the desired socket + +ipv4:host:port - a pre-resolved ipv4 dotted decimal address/port combination + +ipv6:[host]:port - a pre-resolved ipv6 address/port combination diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c new file mode 100644 index 0000000000..74584e7e2c --- /dev/null +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -0,0 +1,299 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/client_config/resolvers/sockaddr_resolver.h" + +#include <stdio.h> +#include <string.h> +#ifdef GPR_POSIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/string_util.h> + +#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/support/string.h" + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy factory */ + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels); + + /** the address that we've 'resolved' */ + struct sockaddr_storage addr; + int addr_len; + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** have we published? */ + int published; + /** pending next completion, or NULL */ + grpc_iomgr_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; +} sockaddr_resolver; + +static void sockaddr_destroy(grpc_resolver *r); + +static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r); + +static void sockaddr_shutdown(grpc_resolver *r); +static void sockaddr_channel_saw_error(grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config, + grpc_iomgr_closure *on_complete); + +static const grpc_resolver_vtable sockaddr_resolver_vtable = { + sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, + sockaddr_next}; + +static void sockaddr_shutdown(grpc_resolver *resolver) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + /* TODO(ctiller): add delayed callback */ + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + } + gpr_mu_unlock(&r->mu); +} + +static void sockaddr_channel_saw_error(grpc_resolver *resolver, + struct sockaddr *sa, int len) {} + +static void sockaddr_next(grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_iomgr_closure *on_complete) { + sockaddr_resolver *r = (sockaddr_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(!r->next_completion); + r->next_completion = on_complete; + r->target_config = target_config; + sockaddr_maybe_finish_next_locked(r); + gpr_mu_unlock(&r->mu); +} + +static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { + grpc_client_config *cfg; + grpc_lb_policy *lb_policy; + grpc_subchannel *subchannel; + grpc_subchannel_args args; + + if (r->next_completion != NULL && !r->published) { + cfg = grpc_client_config_create(); + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)&r->addr; + args.addr_len = r->addr_len; + subchannel = + grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); + lb_policy = r->lb_policy_factory(&subchannel, 1); + grpc_client_config_set_lb_policy(cfg, lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "unix"); + r->published = 1; + *r->target_config = cfg; + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + } +} + +static void sockaddr_destroy(grpc_resolver *gr) { + sockaddr_resolver *r = (sockaddr_resolver *)gr; + gpr_mu_destroy(&r->mu); + grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r); +} + +#ifdef GPR_POSIX_SOCKET +static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { + struct sockaddr_un *un = (struct sockaddr_un *)addr; + + un->sun_family = AF_UNIX; + strcpy(un->sun_path, uri->path); + *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + + return 1; +} +#endif + +static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in *in = (struct sockaddr_in *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in, 0, sizeof(*in)); + *len = sizeof(*in); + in->sin_family = AF_INET; + if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons(port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} + +static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in6, 0, sizeof(*in6)); + *len = sizeof(*in6); + in6->sin6_family = AF_INET6; + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons(port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} + +static grpc_resolver *sockaddr_create( + grpc_uri *uri, + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels), + grpc_subchannel_factory *subchannel_factory, + int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { + sockaddr_resolver *r; + + if (0 != strcmp(uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based uri's not supported"); + return NULL; + } + + r = gpr_malloc(sizeof(sockaddr_resolver)); + memset(r, 0, sizeof(*r)); + if (!parse(uri, &r->addr, &r->addr_len)) { + gpr_free(r); + return NULL; + } + + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + r->subchannel_factory = subchannel_factory; + r->lb_policy_factory = lb_policy_factory; + + grpc_subchannel_factory_ref(subchannel_factory); + return &r->base; +} + +/* + * FACTORY + */ + +static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} + +static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} + +#define DECL_FACTORY(name) \ + static grpc_resolver *name##_factory_create_resolver( \ + grpc_resolver_factory *factory, grpc_uri *uri, \ + grpc_subchannel_factory *subchannel_factory) { \ + return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ + subchannel_factory, parse_##name); \ + } \ + static const grpc_resolver_factory_vtable name##_factory_vtable = { \ + sockaddr_factory_ref, sockaddr_factory_unref, \ + name##_factory_create_resolver}; \ + static grpc_resolver_factory name##_resolver_factory = { \ + &name##_factory_vtable}; \ + grpc_resolver_factory *grpc_##name##_resolver_factory_create() { \ + return &name##_resolver_factory; \ + } + +#ifdef GPR_POSIX_SOCKET +DECL_FACTORY(unix) +#endif +DECL_FACTORY(ipv4) +DECL_FACTORY(ipv6) diff --git a/src/core/client_config/resolvers/unix_resolver_posix.h b/src/core/client_config/resolvers/sockaddr_resolver.h index 57ace59e21..1b7a18f9c2 100644 --- a/src/core/client_config/resolvers/unix_resolver_posix.h +++ b/src/core/client_config/resolvers/sockaddr_resolver.h @@ -38,7 +38,13 @@ #include "src/core/client_config/resolver_factory.h" +grpc_resolver_factory *grpc_ipv4_resolver_factory_create(void); + +grpc_resolver_factory *grpc_ipv6_resolver_factory_create(void); + +#ifdef GPR_POSIX_SOCKET /** Create a unix resolver factory */ grpc_resolver_factory *grpc_unix_resolver_factory_create(void); +#endif #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_UNIX_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c deleted file mode 100644 index be515d2689..0000000000 --- a/src/core/client_config/resolvers/unix_resolver_posix.c +++ /dev/null @@ -1,195 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> -#ifdef GPR_POSIX_SOCKET - -#include "src/core/client_config/resolvers/unix_resolver_posix.h" - -#include <string.h> -#include <sys/un.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/string_util.h> - -#include "src/core/client_config/lb_policies/pick_first.h" -#include "src/core/iomgr/resolve_address.h" -#include "src/core/support/string.h" - -typedef struct { - /** base class: must be first */ - grpc_resolver base; - /** refcount */ - gpr_refcount refs; - /** subchannel factory */ - grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); - - /** the address that we've 'resolved' */ - struct sockaddr_un addr; - int addr_len; - - /** mutex guarding the rest of the state */ - gpr_mu mu; - /** have we published? */ - int published; - /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; - /** target config address for next completion */ - grpc_client_config **target_config; -} unix_resolver; - -static void unix_destroy(grpc_resolver *r); - -static void unix_maybe_finish_next_locked(unix_resolver *r); - -static void unix_shutdown(grpc_resolver *r); -static void unix_channel_saw_error(grpc_resolver *r, - struct sockaddr *failing_address, - int failing_address_len); -static void unix_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); - -static const grpc_resolver_vtable unix_resolver_vtable = { - unix_destroy, unix_shutdown, unix_channel_saw_error, unix_next}; - -static void unix_shutdown(grpc_resolver *resolver) { - unix_resolver *r = (unix_resolver *)resolver; - gpr_mu_lock(&r->mu); - if (r->next_completion != NULL) { - *r->target_config = NULL; - /* TODO(ctiller): add delayed callback */ - grpc_iomgr_add_callback(r->next_completion); - r->next_completion = NULL; - } - gpr_mu_unlock(&r->mu); -} - -static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, - int len) {} - -static void unix_next(grpc_resolver *resolver, - grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { - unix_resolver *r = (unix_resolver *)resolver; - gpr_mu_lock(&r->mu); - GPR_ASSERT(!r->next_completion); - r->next_completion = on_complete; - r->target_config = target_config; - unix_maybe_finish_next_locked(r); - gpr_mu_unlock(&r->mu); -} - -static void unix_maybe_finish_next_locked(unix_resolver *r) { - grpc_client_config *cfg; - grpc_lb_policy *lb_policy; - grpc_subchannel *subchannel; - grpc_subchannel_args args; - - if (r->next_completion != NULL && !r->published) { - cfg = grpc_client_config_create(); - memset(&args, 0, sizeof(args)); - args.addr = (struct sockaddr *)&r->addr; - args.addr_len = r->addr_len; - subchannel = - grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args); - lb_policy = r->lb_policy_factory(&subchannel, 1); - grpc_client_config_set_lb_policy(cfg, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "unix"); - r->published = 1; - *r->target_config = cfg; - grpc_iomgr_add_callback(r->next_completion); - r->next_completion = NULL; - } -} - -static void unix_destroy(grpc_resolver *gr) { - unix_resolver *r = (unix_resolver *)gr; - gpr_mu_destroy(&r->mu); - grpc_subchannel_factory_unref(r->subchannel_factory); - gpr_free(r); -} - -static grpc_resolver *unix_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), - grpc_subchannel_factory *subchannel_factory) { - unix_resolver *r; - - if (0 != strcmp(uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based uri's not supported"); - return NULL; - } - - r = gpr_malloc(sizeof(unix_resolver)); - memset(r, 0, sizeof(*r)); - gpr_ref_init(&r->refs, 1); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &unix_resolver_vtable); - r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; - - r->addr.sun_family = AF_UNIX; - strcpy(r->addr.sun_path, uri->path); - r->addr_len = strlen(r->addr.sun_path) + sizeof(r->addr.sun_family) + 1; - - grpc_subchannel_factory_ref(subchannel_factory); - return &r->base; -} - -/* - * FACTORY - */ - -static void unix_factory_ref(grpc_resolver_factory *factory) {} - -static void unix_factory_unref(grpc_resolver_factory *factory) {} - -static grpc_resolver *unix_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory) { - return unix_create(uri, grpc_create_pick_first_lb_policy, subchannel_factory); -} - -static const grpc_resolver_factory_vtable unix_factory_vtable = { - unix_factory_ref, unix_factory_unref, unix_factory_create_resolver}; -static grpc_resolver_factory unix_resolver_factory = {&unix_factory_vtable}; - -grpc_resolver_factory *grpc_unix_resolver_factory_create() { - return &unix_resolver_factory; -} - -#endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 487f5afb35..358b907e59 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -640,6 +640,12 @@ void grpc_subchannel_call_unref( } } +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) { + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + return top_elem->filter->get_peer(top_elem); +} + void grpc_subchannel_call_process_op(grpc_subchannel_call *call, grpc_transport_stream_op *op) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index a23a623277..d1cd33b2af 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -100,6 +100,9 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_stream_op *op); +/** continue querying for peer */ +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call); + struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely want to mutate this */ diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 96487958a7..f2b44aad03 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -53,3 +53,7 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } + +char *grpc_endpoint_get_peer(grpc_endpoint *ep) { + return ep->vtable->get_peer(ep); +} diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 881e851800..ee0becf3d6 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -72,12 +72,15 @@ struct grpc_endpoint_vtable { void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); + char *(*get_peer)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data); +char *grpc_endpoint_get_peer(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index fa2d2555d6..deae9c6875 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = - grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = - grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, + "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index c6790b2937..7d81470a78 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -81,8 +81,8 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client")); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server")); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), "endpoint:server"); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), "endpoint:client"); return p; } diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index a18c176b30..aa4bc6e20d 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -88,6 +88,7 @@ void grpc_kick_poller(void) { void grpc_iomgr_init(void) { gpr_thd_id id; + g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index e91b94f8c8..71ac12e87b 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -36,12 +36,18 @@ #include <errno.h> #include <string.h> -#include "src/core/support/string.h" +#ifdef GPR_POSIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/support/string.h" + static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; @@ -161,6 +167,31 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, return ret; } +char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { + char *temp; + char *result; + + switch (addr->sa_family) { + case AF_INET: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv4:%s", temp); + gpr_free(temp); + return result; + case AF_INET6: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv6:%s", temp); + gpr_free(temp); + return result; +#ifdef GPR_POSIX_SOCKET + case AF_UNIX: + gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); + return result; +#endif + } + + return NULL; +} + int grpc_sockaddr_get_port(const struct sockaddr *addr) { switch (addr->sa_family) { case AF_INET: diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index bdfb83479b..99f1ed54da 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, int normalize); +char *grpc_sockaddr_to_uri(const struct sockaddr *addr); + #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 41d8b169e0..392eda999e 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,6 +64,7 @@ typedef struct { int refs; grpc_iomgr_closure write_closure; grpc_pollset_set *interested_parties; + char *addr_str; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -99,6 +100,7 @@ static void on_alarm(void *acp, int success) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } } @@ -158,7 +160,8 @@ static void on_writable(void *acp, int success) { } } else { grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, + ac->addr_str); goto finish; } } else { @@ -179,6 +182,7 @@ finish: gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } cb(cb_arg, ep); @@ -223,13 +227,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); - grpc_sockaddr_to_string(&addr_str, addr, 1); + addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); goto done; } @@ -247,6 +251,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb_arg = arg; ac->fd = fdobj; ac->interested_parties = interested_parties; + ac->addr_str = addr_str; + addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 39fd43130b..79a58fe2af 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -58,6 +58,7 @@ typedef struct { grpc_winsocket *socket; gpr_timespec deadline; grpc_alarm alarm; + char *addr_name; int refs; int aborted; } async_connect; @@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_name); gpr_free(ac); } } @@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) { gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_free(utf8_message); } else if (!aborted) { - ep = grpc_tcp_create(ac->socket); + ep = grpc_tcp_create(ac->socket, ac->addr_name); } } else { gpr_log(GPR_ERROR, "on_connect is shutting down"); @@ -213,6 +215,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ac->socket = socket; gpr_mu_init(&ac->mu); ac->refs = 2; + ac->addr_name = grpc_sockaddr_to_uri(addr); ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b6d6efc9fb..63a8a2720e 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -44,15 +44,17 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/support/string.h" -#include "src/core/debug/trace.h" -#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/support/string.h" +#include "src/core/debug/trace.h" +#include "src/core/profiling/timers.h" + #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else @@ -282,6 +284,8 @@ typedef struct { grpc_iomgr_closure write_closure; grpc_iomgr_closure handle_read_closure; + + char *peer_string; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -314,7 +319,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, gpr_log(GPR_DEBUG, "read: status=%d", status); for (i = 0; i < nslices; i++) { char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ: %s", dump); + gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_free(dump); } } @@ -443,7 +448,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { tcp->handle_read_closure.cb_arg = tcp; - grpc_iomgr_add_callback(&tcp->handle_read_closure); + grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); } } @@ -567,13 +572,20 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static char *grpc_tcp_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy}; + grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, + grpc_tcp_shutdown, grpc_tcp_destroy, grpc_tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, + const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; + tcp->peer_string = gpr_strdup(peer_string); tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 44279d5a26..d752feaeea 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -53,6 +53,7 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, + const char *peer_string); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5854031c9b..8538600112 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -332,7 +332,7 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); fdobj = grpc_fd_create(fd, name); @@ -342,8 +342,9 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb(sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 187009b2c8..8f634fcd7a 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -243,6 +243,10 @@ static void on_accept(void *arg, int from_iocp) { SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; + struct sockaddr_storage peer_name; + char *peer_name_string; + char *fd_name; + int peer_name_len = sizeof(peer_name); DWORD transfered_bytes; DWORD flags; BOOL wsa_success; @@ -277,8 +281,12 @@ static void on_accept(void *arg, int from_iocp) { } } else { if (!sp->shutting_down) { - /* TODO(ctiller): add sockaddr address to label */ - ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); + getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len); + peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name); + gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); + ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), peer_name_string); + gpr_free(fd_name); + gpr_free(peer_name_string); } } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 1bf81a73e0..d68e6aee79 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -96,6 +96,8 @@ typedef struct grpc_tcp { to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + + char *peer_string; } grpc_tcp; static void tcp_ref(grpc_tcp *tcp) { @@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) { gpr_slice_buffer_destroy(&tcp->write_slices); grpc_winsocket_orphan(tcp->socket); gpr_mu_destroy(&tcp->mu); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) { tcp_unref(tcp); } +static char *win_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy + win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy, win_get_peer }; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; @@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { gpr_mu_init(&tcp->mu); gpr_slice_buffer_init(&tcp->write_slices); gpr_ref_init(&tcp->refcount, 1); + tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; } diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h index 4cbc12c53a..7e301db250 100644 --- a/src/core/iomgr/tcp_windows.h +++ b/src/core/iomgr/tcp_windows.h @@ -50,7 +50,7 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket); +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); int grpc_tcp_prepare_socket(SOCKET sock); diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 9e49a807f1..9a69f53a5a 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -344,6 +344,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client-auth"}; + auth_start_transport_op, grpc_channel_next_op, + sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "client-auth"}; diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index fb59fa4b0e..38612cf308 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -315,7 +315,7 @@ grpc_server_credentials *grpc_ssl_server_credentials_create( /* -- Jwt credentials -- */ -static void jwt_reset_cache(grpc_jwt_credentials *c) { +static void jwt_reset_cache(grpc_service_account_jwt_access_credentials *c) { if (c->cached.jwt_md != NULL) { grpc_credentials_md_store_unref(c->cached.jwt_md); c->cached.jwt_md = NULL; @@ -328,7 +328,8 @@ static void jwt_reset_cache(grpc_jwt_credentials *c) { } static void jwt_destroy(grpc_credentials *creds) { - grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds; + grpc_service_account_jwt_access_credentials *c = + (grpc_service_account_jwt_access_credentials *)creds; grpc_auth_json_key_destruct(&c->key); jwt_reset_cache(c); gpr_mu_destroy(&c->cache_mu); @@ -346,7 +347,8 @@ static void jwt_get_request_metadata(grpc_credentials *creds, const char *service_url, grpc_credentials_metadata_cb cb, void *user_data) { - grpc_jwt_credentials *c = (grpc_jwt_credentials *)creds; + grpc_service_account_jwt_access_credentials *c = + (grpc_service_account_jwt_access_credentials *)creds; gpr_timespec refresh_threshold = gpr_time_from_seconds( GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS, GPR_TIMESPAN); @@ -399,15 +401,16 @@ static grpc_credentials_vtable jwt_vtable = { jwt_destroy, jwt_has_request_metadata, jwt_has_request_metadata_only, jwt_get_request_metadata, NULL}; -grpc_credentials *grpc_jwt_credentials_create_from_auth_json_key( +grpc_credentials * +grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key key, gpr_timespec token_lifetime) { - grpc_jwt_credentials *c; + grpc_service_account_jwt_access_credentials *c; if (!grpc_auth_json_key_is_valid(&key)) { gpr_log(GPR_ERROR, "Invalid input for jwt credentials creation"); return NULL; } - c = gpr_malloc(sizeof(grpc_jwt_credentials)); - memset(c, 0, sizeof(grpc_jwt_credentials)); + c = gpr_malloc(sizeof(grpc_service_account_jwt_access_credentials)); + memset(c, 0, sizeof(grpc_service_account_jwt_access_credentials)); c->base.type = GRPC_CREDENTIALS_TYPE_JWT; gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &jwt_vtable; @@ -418,9 +421,9 @@ grpc_credentials *grpc_jwt_credentials_create_from_auth_json_key( return &c->base; } -grpc_credentials *grpc_jwt_credentials_create(const char *json_key, - gpr_timespec token_lifetime) { - return grpc_jwt_credentials_create_from_auth_json_key( +grpc_credentials *grpc_service_account_jwt_access_credentials_create( + const char *json_key, gpr_timespec token_lifetime) { + return grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key_create_from_string(json_key), token_lifetime); } diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index d988901cf7..7f4141967d 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -188,7 +188,8 @@ grpc_credentials *grpc_fake_oauth2_credentials_create( /* Private constructor for jwt credentials from an already parsed json key. Takes ownership of the key. */ -grpc_credentials *grpc_jwt_credentials_create_from_auth_json_key( +grpc_credentials * +grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key key, gpr_timespec token_lifetime); /* Private constructor for refresh token credentials from an already parsed @@ -240,7 +241,7 @@ typedef struct { grpc_auth_json_key key; gpr_timespec jwt_lifetime; -} grpc_jwt_credentials; +} grpc_service_account_jwt_access_credentials; /* -- Oauth2TokenFetcher credentials -- diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 833484310f..de1929fe76 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -140,8 +140,9 @@ static grpc_credentials *create_default_creds_from_path(char *creds_path) { /* First, try an auth json key. */ key = grpc_auth_json_key_create_from_json(json); if (grpc_auth_json_key_is_valid(&key)) { - result = grpc_jwt_credentials_create_from_auth_json_key( - key, grpc_max_auth_token_lifetime); + result = + grpc_service_account_jwt_access_credentials_create_from_auth_json_key( + key, grpc_max_auth_token_lifetime); goto end; } diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 3548198046..e189380ec5 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -331,9 +331,14 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); } +static char *endpoint_get_peer(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_peer(ep->wrapped_ep); +} + static const grpc_endpoint_vtable vtable = { endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset, - endpoint_shutdown, endpoint_unref}; + endpoint_shutdown, endpoint_unref, endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 10eef6d237..69789c2f0d 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -120,6 +120,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server-auth"}; + auth_start_transport_op, grpc_channel_next_op, + sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server-auth"}; diff --git a/src/core/support/host_port.c b/src/core/support/host_port.c index 0906ebc2a3..a28f04df9c 100644 --- a/src/core/support/host_port.c +++ b/src/core/support/host_port.c @@ -50,7 +50,7 @@ int gpr_join_host_port(char **out, const char *host, int port) { } } -void gpr_split_host_port(const char *name, char **host, char **port) { +int gpr_split_host_port(const char *name, char **host, char **port) { const char *host_start; size_t host_len; const char *port_start; @@ -63,7 +63,7 @@ void gpr_split_host_port(const char *name, char **host, char **port) { const char *rbracket = strchr(name, ']'); if (rbracket == NULL) { /* Unmatched [ */ - return; + return 0; } if (rbracket[1] == '\0') { /* ]<end> */ @@ -73,14 +73,14 @@ void gpr_split_host_port(const char *name, char **host, char **port) { port_start = rbracket + 2; } else { /* ]<invalid> */ - return; + return 0; } host_start = name + 1; host_len = (size_t)(rbracket - host_start); if (memchr(host_start, ':', host_len) == NULL) { /* Require all bracketed hosts to contain a colon, because a hostname or IPv4 address should never use brackets. */ - return; + return 0; } } else { const char *colon = strchr(name, ':'); @@ -105,4 +105,6 @@ void gpr_split_host_port(const char *name, char **host, char **port) { if (port_start != NULL) { *port = gpr_strdup(port_start); } + + return 1; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e08273e451..a1da822113 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1253,6 +1253,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { elem->filter->start_transport_stream_op(elem, op); } +char *grpc_call_get_peer(grpc_call *call) { + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + return elem->filter->get_peer(elem); +} + grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } @@ -1368,7 +1373,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { l->md = 0; } } - if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a6438ff512..4052c65cc6 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -36,12 +36,14 @@ #include <stdlib.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -73,6 +75,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; grpc_iomgr_closure destroy_closure; + char *target; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -85,13 +88,14 @@ struct grpc_channel { #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t num_filters, + const char *target, const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); memset(channel, 0, sizeof(*channel)); + channel->target = gpr_strdup(target); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; /* decremented by grpc_channel_destroy */ @@ -137,6 +141,10 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } +char *grpc_channel_get_target(grpc_channel *channel) { + return gpr_strdup(channel->target); +} + static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { @@ -222,6 +230,7 @@ static void destroy_channel(void *p, int ok) { } grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); + gpr_free(channel->target); gpr_free(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 4e03eb4411..9e0646efaa 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -38,7 +38,7 @@ #include "src/core/client_config/subchannel_factory.h" grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t count, + const char *target, const grpc_channel_filter **filters, size_t count, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 91c7b35550..778f7108fd 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -179,7 +179,8 @@ grpc_channel *grpc_channel_create(const char *target, return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + channel = + grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 04e27d30ac..5cba479317 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" +#include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" @@ -47,10 +48,6 @@ #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" -#ifdef GPR_POSIX_SOCKET -#include "src/core/client_config/resolvers/unix_resolver_posix.h" -#endif - static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; @@ -68,6 +65,8 @@ void grpc_init(void) { gpr_time_init(); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create()); + grpc_register_resolver_type("ipv4", grpc_ipv4_resolver_factory_create()); + grpc_register_resolver_type("ipv6", grpc_ipv6_resolver_factory_create()); #ifdef GPR_POSIX_SOCKET grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create()); #endif diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3f2bb5c8a9..c4215a2cfb 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,7 +47,10 @@ typedef struct { grpc_linked_mdelem details; } call_data; -typedef struct { grpc_mdctx *mdctx; } channel_data; +typedef struct { + grpc_mdctx *mdctx; + grpc_channel *master; +} channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op) { @@ -82,6 +85,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, } } +static char *lame_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_channel_get_target(chand->master); +} + static void lame_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { if (op->on_connectivity_state_change) { @@ -112,6 +120,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(is_last); chand->mdctx = mdctx; + chand->master = master; } static void destroy_channel_elem(grpc_channel_element *elem) {} @@ -125,11 +134,12 @@ static const grpc_channel_filter lame_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + lame_get_peer, "lame-client", }; -grpc_channel *grpc_lame_client_channel_create(void) { +grpc_channel *grpc_lame_client_channel_create(const char *target) { static const grpc_channel_filter *filters[] = {&lame_filter}; - return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(), - 1); + return grpc_channel_create_from_filters(target, filters, 1, NULL, + grpc_mdctx_create(), 1); } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d87ec97b53..a280311ba0 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -196,13 +196,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } if (grpc_credentials_create_security_connector( creds, target, args, NULL, &connector, &new_args_from_connector) != GRPC_SECURITY_OK) { - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } mdctx = grpc_mdctx_create(); @@ -231,7 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); + channel = + grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 439452aea2..7b3e412db0 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -400,6 +400,15 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, call_data *calld = elem->call_data; int request_id; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + gpr_mu_lock(&calld->mu_state); + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + return; + } + request_id = gpr_stack_lockfree_pop(request_matcher->requests); if (request_id == -1) { gpr_mu_lock(&server->mu_call); @@ -530,6 +539,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; + gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { size_t i; @@ -539,8 +549,9 @@ static void server_on_recv(void *ptr, int success) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (0 != gpr_time_cmp(op->data.metadata.deadline, - gpr_inf_future(GPR_CLOCK_REALTIME))) { + op_deadline = op->data.metadata.deadline; + if (0 != + gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { calld->deadline = op->data.metadata.deadline; } calld->got_initial_metadata = 1; @@ -722,6 +733,7 @@ static const grpc_channel_filter server_surface_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server", }; @@ -878,8 +890,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } - channel = - grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0); + channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, + mdctx, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 7a4c355f23..40bf2ebd79 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -92,7 +92,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( p->frame_type = *cur; switch (p->frame_type) { case 0: - /* noop */ + p->is_frame_compressed = 0; /* GPR_FALSE */ break; case 1: p->is_frame_compressed = 1; /* GPR_TRUE */ diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index e5e6f445b7..f0eeb6de50 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -60,7 +60,6 @@ typedef enum { GRPC_CHTTP2_LIST_WRITABLE, GRPC_CHTTP2_LIST_WRITING, GRPC_CHTTP2_LIST_WRITTEN, - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE, GRPC_CHTTP2_LIST_PARSING_SEEN, GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING, GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING, @@ -286,6 +285,7 @@ struct grpc_chttp2_transport { grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; + char *peer_string; gpr_mu mu; @@ -382,6 +382,8 @@ typedef struct { gpr_uint8 published_cancelled; /** is this stream in the stream map? (boolean) */ gpr_uint8 in_stream_map; + /** is this stream actively being written? */ + gpr_uint8 writing_now; /** stream state already published to the upper layer */ grpc_stream_state published_state; @@ -474,11 +476,17 @@ void grpc_chttp2_publish_reads(grpc_chttp2_transport_global *global, void grpc_chttp2_list_add_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_list_add_first_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); +void grpc_chttp2_list_remove_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global); void grpc_chttp2_list_add_incoming_window_updated( grpc_chttp2_transport_global *transport_global, @@ -510,18 +518,6 @@ int grpc_chttp2_list_pop_written_stream( grpc_chttp2_stream_global **stream_global, grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_add_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); -int grpc_chttp2_list_pop_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_writing **stream_writing); -void grpc_chttp2_list_remove_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); - void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing); diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 904b9afce7..d84960009b 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -182,8 +182,7 @@ void grpc_chttp2_publish_reads( stream_global->max_recv_bytes -= stream_parsing->incoming_window_delta; stream_parsing->incoming_window_delta = 0; - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } /* update outgoing flow control window */ @@ -607,7 +606,7 @@ static void on_header(void *tp, grpc_mdelem *md) { } grpc_chttp2_incoming_metadata_buffer_set_deadline( &stream_parsing->incoming_metadata, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), *cached_timeout)); + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout)); GRPC_MDELEM_UNREF(md); } else { grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->incoming_metadata, diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 590f6abfbc..9e68c1e146 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -108,6 +108,23 @@ static void stream_list_maybe_remove(grpc_chttp2_transport *t, } } +static void stream_list_add_head(grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + grpc_chttp2_stream_list_id id) { + grpc_chttp2_stream *old_head; + GPR_ASSERT(!s->included[id]); + old_head = t->lists[id].head; + s->links[id].next = old_head; + s->links[id].prev = NULL; + if (old_head) { + old_head->links[id].prev = s; + } else { + t->lists[id].tail = s; + } + t->lists[id].head = s; + s->included[id] = 1; +} + static void stream_list_add_tail(grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream_list_id id) { @@ -119,7 +136,6 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, if (old_tail) { old_tail->links[id].next = s; } else { - s->links[id].prev = NULL; t->lists[id].head = s; } t->lists[id].tail = s; @@ -144,6 +160,18 @@ void grpc_chttp2_list_add_writable_stream( STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE); } +void grpc_chttp2_list_add_first_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + GPR_ASSERT(stream_global->id != 0); + gpr_log(GPR_DEBUG, "add:%d:%d:%d:%d", stream_global->id, + stream_global->write_state, stream_global->in_stream_map, + stream_global->read_closed); + stream_list_add_head(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); +} + int grpc_chttp2_list_pop_writable_stream( grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, @@ -157,6 +185,14 @@ int grpc_chttp2_list_pop_writable_stream( return r; } +void grpc_chttp2_list_remove_writable_stream( + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global) { + stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), + STREAM_FROM_GLOBAL(stream_global), + GRPC_CHTTP2_LIST_WRITABLE); +} + void grpc_chttp2_list_add_writing_stream( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { @@ -202,36 +238,6 @@ int grpc_chttp2_list_pop_written_stream( return r; } -void grpc_chttp2_list_add_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - GPR_ASSERT(stream_global->id != 0); - stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); -} - -int grpc_chttp2_list_pop_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_transport_writing *transport_writing, - grpc_chttp2_stream_global **stream_global, - grpc_chttp2_stream_writing **stream_writing) { - grpc_chttp2_stream *stream; - int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream, - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); - *stream_global = &stream->global; - *stream_writing = &stream->writing; - return r; -} - -void grpc_chttp2_list_remove_writable_window_update_stream( - grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { - stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global), - STREAM_FROM_GLOBAL(stream_global), - GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE); -} - void grpc_chttp2_list_add_parsing_seen_stream( grpc_chttp2_transport_parsing *transport_parsing, grpc_chttp2_stream_parsing *stream_parsing) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index d8ec117aa5..d39b0c42f7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -44,6 +44,7 @@ int grpc_chttp2_unlocking_check_writes( grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; + grpc_chttp2_stream_global *first_reinserted_stream = NULL; gpr_uint32 window_delta; /* simple writes are queued to qbuf, and flushed here */ @@ -64,50 +65,54 @@ int grpc_chttp2_unlocking_check_writes( } /* for each grpc_chttp2_stream that's become writable, frame it's data - (according to - available window sizes) and add to the output buffer */ - while (grpc_chttp2_list_pop_writable_stream(transport_global, - transport_writing, &stream_global, - &stream_writing)) { + (according to available window sizes) and add to the output buffer */ + while (grpc_chttp2_list_pop_writable_stream( + transport_global, transport_writing, &stream_global, &stream_writing)) { + if (stream_global == first_reinserted_stream) { + /* prevent infinite loop */ + grpc_chttp2_list_add_first_writable_stream(transport_global, + stream_global); + break; + } + stream_writing->id = stream_global->id; - window_delta = grpc_chttp2_preencode( - stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops, - GPR_MIN(transport_global->outgoing_window, - stream_global->outgoing_window), - &stream_writing->sopb); - GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( - "write", transport_global, outgoing_window, -(gpr_int64)window_delta); - GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, - outgoing_window, -(gpr_int64)window_delta); - transport_global->outgoing_window -= window_delta; - stream_global->outgoing_window -= window_delta; - - if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && - stream_global->outgoing_sopb->nops == 0) { - if (!transport_global->is_client && !stream_global->read_closed) { - stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; - } else { - stream_writing->send_closed = GRPC_SEND_CLOSED; + stream_writing->send_closed = GRPC_DONT_SEND_CLOSED; + GPR_ASSERT(!stream_global->writing_now); + + if (stream_global->outgoing_sopb) { + window_delta = + grpc_chttp2_preencode(stream_global->outgoing_sopb->ops, + &stream_global->outgoing_sopb->nops, + GPR_MIN(transport_global->outgoing_window, + stream_global->outgoing_window), + &stream_writing->sopb); + GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT( + "write", transport_global, outgoing_window, -(gpr_int64)window_delta); + GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, + outgoing_window, + -(gpr_int64)window_delta); + transport_global->outgoing_window -= window_delta; + stream_global->outgoing_window -= window_delta; + + if (stream_global->write_state == GRPC_WRITE_STATE_QUEUED_CLOSE && + stream_global->outgoing_sopb->nops == 0) { + if (!transport_global->is_client && !stream_global->read_closed) { + stream_writing->send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM; + } else { + stream_writing->send_closed = GRPC_SEND_CLOSED; + } } - } - if (stream_writing->sopb.nops > 0 || - stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { - grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); - } - if (stream_global->outgoing_window > 0 && - stream_global->outgoing_sopb->nops != 0) { - grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + if (stream_global->outgoing_window > 0 && + stream_global->outgoing_sopb->nops != 0) { + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); + if (first_reinserted_stream == NULL && + transport_global->outgoing_window == 0) { + first_reinserted_stream = stream_global; + } + } } - } - /* for each grpc_chttp2_stream that wants to update its window, add that - * window here */ - while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global, - transport_writing, - &stream_global, - &stream_writing)) { - stream_writing->id = stream_global->id; if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) { stream_writing->announce_window = stream_global->unannounced_incoming_window; GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global, @@ -118,6 +123,11 @@ int grpc_chttp2_unlocking_check_writes( stream_global->unannounced_incoming_window = 0; grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); + stream_global->writing_now = 1; + grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); + } else if (stream_writing->sopb.nops > 0 || + stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) { + stream_global->writing_now = 1; grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing); } } @@ -205,6 +215,8 @@ void grpc_chttp2_cleanup_writing( while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { + GPR_ASSERT(stream_global->writing_now); + stream_global->writing_now = 0; if (stream_global->outgoing_sopb != NULL && stream_global->outgoing_sopb->nops == 0) { stream_global->outgoing_sopb = NULL; @@ -216,9 +228,9 @@ void grpc_chttp2_cleanup_writing( if (!transport_global->is_client) { stream_global->read_closed = 1; } - grpc_chttp2_list_add_read_write_state_changed(transport_global, - stream_global); } + grpc_chttp2_list_add_read_write_state_changed(transport_global, + stream_global); } transport_writing->outbuf.count = 0; transport_writing->outbuf.length = 0; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c923d5e42f..5f49b2ddd6 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -168,6 +168,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_mdctx_unref(t->metadata_context); + gpr_free(t->peer_string); gpr_free(t); } @@ -217,6 +218,7 @@ static void init_transport(grpc_chttp2_transport *t, gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); + t->peer_string = grpc_endpoint_get_peer(ep); t->metadata_context = mdctx; t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -393,12 +395,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) { } grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global); - grpc_chttp2_list_remove_writable_window_update_stream(&t->global, &s->global); + grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); gpr_mu_unlock(&t->mu); for (i = 0; i < STREAM_LIST_COUNT; i++) { - GPR_ASSERT(!s->included[i]); + if (s->included[i]) { + gpr_log(GPR_ERROR, "%s stream %d still included in list %d", + t->global.is_client ? "client" : "server", s->global.id, i); + abort(); + } } GPR_ASSERT(s->global.outgoing_sopb == NULL); @@ -574,8 +580,6 @@ static void maybe_start_some_streams( grpc_chttp2_list_add_incoming_window_updated(transport_global, stream_global); grpc_chttp2_list_add_writable_stream(transport_global, stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); } /* cancel out streams that will never be started */ @@ -641,8 +645,7 @@ static void perform_stream_op_locked( if (stream_global->id != 0) { grpc_chttp2_list_add_read_write_state_changed(transport_global, stream_global); - grpc_chttp2_list_add_writable_window_update_stream(transport_global, - stream_global); + grpc_chttp2_list_add_writable_stream(transport_global, stream_global); } } @@ -750,6 +753,7 @@ static void remove_stream(grpc_chttp2_transport *t, gpr_uint32 id) { if (!s) { s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id); } + grpc_chttp2_list_remove_writable_stream(&t->global, &s->global); GPR_ASSERT(s); s->global.in_stream_map = 0; if (t->parsing.incoming_stream == &s->parsing) { @@ -831,6 +835,9 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) { if (!stream_global->publish_sopb) { continue; } + if (stream_global->writing_now) { + continue; + } /* FIXME(ctiller): we include in_stream_map in our computation of whether the stream is write-closed. This is completely bogus, but has the effect of delaying stream-closed until the stream @@ -1069,9 +1076,17 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport}; +static char *chttp2_get_peer(grpc_transport *t) { + return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); +} + +static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), + init_stream, + perform_stream_op, + perform_transport_op, + destroy_stream, + destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2689e3028a..69c00b6a4f 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -65,6 +65,10 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } +char *grpc_transport_get_peer(grpc_transport *transport) { + return transport->vtable->get_peer(transport); +} + void grpc_transport_stream_op_finish_with_failure( grpc_transport_stream_op *op) { if (op->send_ops) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 64503604ee..553779602a 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -182,4 +182,7 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport); +/* Get the transports peer */ +char *grpc_transport_get_peer(grpc_transport *transport); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 515721dfb6..d3bbdf6c27 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -58,6 +58,9 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_transport *self); + + /* implementation of grpc_transport_get_peer */ + char *(*get_peer)(grpc_transport *self); } grpc_transport_vtable; /* an instance of a grpc transport */ diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c index 10d796fc15..f62c340e97 100644 --- a/src/core/transport/transport_op_string.c +++ b/src/core/transport/transport_op_string.c @@ -116,10 +116,9 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->send_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_strvec_add(&b, gpr_strdup("SEND")); - if (op->is_last_send) { - gpr_strvec_add(&b, gpr_strdup("_LAST")); - } + gpr_asprintf(&tmp, "SEND%s:%p", op->is_last_send ? "_LAST" : "", + op->on_done_send); + gpr_strvec_add(&b, tmp); gpr_strvec_add(&b, gpr_strdup("[")); gpr_strvec_add(&b, grpc_sopb_string(op->send_ops)); gpr_strvec_add(&b, gpr_strdup("]")); @@ -128,7 +127,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { if (op->recv_ops) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes); + gpr_asprintf(&tmp, "RECV:%p:max_recv_bytes=%d", op->on_done_recv, + op->max_recv_bytes); gpr_strvec_add(&b, tmp); } diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc index 38eeda0dc0..62f179d361 100644 --- a/src/cpp/client/create_channel.cc +++ b/src/cpp/client/create_channel.cc @@ -52,6 +52,6 @@ std::shared_ptr<ChannelInterface> CreateChannel( user_agent_prefix.str()); return creds ? creds->CreateChannel(target, cp_args) : std::shared_ptr<ChannelInterface>( - new Channel(target, grpc_lame_client_channel_create())); + new Channel(target, grpc_lame_client_channel_create(NULL))); } } // namespace grpc diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 01c7f14f1a..abf0cb387e 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -99,8 +99,8 @@ std::shared_ptr<Credentials> ServiceAccountCredentials( } // Builds JWT credentials. -std::shared_ptr<Credentials> JWTCredentials(const grpc::string& json_key, - long token_lifetime_seconds) { +std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials( + const grpc::string& json_key, long token_lifetime_seconds) { if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create JWTCredentials with non-positive lifetime"); @@ -108,8 +108,8 @@ std::shared_ptr<Credentials> JWTCredentials(const grpc::string& json_key, } gpr_timespec lifetime = gpr_time_from_seconds(token_lifetime_seconds, GPR_TIMESPAN); - return WrapCredentials( - grpc_jwt_credentials_create(json_key.c_str(), lifetime)); + return WrapCredentials(grpc_service_account_jwt_access_credentials_create( + json_key.c_str(), lifetime)); } // Builds refresh token credentials. diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc index a814cad452..799c597e0b 100644 --- a/src/cpp/util/time.cc +++ b/src/cpp/util/time.cc @@ -79,9 +79,10 @@ void TimepointHR2Timespec(const high_resolution_clock::time_point& from, } system_clock::time_point Timespec2Timepoint(gpr_timespec t) { - if (gpr_time_cmp(t, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { + if (gpr_time_cmp(t, gpr_inf_future(t.clock_type)) == 0) { return system_clock::time_point::max(); } + t = gpr_convert_clock_type(t, GPR_CLOCK_REALTIME); system_clock::time_point tp; tp += duration_cast<system_clock::time_point::duration>(seconds(t.tv_sec)); tp += diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 927954c448..9364779df9 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -44,13 +44,14 @@ <Compile Include="ClientServerTest.cs" /> <Compile Include="ServerTest.cs" /> <Compile Include="GrpcEnvironmentTest.cs" /> - <Compile Include="TimespecTest.cs" /> <Compile Include="PInvokeTest.cs" /> <Compile Include="Internal\MetadataArraySafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueSafeHandleTest.cs" /> <Compile Include="Internal\CompletionQueueEventTest.cs" /> <Compile Include="Internal\ChannelArgsSafeHandleTest.cs" /> <Compile Include="ChannelOptionsTest.cs" /> + <Compile Include="Internal\TimespecTest.cs" /> + <Compile Include="TimeoutsTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs new file mode 100644 index 0000000000..874df02baa --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs @@ -0,0 +1,202 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class TimespecTest + { + [Test] + public void Now_IsInUtc() + { + Assert.AreEqual(DateTimeKind.Utc, Timespec.Now.ToDateTime().Kind); + } + + [Test] + public void Now_AgreesWithUtcNow() + { + var timespec = Timespec.Now; + var utcNow = DateTime.UtcNow; + + TimeSpan difference = utcNow - timespec.ToDateTime(); + + // This test is inherently a race - but the two timestamps + // should really be way less that a minute apart. + Assert.IsTrue(difference.TotalSeconds < 60); + } + + [Test] + public void InfFuture() + { + var timespec = Timespec.InfFuture; + } + + [Test] + public void InfPast() + { + var timespec = Timespec.InfPast; + } + + [Test] + public void TimespecSizeIsNativeSize() + { + Assert.AreEqual(Timespec.NativeSize, Marshal.SizeOf(typeof(Timespec))); + } + + [Test] + public void ToDateTime() + { + Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc), + new Timespec(IntPtr.Zero, 0).ToDateTime()); + + Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50), + new Timespec(new IntPtr(10), 5000).ToDateTime()); + + Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc), + new Timespec(new IntPtr(1437452508), 0).ToDateTime()); + + // before epoch + Assert.AreEqual(new DateTime(1969, 12, 31, 23, 59, 55, DateTimeKind.Utc).AddTicks(10), + new Timespec(new IntPtr(-5), 1000).ToDateTime()); + + // infinity + Assert.AreEqual(DateTime.MaxValue, Timespec.InfFuture.ToDateTime()); + Assert.AreEqual(DateTime.MinValue, Timespec.InfPast.ToDateTime()); + + // nanos are rounded to ticks are rounded up + Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddTicks(1), + new Timespec(IntPtr.Zero, 99).ToDateTime()); + + // Illegal inputs + Assert.Throws(typeof(InvalidOperationException), + () => new Timespec(new IntPtr(0), -2).ToDateTime()); + Assert.Throws(typeof(InvalidOperationException), + () => new Timespec(new IntPtr(0), 1000 * 1000 * 1000).ToDateTime()); + Assert.Throws(typeof(InvalidOperationException), + () => new Timespec(new IntPtr(0), 0, GPRClockType.Monotonic).ToDateTime()); + } + + [Test] + public void ToDateTime_ReturnsUtc() + { + Assert.AreEqual(DateTimeKind.Utc, new Timespec(new IntPtr(1437452508), 0).ToDateTime().Kind); + Assert.AreNotEqual(DateTimeKind.Unspecified, new Timespec(new IntPtr(1437452508), 0).ToDateTime().Kind); + } + + [Test] + public void ToDateTime_Overflow() + { + // we can only get overflow in ticks arithmetic on 64-bit + if (IntPtr.Size == 8) + { + var timespec = new Timespec(new IntPtr(long.MaxValue - 100), 0); + Assert.AreNotEqual(Timespec.InfFuture, timespec); + Assert.AreEqual(DateTime.MaxValue, timespec.ToDateTime()); + + Assert.AreEqual(DateTime.MinValue, new Timespec(new IntPtr(long.MinValue + 100), 0).ToDateTime()); + } + else + { + Console.WriteLine("Test cannot be run on this platform, skipping the test."); + } + } + + [Test] + public void ToDateTime_OutOfDateTimeRange() + { + // we can only get out of range on 64-bit, on 32 bit the max + // timestamp is ~ Jan 19 2038, which is far within range of DateTime + // same case for min value. + if (IntPtr.Size == 8) + { + // DateTime range goes up to year 9999, 20000 years from now should + // be out of range. + long seconds = 20000L * 365L * 24L * 3600L; + + var timespec = new Timespec(new IntPtr(seconds), 0); + Assert.AreNotEqual(Timespec.InfFuture, timespec); + Assert.AreEqual(DateTime.MaxValue, timespec.ToDateTime()); + + Assert.AreEqual(DateTime.MinValue, new Timespec(new IntPtr(-seconds), 0).ToDateTime()); + } + else + { + Console.WriteLine("Test cannot be run on this platform, skipping the test"); + } + } + + [Test] + public void FromDateTime() + { + Assert.AreEqual(new Timespec(IntPtr.Zero, 0), + Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc))); + + Assert.AreEqual(new Timespec(new IntPtr(10), 5000), + Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50))); + + Assert.AreEqual(new Timespec(new IntPtr(1437452508), 0), + Timespec.FromDateTime(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc))); + + // before epoch + Assert.AreEqual(new Timespec(new IntPtr(-5), 1000), + Timespec.FromDateTime(new DateTime(1969, 12, 31, 23, 59, 55, DateTimeKind.Utc).AddTicks(10))); + + // infinity + Assert.AreEqual(Timespec.InfFuture, Timespec.FromDateTime(DateTime.MaxValue)); + Assert.AreEqual(Timespec.InfPast, Timespec.FromDateTime(DateTime.MinValue)); + + // illegal inputs + Assert.Throws(typeof(ArgumentException), + () => Timespec.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Unspecified))); + } + + [Test] + public void FromDateTime_OutOfTimespecRange() + { + // we can only get overflow in Timespec on 32-bit + if (IntPtr.Size == 4) + { + Assert.AreEqual(Timespec.InfFuture, Timespec.FromDateTime(new DateTime(2040, 1, 1, 0, 0, 0, DateTimeKind.Utc))); + Assert.AreEqual(Timespec.InfPast, Timespec.FromDateTime(new DateTime(1800, 1, 1, 0, 0, 0, DateTimeKind.Utc))); + } + else + { + Console.WriteLine("Test cannot be run on this platform, skipping the test."); + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs new file mode 100644 index 0000000000..c350391acd --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -0,0 +1,207 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + /// <summary> + /// Tests for Deadline support. + /// </summary> + public class TimeoutsTest + { + const string Host = "localhost"; + const string ServiceName = "/tests.Test"; + + static readonly Method<string, string> TestMethod = new Method<string, string>( + MethodType.Unary, + "/tests.Test/Test", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) + .AddMethod(TestMethod, TestMethodHandler) + .Build(); + + // provides a way how to retrieve an out-of-band result value from server handler + static TaskCompletionSource<string> stringFromServerHandlerTcs; + + Server server; + Channel channel; + + [SetUp] + public void Init() + { + server = new Server(); + server.AddServiceDefinition(ServiceDefinition); + int port = server.AddListeningPort(Host, Server.PickUnusedPort); + server.Start(); + channel = new Channel(Host, port); + + stringFromServerHandlerTcs = new TaskCompletionSource<string>(); + } + + [TearDown] + public void Cleanup() + { + channel.Dispose(); + server.ShutdownAsync().Wait(); + } + + [TestFixtureTearDown] + public void CleanupClass() + { + GrpcEnvironment.Shutdown(); + } + + [Test] + public void InfiniteDeadline() + { + // no deadline specified, check server sees infinite deadline + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None)); + + // DateTime.MaxValue deadline specified, check server sees infinite deadline + var internalCall2 = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, DateTime.MaxValue); + Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(internalCall2, "RETURN_DEADLINE", CancellationToken.None)); + } + + [Test] + public void DeadlineTransferredToServer() + { + var remainingTimeClient = TimeSpan.FromDays(7); + var deadline = DateTime.UtcNow + remainingTimeClient; + Thread.Sleep(1000); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + + var serverDeadlineTicksString = Calls.BlockingUnaryCall(internalCall, "RETURN_DEADLINE", CancellationToken.None); + var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc); + + // A fairly relaxed check that the deadline set by client and deadline seen by server + // are in agreement. C core takes care of the work with transferring deadline over the wire, + // so we don't need an exact check here. + Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000); + } + + [Test] + public void DeadlineInThePast() + { + var deadline = DateTime.MinValue; + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + + try + { + Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode); + } + } + + [Test] + public void DeadlineExceededStatusOnTimeout() + { + var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + + try + { + Calls.BlockingUnaryCall(internalCall, "TIMEOUT", CancellationToken.None); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode); + } + } + + [Test] + public void ServerReceivesCancellationOnTimeout() + { + var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); + var internalCall = new Call<string, string>(ServiceName, TestMethod, channel, Metadata.Empty, deadline); + + try + { + Calls.BlockingUnaryCall(internalCall, "CHECK_CANCELLATION_RECEIVED", CancellationToken.None); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode); + } + Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result); + } + + private static async Task<string> TestMethodHandler(string request, ServerCallContext context) + { + if (request == "TIMEOUT") + { + await Task.Delay(60000); + return ""; + } + + if (request == "RETURN_DEADLINE") + { + if (context.Deadline == DateTime.MaxValue) + { + return "DATETIME_MAXVALUE"; + } + + return context.Deadline.Ticks.ToString(); + } + + if (request == "CHECK_CANCELLATION_RECEIVED") + { + // wait until cancellation token is fired. + var tcs = new TaskCompletionSource<object>(); + context.CancellationToken.Register(() => { tcs.SetResult(null); }); + await tcs.Task; + stringFromServerHandlerTcs.SetResult("CANCELLED"); + return ""; + } + + return ""; + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/TimespecTest.cs deleted file mode 100644 index a34b407a01..0000000000 --- a/src/csharp/Grpc.Core.Tests/TimespecTest.cs +++ /dev/null @@ -1,101 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#endregion - -using System; -using System.Runtime.InteropServices; -using Grpc.Core.Internal; -using NUnit.Framework; - -namespace Grpc.Core.Internal.Tests -{ - public class TimespecTest - { - [Test] - public void Now() - { - var timespec = Timespec.Now; - } - - [Test] - public void InfFuture() - { - var timespec = Timespec.InfFuture; - } - - [Test] - public void TimespecSizeIsNativeSize() - { - Assert.AreEqual(Timespec.NativeSize, Marshal.SizeOf(typeof(Timespec))); - } - - [Test] - public void ToDateTime() - { - Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc), - new Timespec(IntPtr.Zero, 0).ToDateTime()); - - Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50), - new Timespec(new IntPtr(10), 5000).ToDateTime()); - - Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc), - new Timespec(new IntPtr(1437452508), 0).ToDateTime()); - } - - [Test] - public void Add() - { - var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 123456789 }; - var result = t.Add(TimeSpan.FromTicks(TimeSpan.TicksPerSecond * 10)); - Assert.AreEqual(result.tv_sec, new IntPtr(12355)); - Assert.AreEqual(result.tv_nsec, 123456789); - } - - [Test] - public void Add_Nanos() - { - var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 123456789 }; - var result = t.Add(TimeSpan.FromTicks(10)); - Assert.AreEqual(result.tv_sec, new IntPtr(12345)); - Assert.AreEqual(result.tv_nsec, 123456789 + 1000); - } - - [Test] - public void Add_NanosOverflow() - { - var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 999999999 }; - var result = t.Add(TimeSpan.FromTicks(TimeSpan.TicksPerSecond * 10 + 10)); - Assert.AreEqual(result.tv_sec, new IntPtr(12356)); - Assert.AreEqual(result.tv_nsec, 999); - } - } -} diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index 37b452f020..94c5e26082 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -47,14 +47,21 @@ namespace Grpc.Core readonly Marshaller<TResponse> responseMarshaller; readonly Channel channel; readonly Metadata headers; + readonly DateTime deadline; public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers) + : this(serviceName, method, channel, headers, DateTime.MaxValue) + { + } + + public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline) { this.name = method.GetFullName(serviceName); this.requestMarshaller = method.RequestMarshaller; this.responseMarshaller = method.ResponseMarshaller; this.channel = Preconditions.CheckNotNull(channel); this.headers = Preconditions.CheckNotNull(headers); + this.deadline = deadline; } public Channel Channel @@ -87,6 +94,14 @@ namespace Grpc.Core } } + public DateTime Deadline + { + get + { + return this.deadline; + } + } + public Marshaller<TRequest> RequestMarshaller { get diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 359fe53741..054fc27491 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -50,7 +50,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts. RegisterCancellationCallback(asyncCall, token); - return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers); + return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline); } public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) @@ -58,8 +58,8 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); - var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); + var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline); RegisterCancellationCallback(asyncCall, token); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } @@ -69,8 +69,8 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); - asyncCall.StartServerStreamingCall(req, call.Headers); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); + asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); @@ -81,8 +81,8 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); - var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); + var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); @@ -93,8 +93,8 @@ namespace Grpc.Core where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); - asyncCall.StartDuplexStreamingCall(call.Headers); + asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); + asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index a099f96aea..fd3473128a 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -76,7 +76,7 @@ namespace Grpc.Core /// <summary> /// Creates a new call to given method. /// </summary> - protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata) + protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline) where TRequest : class where TResponse : class { @@ -87,8 +87,8 @@ namespace Grpc.Core interceptor(metadata); metadata.Freeze(); } - metadata = metadata ?? Metadata.Empty; - return new Call<TRequest, TResponse>(serviceName, method, channel, metadata); + return new Call<TRequest, TResponse>(serviceName, method, channel, + metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index f983dbb759..51022ac34f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -61,10 +61,10 @@ namespace Grpc.Core.Internal { } - public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName) + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline) { this.channel = channel; - var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture); + var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline); channel.Environment.DebugStats.ActiveClientCalls.Increment(); InitializeInternal(call); } @@ -76,7 +76,7 @@ namespace Grpc.Core.Internal /// <summary> /// Blocking unary request - unary response call. /// </summary> - public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers) + public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline) { using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { @@ -86,7 +86,7 @@ namespace Grpc.Core.Internal lock (myLock) { - Initialize(channel, cq, methodName); + Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline)); started = true; halfcloseRequested = true; readingDone = true; @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - unary response call. /// </summary> - public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers) + public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline) { lock (myLock) { @@ -151,7 +151,7 @@ namespace Grpc.Core.Internal /// Starts a streamed request - unary response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public Task<TResponse> ClientStreamingCallAsync(Metadata headers) + public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline) { lock (myLock) { @@ -173,7 +173,7 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - streamed response call. /// </summary> - public void StartServerStreamingCall(TRequest msg, Metadata headers) + public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline) { lock (myLock) { @@ -196,7 +196,7 @@ namespace Grpc.Core.Internal /// Starts a streaming request - streaming response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public void StartDuplexStreamingCall(Metadata headers) + public void StartDuplexStreamingCall(Metadata headers, DateTime deadline) { lock (myLock) { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index f809f4a84c..702f1ce9e7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -48,6 +48,7 @@ namespace Grpc.Core.Internal internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly GrpcEnvironment environment; public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) @@ -118,6 +119,18 @@ namespace Grpc.Core.Internal } } + /// <summary> + /// Gets cancellation token that gets cancelled once close completion + /// is received and the cancelled flag is set. + /// </summary> + public CancellationToken CancellationToken + { + get + { + return cancellationTokenSource.Token; + } + } + protected override void OnReleaseResources() { environment.DebugStats.ActiveServerCalls.Decrement(); @@ -138,6 +151,8 @@ namespace Grpc.Core.Internal { // Once we cancel, we don't have to care that much // about reads and writes. + + // TODO(jtattermusch): is this still necessary? Cancel(); } @@ -145,6 +160,11 @@ namespace Grpc.Core.Internal } // TODO(jtattermusch): handle error + if (cancelled) + { + cancellationTokenSource.Cancel(); + } + finishedServersideTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 19dbb83f24..04e35a5efd 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -46,9 +46,6 @@ namespace Grpc.Core.Internal CompletionRegistry completionRegistry; [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] @@ -98,13 +95,6 @@ namespace Grpc.Core.Internal { } - public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) - { - var result = grpcsharp_channel_create_call(channel, cq, method, host, deadline); - result.SetCompletionRegistry(registry); - return result; - } - public void SetCompletionRegistry(CompletionRegistry completionRegistry) { this.completionRegistry = completionRegistry; diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index f046f4c6d0..53c506c872 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -47,6 +47,9 @@ namespace Grpc.Core.Internal static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs); [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_channel_destroy(IntPtr channel); private ChannelSafeHandle() @@ -63,6 +66,13 @@ namespace Grpc.Core.Internal return grpcsharp_secure_channel_create(credentials, target, channelArgs); } + public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + { + var result = grpcsharp_channel_create_call(this, cq, method, host, deadline); + result.SetCompletionRegistry(registry); + return result; + } + protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 3680b1e791..93e1c0b294 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -72,7 +72,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc); + var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc); + var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -180,7 +180,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc); + var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken); try { var result = await handler(requestStream, context); @@ -238,7 +238,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc); + var context = HandlerUtils.NewContext(newRpc, asyncCall.CancellationToken); try { await handler(requestStream, responseStream, context); @@ -295,11 +295,13 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } - public static ServerCallContext NewContext(ServerRpcNew newRpc) + public static ServerCallContext NewContext(ServerRpcNew newRpc, CancellationToken cancellationToken) { + DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); + return new ServerCallContext( - newRpc.Method, newRpc.Host, newRpc.Deadline.ToDateTime(), - newRpc.RequestMetadata, CancellationToken.None); + newRpc.Method, newRpc.Host, realtimeDeadline, + newRpc.RequestMetadata, cancellationToken); } } } diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index da2819f14d..e83d21f4a4 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -32,6 +32,8 @@ using System; using System.Runtime.InteropServices; using System.Threading; +using Grpc.Core.Utils; + namespace Grpc.Core.Internal { /// <summary> @@ -40,32 +42,43 @@ namespace Grpc.Core.Internal [StructLayout(LayoutKind.Sequential)] internal struct Timespec { - const int NanosPerSecond = 1000 * 1000 * 1000; - const int NanosPerTick = 100; + const long NanosPerSecond = 1000 * 1000 * 1000; + const long NanosPerTick = 100; + const long TicksPerSecond = NanosPerSecond / NanosPerTick; static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); [DllImport("grpc_csharp_ext.dll")] - static extern Timespec gprsharp_now(); + static extern Timespec gprsharp_now(GPRClockType clockType); + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_inf_future(GPRClockType clockType); + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_inf_past(GPRClockType clockType); [DllImport("grpc_csharp_ext.dll")] - static extern Timespec gprsharp_inf_future(); + static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock); [DllImport("grpc_csharp_ext.dll")] static extern int gprsharp_sizeof_timespec(); - public Timespec(IntPtr tv_sec, int tv_nsec) + public Timespec(IntPtr tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime) + { + } + + public Timespec(IntPtr tv_sec, int tv_nsec, GPRClockType clock_type) { this.tv_sec = tv_sec; this.tv_nsec = tv_nsec; - this.clock_type = GPRClockType.Realtime; + this.clock_type = clock_type; } // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 // so IntPtr seems to have the right size to work on both. - public System.IntPtr tv_sec; - public int tv_nsec; - public GPRClockType clock_type; + private System.IntPtr tv_sec; + private int tv_nsec; + private GPRClockType clock_type; /// <summary> /// Timespec a long time in the future. @@ -74,54 +87,164 @@ namespace Grpc.Core.Internal { get { - return gprsharp_inf_future(); + return gprsharp_inf_future(GPRClockType.Realtime); + } + } + + /// <summary> + /// Timespec a long time in the past. + /// </summary> + public static Timespec InfPast + { + get + { + return gprsharp_inf_past(GPRClockType.Realtime); } } + /// <summary> + /// Return Timespec representing the current time. + /// </summary> public static Timespec Now { get { - return gprsharp_now(); + return gprsharp_now(GPRClockType.Realtime); } } - - public DateTime ToDateTime() + + /// <summary> + /// Seconds since unix epoch. + /// </summary> + public IntPtr TimevalSeconds { - return UnixEpoch.AddTicks(tv_sec.ToInt64() * (NanosPerSecond / NanosPerTick) + tv_nsec / NanosPerTick); + get + { + return tv_sec; + } } - internal static int NativeSize + /// <summary> + /// The nanoseconds part of timeval. + /// </summary> + public int TimevalNanos { get { - return gprsharp_sizeof_timespec(); + return tv_nsec; } } /// <summary> - /// Creates a GPR deadline from current instant and given timeout. + /// Converts the timespec to desired clock type. /// </summary> - /// <returns>The from timeout.</returns> - public static Timespec DeadlineFromTimeout(TimeSpan timeout) + public Timespec ToClockType(GPRClockType targetClock) + { + return gprsharp_convert_clock_type(this, targetClock); + } + + /// <summary> + /// Converts Timespec to DateTime. + /// Timespec needs to be of type GPRClockType.Realtime and needs to represent a legal value. + /// DateTime has lower resolution (100ns), so rounding can occurs. + /// Value are always rounded up to the nearest DateTime value in the future. + /// + /// For Timespec.InfFuture or if timespec is after the largest representable DateTime, DateTime.MaxValue is returned. + /// For Timespec.InfPast or if timespec is before the lowest representable DateTime, DateTime.MinValue is returned. + /// + /// Unless DateTime.MaxValue or DateTime.MinValue is returned, the resulting DateTime is always in UTC + /// (DateTimeKind.Utc) + /// </summary> + public DateTime ToDateTime() { - if (timeout == Timeout.InfiniteTimeSpan) + Preconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond); + Preconditions.CheckState(clock_type == GPRClockType.Realtime); + + // fast path for InfFuture + if (this.Equals(InfFuture)) + { + return DateTime.MaxValue; + } + + // fast path for InfPast + if (this.Equals(InfPast)) + { + return DateTime.MinValue; + } + + try + { + // convert nanos to ticks, round up to the nearest tick + long ticksFromNanos = tv_nsec / NanosPerTick + ((tv_nsec % NanosPerTick != 0) ? 1 : 0); + long ticksTotal = checked(tv_sec.ToInt64() * TicksPerSecond + ticksFromNanos); + return UnixEpoch.AddTicks(ticksTotal); + } + catch (OverflowException) + { + // ticks out of long range + return tv_sec.ToInt64() > 0 ? DateTime.MaxValue : DateTime.MinValue; + } + catch (ArgumentOutOfRangeException) + { + // resulting date time would be larger than MaxValue + return tv_sec.ToInt64() > 0 ? DateTime.MaxValue : DateTime.MinValue; + } + } + + /// <summary> + /// Creates DateTime to Timespec. + /// DateTime has to be in UTC (DateTimeKind.Utc) unless it's DateTime.MaxValue or DateTime.MinValue. + /// For DateTime.MaxValue of date time after the largest representable Timespec, Timespec.InfFuture is returned. + /// For DateTime.MinValue of date time before the lowest representable Timespec, Timespec.InfPast is returned. + /// </summary> + /// <returns>The date time.</returns> + /// <param name="dateTime">Date time.</param> + public static Timespec FromDateTime(DateTime dateTime) + { + if (dateTime == DateTime.MaxValue) { return Timespec.InfFuture; } - return Timespec.Now.Add(timeout); + + if (dateTime == DateTime.MinValue) + { + return Timespec.InfPast; + } + + Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime"); + + try + { + TimeSpan timeSpan = dateTime - UnixEpoch; + long ticks = timeSpan.Ticks; + + long seconds = ticks / TicksPerSecond; + int nanos = (int)((ticks % TicksPerSecond) * NanosPerTick); + if (nanos < 0) + { + // correct the result based on C# modulo semantics for negative dividend + seconds--; + nanos += (int)NanosPerSecond; + } + // new IntPtr possibly throws OverflowException + return new Timespec(new IntPtr(seconds), nanos); + } + catch (OverflowException) + { + return dateTime > UnixEpoch ? Timespec.InfFuture : Timespec.InfPast; + } + catch (ArgumentOutOfRangeException) + { + return dateTime > UnixEpoch ? Timespec.InfFuture : Timespec.InfPast; + } } - public Timespec Add(TimeSpan timeSpan) + internal static int NativeSize { - long nanos = (long)tv_nsec + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * NanosPerTick; - long overflow_sec = (nanos > NanosPerSecond) ? 1 : 0; - - Timespec result; - result.tv_nsec = (int)(nanos % NanosPerSecond); - result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec); - result.clock_type = GPRClockType.Realtime; - return result; + get + { + return gprsharp_sizeof_timespec(); + } } } } diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 7a957c5b6f..dd56016a39 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -132,6 +132,60 @@ namespace math.Tests }).Wait(); } + [Test] + public void FibWithCancel() + { + Task.Run(async () => + { + var cts = new CancellationTokenSource(); + + using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(), + cancellationToken: cts.Token)) + { + List<long> responses = new List<long>(); + + try + { + while (await call.ResponseStream.MoveNext()) + { + if (responses.Count == 0) + { + cts.CancelAfter(500); // make sure we cancel soon + } + responses.Add(call.ResponseStream.Current.Num_); + } + Assert.Fail(); + } + catch (RpcException e) + { + Assert.IsTrue(responses.Count > 0); + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } + } + }).Wait(); + } + + [Test] + public void FibWithDeadline() + { + Task.Run(async () => + { + using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(), + deadline: DateTime.UtcNow.AddMilliseconds(500))) + { + try + { + await call.ResponseStream.ToList(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode); + } + } + }).Wait(); + } + // TODO: test Fib with limit=0 and cancellation [Test] public void Sum() diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index ef787cf1d8..67827e7b4f 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -44,11 +44,11 @@ namespace math { // client interface public interface IMathClient { - global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); } // server-side interface @@ -66,29 +66,29 @@ namespace math { public MathClient(Channel channel) : base(channel) { } - public global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Div, headers); + var call = CreateCall(__ServiceName, __Method_Div, headers, deadline); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Div, headers); + var call = CreateCall(__ServiceName, __Method_Div, headers, deadline); return Calls.AsyncUnaryCall(call, request, cancellationToken); } - public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_DivMany, headers); + var call = CreateCall(__ServiceName, __Method_DivMany, headers, deadline); return Calls.AsyncDuplexStreamingCall(call, cancellationToken); } - public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Fib, headers); + var call = CreateCall(__ServiceName, __Method_Fib, headers, deadline); return Calls.AsyncServerStreamingCall(call, request, cancellationToken); } - public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Sum, headers); + var call = CreateCall(__ServiceName, __Method_Sum, headers, deadline); return Calls.AsyncClientStreamingCall(call, cancellationToken); } } diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 3dd0f53a0d..dd26b1d350 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -54,8 +54,13 @@ namespace math { if (request.Limit <= 0) { - // TODO(jtattermusch): support cancellation - throw new NotImplementedException("Not implemented yet"); + // keep streaming the sequence until cancelled. + IEnumerator<Num> fibEnumerator = FibInternal(long.MaxValue).GetEnumerator(); + while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext()) + { + await responseStream.WriteAsync(fibEnumerator.Current); + await Task.Delay(100); + } } if (request.Limit > 0) diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs index 217127eca7..892cdb3f04 100644 --- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs +++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs @@ -24,8 +24,8 @@ namespace Grpc.Health.V1Alpha { // client interface public interface IHealthClient { - global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); } // server-side interface @@ -40,14 +40,14 @@ namespace Grpc.Health.V1Alpha { public HealthClient(Channel channel) : base(channel) { } - public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Check, headers); + var call = CreateCall(__ServiceName, __Method_Check, headers, deadline); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_Check, headers); + var call = CreateCall(__ServiceName, __Method_Check, headers, deadline); return Calls.AsyncUnaryCall(call, request, cancellationToken); } } diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index de2fa07441..ddcd0c2958 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -59,14 +59,14 @@ namespace grpc.testing { // client interface public interface ITestServiceClient { - global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); } // server-side interface @@ -86,44 +86,44 @@ namespace grpc.testing { public TestServiceClient(Channel channel) : base(channel) { } - public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_EmptyCall, headers); + var call = CreateCall(__ServiceName, __Method_EmptyCall, headers, deadline); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_EmptyCall, headers); + var call = CreateCall(__ServiceName, __Method_EmptyCall, headers, deadline); return Calls.AsyncUnaryCall(call, request, cancellationToken); } - public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_UnaryCall, headers); + var call = CreateCall(__ServiceName, __Method_UnaryCall, headers, deadline); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_UnaryCall, headers); + var call = CreateCall(__ServiceName, __Method_UnaryCall, headers, deadline); return Calls.AsyncUnaryCall(call, request, cancellationToken); } - public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_StreamingOutputCall, headers); + var call = CreateCall(__ServiceName, __Method_StreamingOutputCall, headers, deadline); return Calls.AsyncServerStreamingCall(call, request, cancellationToken); } - public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_StreamingInputCall, headers); + var call = CreateCall(__ServiceName, __Method_StreamingInputCall, headers, deadline); return Calls.AsyncClientStreamingCall(call, cancellationToken); } - public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_FullDuplexCall, headers); + var call = CreateCall(__ServiceName, __Method_FullDuplexCall, headers, deadline); return Calls.AsyncDuplexStreamingCall(call, cancellationToken); } - public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { - var call = CreateCall(__ServiceName, __Method_HalfDuplexCall, headers); + var call = CreateCall(__ServiceName, __Method_HalfDuplexCall, headers, deadline); return Calls.AsyncDuplexStreamingCall(call, cancellationToken); } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 682521446f..7e30a84a62 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -433,10 +433,20 @@ grpcsharp_channel_args_destroy(grpc_channel_args *args) { /* Timespec */ -GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(GPR_CLOCK_REALTIME); } +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(gpr_clock_type clock_type) { + return gpr_now(clock_type); +} + +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(gpr_clock_type clock_type) { + return gpr_inf_future(clock_type); +} + +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_past(gpr_clock_type clock_type) { + return gpr_inf_past(clock_type); +} -GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) { - return gpr_inf_future(GPR_CLOCK_REALTIME); +GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock) { + return gpr_convert_clock_type(t, target_clock); } GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) { diff --git a/src/node/ext/timeval.cc b/src/node/ext/timeval.cc index 60de4d816d..bf68513c48 100644 --- a/src/node/ext/timeval.cc +++ b/src/node/ext/timeval.cc @@ -52,6 +52,7 @@ gpr_timespec MillisecondsToTimespec(double millis) { } double TimespecToMilliseconds(gpr_timespec timespec) { + timespec = gpr_convert_clock_type(timespec, GPR_CLOCK_REALTIME); if (gpr_time_cmp(timespec, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) { return std::numeric_limits<double>::infinity(); } else if (gpr_time_cmp(timespec, gpr_inf_past(GPR_CLOCK_REALTIME)) == 0) { diff --git a/src/php/composer.json b/src/php/composer.json index b0115bdadd..2d0fe0c87a 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -1,7 +1,7 @@ { "name": "grpc/grpc", "description": "gRPC library for PHP", - "version": "0.5.0", + "version": "0.5.1", "homepage": "http://grpc.io", "license": "BSD-3-Clause", "repositories": [ diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index 48c00977eb..8c438e4bf9 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -60,7 +60,10 @@ class BaseStub { } unset($opts['update_metadata']); } - + $package_config = json_decode( + file_get_contents(dirname(__FILE__) . '/../../composer.json'), true); + $opts['grpc.primary_user_agent'] = + 'grpc-php/' . $package_config['version']; $this->channel = new Channel($hostname, $opts); } diff --git a/src/python/src/grpc/_adapter/_c/types/client_credentials.c b/src/python/src/grpc/_adapter/_c/types/client_credentials.c index 6a4561c060..9ea2b39cad 100644 --- a/src/python/src/grpc/_adapter/_c/types/client_credentials.c +++ b/src/python/src/grpc/_adapter/_c/types/client_credentials.c @@ -208,6 +208,7 @@ ClientCredentials *pygrpc_ClientCredentials_service_account( return self; } +/* TODO: Rename this credentials to something like service_account_jwt_access */ ClientCredentials *pygrpc_ClientCredentials_jwt( PyTypeObject *type, PyObject *args, PyObject *kwargs) { ClientCredentials *self; @@ -219,7 +220,7 @@ ClientCredentials *pygrpc_ClientCredentials_jwt( return NULL; } self = (ClientCredentials *)type->tp_alloc(type, 0); - self->c_creds = grpc_jwt_credentials_create( + self->c_creds = grpc_service_account_jwt_access_credentials_create( json_key, pygrpc_cast_double_to_gpr_timespec(lifetime)); if (!self->c_creds) { Py_DECREF(self); diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index d9f911a41a..51f3c9be01 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -374,6 +374,7 @@ PyObject *pygrpc_consume_ops(grpc_op *op, size_t nops) { } double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) { + timespec = gpr_convert_clock_type(timespec, GPR_CLOCK_REALTIME); return timespec.tv_sec + 1e-9*timespec.tv_nsec; } diff --git a/src/python/src/grpc/_cython/_cygrpc/credentials.pyx b/src/python/src/grpc/_cython/_cygrpc/credentials.pyx index c14d8844dd..7bb3f798b2 100644 --- a/src/python/src/grpc/_cython/_cygrpc/credentials.pyx +++ b/src/python/src/grpc/_cython/_cygrpc/credentials.pyx @@ -126,6 +126,7 @@ def client_credentials_service_account( credentials.references.extend([json_key, scope]) return credentials +#TODO rename to something like client_credentials_service_account_jwt_access. def client_credentials_jwt(json_key, records.Timespec token_lifetime not None): if isinstance(json_key, bytes): pass @@ -134,7 +135,7 @@ def client_credentials_jwt(json_key, records.Timespec token_lifetime not None): else: raise TypeError("expected json_key to be str or bytes") cdef ClientCredentials credentials = ClientCredentials() - credentials.c_credentials = grpc.grpc_jwt_credentials_create( + credentials.c_credentials = grpc.grpc_service_account_jwt_access_credentials_create( json_key, token_lifetime.c_time) credentials.references.append(json_key) return credentials diff --git a/src/python/src/grpc/_cython/_cygrpc/grpc.pxd b/src/python/src/grpc/_cython/_cygrpc/grpc.pxd index 7db8fbe31c..a76ddfc9e1 100644 --- a/src/python/src/grpc/_cython/_cygrpc/grpc.pxd +++ b/src/python/src/grpc/_cython/_cygrpc/grpc.pxd @@ -313,7 +313,7 @@ cdef extern from "grpc/grpc_security.h": grpc_credentials *grpc_compute_engine_credentials_create() grpc_credentials *grpc_service_account_credentials_create( const char *json_key, const char *scope, gpr_timespec token_lifetime) - grpc_credentials *grpc_jwt_credentials_create(const char *json_key, + grpc_credentials *grpc_service_account_jwt_access_credentials_create(const char *json_key, gpr_timespec token_lifetime) grpc_credentials *grpc_refresh_token_credentials_create( const char *json_refresh_token) diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 829f825597..65d9c9a237 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -209,10 +209,12 @@ static ID id_to_s; /* Converts a wrapped time constant to a standard time. */ static VALUE grpc_rb_time_val_to_time(VALUE self) { gpr_timespec *time_const = NULL; + gpr_timespec real_time; TypedData_Get_Struct(self, gpr_timespec, &grpc_rb_timespec_data_type, time_const); - return rb_funcall(rb_cTime, id_at, 2, INT2NUM(time_const->tv_sec), - INT2NUM(time_const->tv_nsec)); + real_time = gpr_convert_clock_type(*time_const, GPR_CLOCK_REALTIME); + return rb_funcall(rb_cTime, id_at, 2, INT2NUM(real_time.tv_sec), + INT2NUM(real_time.tv_nsec)); } /* Invokes inspect on the ctime version of the time val. */ diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index e3a0a5ad80..375a651d24 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -213,6 +213,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, grpc_call_error err; request_call_stack st; VALUE result; + gpr_timespec deadline; TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "destroyed!"); @@ -245,15 +246,13 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, } /* build the NewServerRpc struct result */ + deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME); result = rb_struct_new( - grpc_rb_sNewServerRpc, - rb_str_new2(st.details.method), + grpc_rb_sNewServerRpc, rb_str_new2(st.details.method), rb_str_new2(st.details.host), - rb_funcall(rb_cTime, id_at, 2, INT2NUM(st.details.deadline.tv_sec), - INT2NUM(st.details.deadline.tv_nsec)), - grpc_rb_md_ary_to_h(&st.md_ary), - grpc_rb_wrap_call(call), - NULL); + rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec), + INT2NUM(deadline.tv_nsec)), + grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), NULL); grpc_request_call_stack_cleanup(&st); return result; } diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index eca2a40c97..60129afc43 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -37,6 +37,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> + #include "test/core/util/test_config.h" static void channel_init_func(grpc_channel_element *elem, grpc_channel *master, @@ -73,11 +75,14 @@ static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) { ++*(int *)(elem->channel_data); } +static char *get_peer(grpc_call_element *elem) { return gpr_strdup("peer"); } + static void test_create_channel_stack(void) { - const grpc_channel_filter filter = { - call_func, channel_func, sizeof(int), - call_init_func, call_destroy_func, sizeof(int), - channel_init_func, channel_destroy_func, "some_test_filter"}; + const grpc_channel_filter filter = {call_func, channel_func, + sizeof(int), call_init_func, + call_destroy_func, sizeof(int), + channel_init_func, channel_destroy_func, + get_peer, "some_test_filter"}; const grpc_channel_filter *filters = &filter; grpc_channel_stack *channel_stack; grpc_call_stack *call_stack; diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 05ad42ca1f..453376caec 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "test/core/end2end/cq_verifier.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -78,6 +79,7 @@ void test_connect(const char *server_host, const char *client_host, int port, size_t details_capacity = 0; int was_cancelled = 2; grpc_call_details call_details; + char *peer; if (port == 0) { port = grpc_pick_unused_port_or_die(); @@ -105,7 +107,12 @@ void test_connect(const char *server_host, const char *client_host, int port, cqv = cq_verifier_create(cq); /* Create client. */ - gpr_join_host_port(&client_hostport, client_host, port); + if (client_host[0] == 'i') { + /* for ipv4:/ipv6: addresses, just concatenate the port */ + gpr_asprintf(&client_hostport, "%s:%d", client_host, port); + } else { + gpr_join_host_port(&client_hostport, client_host, port); + } client = grpc_channel_create(client_hostport, NULL); gpr_log(GPR_INFO, "Testing with server=%s client=%s (expecting %s)", @@ -179,6 +186,10 @@ void test_connect(const char *server_host, const char *client_host, int port, cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); + peer = grpc_call_get_peer(c); + gpr_log(GPR_DEBUG, "got peer: '%s'", peer); + gpr_free(peer); + GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); @@ -237,21 +248,31 @@ int main(int argc, char **argv) { /* :: and 0.0.0.0 are handled identically. */ test_connect("::", "127.0.0.1", 0, 1); test_connect("::", "::ffff:127.0.0.1", 0, 1); + test_connect("::", "ipv4:127.0.0.1", 0, 1); + test_connect("::", "ipv6:[::ffff:127.0.0.1]", 0, 1); test_connect("::", "localhost", 0, 1); test_connect("0.0.0.0", "127.0.0.1", 0, 1); test_connect("0.0.0.0", "::ffff:127.0.0.1", 0, 1); + test_connect("0.0.0.0", "ipv4:127.0.0.1", 0, 1); + test_connect("0.0.0.0", "ipv6:[::ffff:127.0.0.1]", 0, 1); test_connect("0.0.0.0", "localhost", 0, 1); if (do_ipv6) { test_connect("::", "::1", 0, 1); test_connect("0.0.0.0", "::1", 0, 1); + test_connect("::", "ipv6:[::1]", 0, 1); + test_connect("0.0.0.0", "ipv6:[::1]", 0, 1); } /* These only work when the families agree. */ test_connect("127.0.0.1", "127.0.0.1", 0, 1); + test_connect("127.0.0.1", "ipv4:127.0.0.1", 0, 1); if (do_ipv6) { test_connect("::1", "::1", 0, 1); test_connect("::1", "127.0.0.1", 0, 0); test_connect("127.0.0.1", "::1", 0, 0); + test_connect("::1", "ipv6:[::1]", 0, 1); + test_connect("::1", "ipv4:127.0.0.1", 0, 0); + test_connect("127.0.0.1", "ipv6:[::1]", 0, 0); } } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 37b61cf37f..807fc8e7bc 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -80,7 +80,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 2ec2697288..21d4404237 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -80,7 +80,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index 3aa364c5e0..c59628b959 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -81,7 +81,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 6194b841d8..ca783afccd 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -114,11 +114,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { char *details = NULL; size_t details_capacity = 0; int was_cancelled = 2; + char *peer; c = grpc_channel_create_call(f.client, f.cq, "/foo", "foo.test.google.fr:1234", deadline); GPR_ASSERT(c); + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer); + gpr_free(peer); + grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); grpc_metadata_array_init(&request_metadata_recv); @@ -151,6 +157,15 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + peer = grpc_call_get_peer(s); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "server_peer=%s", peer); + gpr_free(peer); + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "client_peer=%s", peer); + gpr_free(peer); + op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index a23c64928e..4e7fb446a7 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -172,7 +172,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -207,7 +207,8 @@ static void large_read_test(ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, + "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket(sv[0]); @@ -340,7 +341,7 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); state.ep = ep; @@ -394,7 +395,7 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); close(sv[0]); @@ -459,10 +460,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( grpc_endpoint_test_fixture f; create_sockets(sv); - f.client_ep = - grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size); - f.server_ep = - grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size); + f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), + slice_size, "test"); + f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), + slice_size, "test"); grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index d3fea9680a..dd6e0d7bb3 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -826,8 +826,9 @@ static void on_jwt_creds_get_metadata_failure(void *user_data, static void test_jwt_creds_success(void) { char *json_key_string = test_json_key_str(); - grpc_credentials *jwt_creds = grpc_jwt_credentials_create( - json_key_string, grpc_max_auth_token_lifetime); + grpc_credentials *jwt_creds = + grpc_service_account_jwt_access_credentials_create( + json_key_string, grpc_max_auth_token_lifetime); GPR_ASSERT(grpc_credentials_has_request_metadata(jwt_creds)); GPR_ASSERT(grpc_credentials_has_request_metadata_only(jwt_creds)); @@ -858,8 +859,9 @@ static void test_jwt_creds_success(void) { static void test_jwt_creds_signing_failure(void) { char *json_key_string = test_json_key_str(); - grpc_credentials *jwt_creds = grpc_jwt_credentials_create( - json_key_string, grpc_max_auth_token_lifetime); + grpc_credentials *jwt_creds = + grpc_service_account_jwt_access_credentials_create( + json_key_string, grpc_max_auth_token_lifetime); GPR_ASSERT(grpc_credentials_has_request_metadata(jwt_creds)); GPR_ASSERT(grpc_credentials_has_request_metadata_only(jwt_creds)); @@ -900,7 +902,7 @@ static grpc_credentials *composite_inner_creds(grpc_credentials *creds, } static void test_google_default_creds_auth_key(void) { - grpc_jwt_credentials *jwt; + grpc_service_account_jwt_access_credentials *jwt; grpc_credentials *creds; char *json_key = test_json_key_str(); grpc_flush_cached_google_default_credentials(); @@ -909,7 +911,7 @@ static void test_google_default_creds_auth_key(void) { gpr_free(json_key); creds = grpc_google_default_credentials_create(); GPR_ASSERT(creds != NULL); - jwt = (grpc_jwt_credentials *)composite_inner_creds( + jwt = (grpc_service_account_jwt_access_credentials *)composite_inner_creds( creds, GRPC_CREDENTIALS_TYPE_JWT); GPR_ASSERT( strcmp(jwt->key.client_id, diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index b2facd33b1..3a7a3a36ee 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -57,7 +57,7 @@ int main(int argc, char **argv) { grpc_metadata_array_init(&trailing_metadata_recv); - chan = grpc_lame_client_channel_create(); + chan = grpc_lame_client_channel_create("lampoon:national"); GPR_ASSERT(chan); cq = grpc_completion_queue_create(); call = grpc_channel_create_call(chan, cq, "/Foo", "anywhere", diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc index ee94f455a4..bbf7705f0a 100644 --- a/test/cpp/client/credentials_test.cc +++ b/test/cpp/client/credentials_test.cc @@ -47,7 +47,7 @@ class CredentialsTest : public ::testing::Test { TEST_F(CredentialsTest, InvalidServiceAccountCreds) { std::shared_ptr<Credentials> bad1 = ServiceAccountCredentials("", "", 1); - EXPECT_EQ(nullptr, bad1.get()); + EXPECT_EQ(static_cast<Credentials *>(nullptr), bad1.get()); } } // namespace testing diff --git a/test/cpp/common/secure_auth_context_test.cc b/test/cpp/common/secure_auth_context_test.cc index f18a04178e..d0243a5432 100644 --- a/test/cpp/common/secure_auth_context_test.cc +++ b/test/cpp/common/secure_auth_context_test.cc @@ -92,26 +92,6 @@ TEST_F(SecureAuthContextTest, Iterators) { EXPECT_EQ("bar", p2.second); ++iter; EXPECT_EQ(context.end(), iter); - // Range-based for loop test. - int i = 0; - for (auto p : context) { - switch (i++) { - case 0: - EXPECT_EQ("name", p.first); - EXPECT_EQ("chapi", p.second); - break; - case 1: - EXPECT_EQ("name", p.first); - EXPECT_EQ("chapo", p.second); - break; - case 2: - EXPECT_EQ("foo", p.first); - EXPECT_EQ("bar", p.second); - break; - default: - EXPECT_TRUE(0); - } - } } } // namespace diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index a865a0e359..c433b78948 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -144,6 +144,11 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { if (request->has_param() && request->param().check_auth_context()) { CheckAuthContext(context); } + if (request->has_param() && + request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } return Status::OK; } @@ -509,7 +514,7 @@ TEST_F(End2endTest, DiffPackageServices) { // rpc and stream should fail on bad credentials. TEST_F(End2endTest, BadCredentials) { std::shared_ptr<Credentials> bad_creds = ServiceAccountCredentials("", "", 1); - EXPECT_EQ(nullptr, bad_creds.get()); + EXPECT_EQ(static_cast<Credentials *>(nullptr), bad_creds.get()); std::shared_ptr<ChannelInterface> channel = CreateChannel(server_address_.str(), bad_creds, ChannelArguments()); std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub( @@ -786,6 +791,21 @@ TEST_F(End2endTest, ClientAuthContext) { CheckAuthContext(&context); } +// Make the response larger than the flow control window. +TEST_F(End2endTest, HugeResponse) { + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("huge response"); + const int kResponseSize = 1024 * (1024 + 10); + request.mutable_param()->set_response_message_length(kResponseSize); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(kResponseSize, response.message().size()); + EXPECT_TRUE(s.ok()); +} + } // namespace testing } // namespace grpc diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc index 48b1b2e864..73d82f7b88 100644 --- a/test/cpp/interop/client_helper.cc +++ b/test/cpp/interop/client_helper.cc @@ -123,7 +123,8 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase( GPR_ASSERT(FLAGS_enable_ssl); grpc::string json_key = GetServiceAccountJsonKey(); std::chrono::seconds token_lifetime = std::chrono::hours(1); - creds = JWTCredentials(json_key, token_lifetime.count()); + creds = + ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count()); return CreateTestChannel(host_port, FLAGS_server_host_override, FLAGS_enable_ssl, FLAGS_use_prod_roots, creds); } else if (test_case == "oauth2_auth_token") { diff --git a/test/cpp/util/messages.proto b/test/cpp/util/messages.proto index 3708972b90..2fad8b42a2 100644 --- a/test/cpp/util/messages.proto +++ b/test/cpp/util/messages.proto @@ -38,6 +38,7 @@ message RequestParams { optional int32 server_cancel_after_us = 3; optional bool echo_metadata = 4; optional bool check_auth_context = 5; + optional int32 response_message_length = 6; } message EchoRequest { diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 31ad56a97c..3e578c171b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -803,7 +803,7 @@ src/core/client_config/resolver.h \ src/core/client_config/resolver_factory.h \ src/core/client_config/resolver_registry.h \ src/core/client_config/resolvers/dns_resolver.h \ -src/core/client_config/resolvers/unix_resolver_posix.h \ +src/core/client_config/resolvers/sockaddr_resolver.h \ src/core/client_config/subchannel.h \ src/core/client_config/subchannel_factory.h \ src/core/client_config/uri_parser.h \ @@ -881,6 +881,7 @@ src/core/transport/stream_op.h \ src/core/transport/transport.h \ src/core/transport/transport_impl.h \ src/core/census/context.h \ +src/core/census/rpc_stat_id.h \ src/core/httpcli/format_request.c \ src/core/httpcli/httpcli.c \ src/core/httpcli/httpcli_security_connector.c \ @@ -922,7 +923,7 @@ src/core/client_config/resolver.c \ src/core/client_config/resolver_factory.c \ src/core/client_config/resolver_registry.c \ src/core/client_config/resolvers/dns_resolver.c \ -src/core/client_config/resolvers/unix_resolver_posix.c \ +src/core/client_config/resolvers/sockaddr_resolver.c \ src/core/client_config/subchannel.c \ src/core/client_config/subchannel_factory.c \ src/core/client_config/uri_parser.c \ @@ -1016,6 +1017,7 @@ src/core/transport/transport.c \ src/core/transport/transport_op_string.c \ src/core/census/context.c \ src/core/census/initialize.c \ +src/core/census/record_stat.c \ include/grpc/support/alloc.h \ include/grpc/support/atm.h \ include/grpc/support/atm_gcc_atomic.h \ diff --git a/tools/jenkins/run_distribution.sh b/tools/jenkins/run_distribution.sh index e5281adcf4..49b7d306d1 100755 --- a/tools/jenkins/run_distribution.sh +++ b/tools/jenkins/run_distribution.sh @@ -32,6 +32,8 @@ # linuxbrew installation of a selected language set -ex +# Our homebrew installation script command, per language +# Can be used in both linux and macos if [ "$language" == "core" ]; then command="curl -fsSL https://goo.gl/getgrpc | bash -" elif [[ "python nodejs ruby php" =~ "$language" ]]; then @@ -66,6 +68,7 @@ if [ "$platform" == "linux" ]; then elif [ "$platform" == "macos" ]; then if [ "$dist_channel" == "homebrew" ]; then + echo "Formulas installed by system-wide homebrew (before)" brew list -l @@ -93,11 +96,12 @@ elif [ "$platform" == "macos" ]; then *nodejs*) export PATH=$HOME/.nvm/versions/node/v0.12.7/bin:$PATH ;; + *ruby*) + export PATH=/usr/local/rvm/rubies/ruby-2.2.1/bin:$PATH + ;; *php*) export CFLAGS="-Wno-parentheses-equality" ;; - *) - ;; esac # Run our homebrew installation script @@ -105,7 +109,6 @@ elif [ "$platform" == "macos" ]; then # Uninstall / clean up per-language modules/extensions after the test case $language in - *core*) ;; *python*) deactivate rm -rf jenkins_python_venv @@ -121,10 +124,6 @@ elif [ "$platform" == "macos" ]; then *php*) rm grpc.so ;; - *) - echo "Unsupported language $language" - exit 1 - ;; esac # Clean up diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index abddaab699..d6db928d2a 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -9849,6 +9849,7 @@ "include/grpc/status.h", "src/core/census/context.h", "src/core/census/grpc_context.h", + "src/core/census/rpc_stat_id.h", "src/core/channel/census_filter.h", "src/core/channel/channel_args.h", "src/core/channel/channel_stack.h", @@ -9867,7 +9868,7 @@ "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -9977,6 +9978,8 @@ "src/core/census/grpc_context.c", "src/core/census/grpc_context.h", "src/core/census/initialize.c", + "src/core/census/record_stat.c", + "src/core/census/rpc_stat_id.h", "src/core/channel/census_filter.h", "src/core/channel/channel_args.c", "src/core/channel/channel_args.h", @@ -10011,8 +10014,8 @@ "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.c", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.c", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.c", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.c", @@ -10311,6 +10314,7 @@ "include/grpc/status.h", "src/core/census/context.h", "src/core/census/grpc_context.h", + "src/core/census/rpc_stat_id.h", "src/core/channel/census_filter.h", "src/core/channel/channel_args.h", "src/core/channel/channel_stack.h", @@ -10329,7 +10333,7 @@ "src/core/client_config/resolver_factory.h", "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", "src/core/client_config/uri_parser.h", @@ -10421,6 +10425,8 @@ "src/core/census/grpc_context.c", "src/core/census/grpc_context.h", "src/core/census/initialize.c", + "src/core/census/record_stat.c", + "src/core/census/rpc_stat_id.h", "src/core/channel/census_filter.h", "src/core/channel/channel_args.c", "src/core/channel/channel_args.h", @@ -10455,8 +10461,8 @@ "src/core/client_config/resolver_registry.h", "src/core/client_config/resolvers/dns_resolver.c", "src/core/client_config/resolvers/dns_resolver.h", - "src/core/client_config/resolvers/unix_resolver_posix.c", - "src/core/client_config/resolvers/unix_resolver_posix.h", + "src/core/client_config/resolvers/sockaddr_resolver.c", + "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.c", diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj index 6f929a72c7..744627e388 100644 --- a/vsprojects/grpc/grpc.vcxproj +++ b/vsprojects/grpc/grpc.vcxproj @@ -192,7 +192,7 @@ <ClInclude Include="..\..\src\core\client_config\resolver_factory.h" /> <ClInclude Include="..\..\src\core\client_config\resolver_registry.h" /> <ClInclude Include="..\..\src\core\client_config\resolvers\dns_resolver.h" /> - <ClInclude Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.h" /> + <ClInclude Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.h" /> <ClInclude Include="..\..\src\core\client_config\subchannel.h" /> <ClInclude Include="..\..\src\core\client_config\subchannel_factory.h" /> <ClInclude Include="..\..\src\core\client_config\uri_parser.h" /> @@ -270,6 +270,7 @@ <ClInclude Include="..\..\src\core\transport\transport.h" /> <ClInclude Include="..\..\src\core\transport\transport_impl.h" /> <ClInclude Include="..\..\src\core\census\context.h" /> + <ClInclude Include="..\..\src\core\census\rpc_stat_id.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="..\..\src\core\httpcli\format_request.c"> @@ -354,7 +355,7 @@ </ClCompile> <ClCompile Include="..\..\src\core\client_config\resolvers\dns_resolver.c"> </ClCompile> - <ClCompile Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.c"> + <ClCompile Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.c"> </ClCompile> <ClCompile Include="..\..\src\core\client_config\subchannel.c"> </ClCompile> @@ -542,6 +543,8 @@ </ClCompile> <ClCompile Include="..\..\src\core\census\initialize.c"> </ClCompile> + <ClCompile Include="..\..\src\core\census\record_stat.c"> + </ClCompile> </ItemGroup> <ItemGroup> <ProjectReference Include="..\gpr\gpr.vcxproj"> diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters index 0c388222c2..84a7823b2d 100644 --- a/vsprojects/grpc/grpc.vcxproj.filters +++ b/vsprojects/grpc/grpc.vcxproj.filters @@ -124,7 +124,7 @@ <ClCompile Include="..\..\src\core\client_config\resolvers\dns_resolver.c"> <Filter>src\core\client_config\resolvers</Filter> </ClCompile> - <ClCompile Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.c"> + <ClCompile Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.c"> <Filter>src\core\client_config\resolvers</Filter> </ClCompile> <ClCompile Include="..\..\src\core\client_config\subchannel.c"> @@ -406,6 +406,9 @@ <ClCompile Include="..\..\src\core\census\initialize.c"> <Filter>src\core\census</Filter> </ClCompile> + <ClCompile Include="..\..\src\core\census\record_stat.c"> + <Filter>src\core\census</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\grpc\grpc_security.h"> @@ -539,7 +542,7 @@ <ClInclude Include="..\..\src\core\client_config\resolvers\dns_resolver.h"> <Filter>src\core\client_config\resolvers</Filter> </ClInclude> - <ClInclude Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.h"> + <ClInclude Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.h"> <Filter>src\core\client_config\resolvers</Filter> </ClInclude> <ClInclude Include="..\..\src\core\client_config\subchannel.h"> @@ -773,6 +776,9 @@ <ClInclude Include="..\..\src\core\census\context.h"> <Filter>src\core\census</Filter> </ClInclude> + <ClInclude Include="..\..\src\core\census\rpc_stat_id.h"> + <Filter>src\core\census</Filter> + </ClInclude> </ItemGroup> <ItemGroup> diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj index 753f3424b3..a5730235fb 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj @@ -173,7 +173,7 @@ <ClInclude Include="..\..\src\core\client_config\resolver_factory.h" /> <ClInclude Include="..\..\src\core\client_config\resolver_registry.h" /> <ClInclude Include="..\..\src\core\client_config\resolvers\dns_resolver.h" /> - <ClInclude Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.h" /> + <ClInclude Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.h" /> <ClInclude Include="..\..\src\core\client_config\subchannel.h" /> <ClInclude Include="..\..\src\core\client_config\subchannel_factory.h" /> <ClInclude Include="..\..\src\core\client_config\uri_parser.h" /> @@ -251,6 +251,7 @@ <ClInclude Include="..\..\src\core\transport\transport.h" /> <ClInclude Include="..\..\src\core\transport\transport_impl.h" /> <ClInclude Include="..\..\src\core\census\context.h" /> + <ClInclude Include="..\..\src\core\census\rpc_stat_id.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="..\..\src\core\surface\init_unsecure.c"> @@ -289,7 +290,7 @@ </ClCompile> <ClCompile Include="..\..\src\core\client_config\resolvers\dns_resolver.c"> </ClCompile> - <ClCompile Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.c"> + <ClCompile Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.c"> </ClCompile> <ClCompile Include="..\..\src\core\client_config\subchannel.c"> </ClCompile> @@ -477,6 +478,8 @@ </ClCompile> <ClCompile Include="..\..\src\core\census\initialize.c"> </ClCompile> + <ClCompile Include="..\..\src\core\census\record_stat.c"> + </ClCompile> </ItemGroup> <ItemGroup> <ProjectReference Include="..\gpr\gpr.vcxproj"> diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters index 1b312cca51..c7790431df 100644 --- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -55,7 +55,7 @@ <ClCompile Include="..\..\src\core\client_config\resolvers\dns_resolver.c"> <Filter>src\core\client_config\resolvers</Filter> </ClCompile> - <ClCompile Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.c"> + <ClCompile Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.c"> <Filter>src\core\client_config\resolvers</Filter> </ClCompile> <ClCompile Include="..\..\src\core\client_config\subchannel.c"> @@ -337,6 +337,9 @@ <ClCompile Include="..\..\src\core\census\initialize.c"> <Filter>src\core\census</Filter> </ClCompile> + <ClCompile Include="..\..\src\core\census\record_stat.c"> + <Filter>src\core\census</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\grpc\byte_buffer.h"> @@ -416,7 +419,7 @@ <ClInclude Include="..\..\src\core\client_config\resolvers\dns_resolver.h"> <Filter>src\core\client_config\resolvers</Filter> </ClInclude> - <ClInclude Include="..\..\src\core\client_config\resolvers\unix_resolver_posix.h"> + <ClInclude Include="..\..\src\core\client_config\resolvers\sockaddr_resolver.h"> <Filter>src\core\client_config\resolvers</Filter> </ClInclude> <ClInclude Include="..\..\src\core\client_config\subchannel.h"> @@ -650,6 +653,9 @@ <ClInclude Include="..\..\src\core\census\context.h"> <Filter>src\core\census</Filter> </ClInclude> + <ClInclude Include="..\..\src\core\census\rpc_stat_id.h"> + <Filter>src\core\census</Filter> + </ClInclude> </ItemGroup> <ItemGroup> |