aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/boringssl/gen_build_yaml.py5
-rw-r--r--src/compiler/cpp_generator.cc350
-rw-r--r--src/compiler/python_generator.cc1
-rw-r--r--src/core/census/context.c500
-rw-r--r--src/core/census/grpc_filter.c4
-rw-r--r--src/core/census/initialize.c4
-rw-r--r--src/core/census/placeholders.c114
-rw-r--r--src/core/census/tag_set.c535
-rw-r--r--src/core/channel/channel_args.c7
-rw-r--r--src/core/channel/client_channel.c23
-rw-r--r--src/core/channel/client_uchannel.c6
-rw-r--r--src/core/channel/compress_filter.c6
-rw-r--r--src/core/channel/http_client_filter.c4
-rw-r--r--src/core/channel/http_server_filter.c4
-rw-r--r--src/core/channel/subchannel_call_holder.c18
-rw-r--r--src/core/client_config/lb_policies/pick_first.c22
-rw-r--r--src/core/client_config/lb_policies/round_robin.c14
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c6
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c6
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c4
-rw-r--r--src/core/client_config/subchannel.c41
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/closure.c8
-rw-r--r--src/core/iomgr/closure.h9
-rw-r--r--src/core/iomgr/exec_ctx.c18
-rw-r--r--src/core/iomgr/exec_ctx.h17
-rw-r--r--src/core/iomgr/executor.c8
-rw-r--r--src/core/iomgr/executor.h4
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/iocp_windows.c10
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c6
-rw-r--r--src/core/iomgr/pollset_posix.c13
-rw-r--r--src/core/iomgr/pollset_windows.c15
-rw-r--r--src/core/iomgr/resolve_address_posix.c4
-rw-r--r--src/core/iomgr/resolve_address_windows.c4
-rw-r--r--src/core/iomgr/sockaddr_win32.h7
-rw-r--r--src/core/iomgr/tcp_client_posix.c14
-rw-r--r--src/core/iomgr/tcp_client_windows.c8
-rw-r--r--src/core/iomgr/tcp_posix.c14
-rw-r--r--src/core/iomgr/tcp_server_posix.c9
-rw-r--r--src/core/iomgr/tcp_server_windows.c8
-rw-r--r--src/core/iomgr/tcp_windows.c26
-rw-r--r--src/core/iomgr/timer.c6
-rw-r--r--src/core/iomgr/udp_server.c12
-rw-r--r--src/core/iomgr/udp_server.h10
-rw-r--r--src/core/iomgr/workqueue.h6
-rw-r--r--src/core/iomgr/workqueue_posix.c6
-rw-r--r--src/core/security/credentials.c4
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/security/handshake.c11
-rw-r--r--src/core/security/json_token.c14
-rw-r--r--src/core/security/secure_endpoint.c10
-rw-r--r--src/core/security/security_connector.c45
-rw-r--r--src/core/security/security_connector.h3
-rw-r--r--src/core/security/server_auth_filter.c4
-rw-r--r--src/core/security/server_secure_chttp2.c2
-rw-r--r--src/core/statistics/census_init.c2
-rw-r--r--src/core/support/env_win32.c22
-rw-r--r--src/core/support/log_win32.c6
-rw-r--r--src/core/support/string_win32.c10
-rw-r--r--src/core/support/subprocess_windows.c141
-rw-r--r--src/core/support/sync_win32.c8
-rw-r--r--src/core/support/time_win32.c21
-rw-r--r--src/core/surface/alarm.c6
-rw-r--r--src/core/surface/call.c22
-rw-r--r--src/core/surface/channel.c6
-rw-r--r--src/core/surface/channel_connectivity.c6
-rw-r--r--src/core/surface/channel_create.c6
-rw-r--r--src/core/surface/channel_ping.c4
-rw-r--r--src/core/surface/completion_queue.c6
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/secure_channel_create.c6
-rw-r--r--src/core/surface/server.c36
-rw-r--r--src/core/surface/server_chttp2.c2
-rw-r--r--src/core/surface/server_create.c5
-rw-r--r--src/core/surface/version.c2
-rw-r--r--src/core/transport/chttp2/hpack_encoder.c4
-rw-r--r--src/core/transport/chttp2/internal.h15
-rw-r--r--src/core/transport/chttp2/stream_lists.c21
-rw-r--r--src/core/transport/chttp2/writing.c20
-rw-r--r--src/core/transport/chttp2_transport.c34
-rw-r--r--src/core/transport/connectivity_state.c12
-rw-r--r--src/core/transport/transport.c10
-rw-r--r--src/core/transport/transport.h15
-rw-r--r--src/cpp/client/channel.cc17
-rw-r--r--src/cpp/client/client_context.cc4
-rw-r--r--src/cpp/client/create_channel.cc6
-rw-r--r--src/cpp/client/credentials.cc8
-rw-r--r--src/cpp/client/secure_credentials.cc15
-rw-r--r--src/cpp/client/secure_credentials.h16
-rw-r--r--src/cpp/codegen/grpc_library.cc (renamed from src/core/census/context.h)17
-rw-r--r--src/cpp/common/alarm.cc11
-rw-r--r--src/cpp/common/completion_queue.cc8
-rw-r--r--src/cpp/server/server.cc103
-rw-r--r--src/cpp/server/server_builder.cc57
-rw-r--r--src/cpp/server/server_context.cc2
-rw-r--r--src/cpp/util/byte_buffer.cc8
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/csharp/build_packages.bat4
-rw-r--r--src/node/ext/byte_buffer.cc4
-rw-r--r--src/node/ext/node_grpc.cc4
-rw-r--r--src/node/index.js4
-rw-r--r--src/node/performance/benchmark_server.js2
-rw-r--r--src/node/performance/worker_service_impl.js4
-rw-r--r--src/node/src/client.js4
-rw-r--r--src/node/src/credentials.js4
-rw-r--r--src/node/src/grpc_extension.js40
-rw-r--r--src/node/src/metadata.js2
-rw-r--r--src/node/src/server.js4
-rw-r--r--src/node/test/call_test.js4
-rw-r--r--src/node/test/channel_test.js4
-rw-r--r--src/node/test/constant_test.js4
-rw-r--r--src/node/test/end_to_end_test.js4
-rw-r--r--src/node/test/server_test.js4
-rw-r--r--src/node/test/surface_test.js18
-rw-r--r--src/objective-c/BoringSSL.podspec4
-rw-r--r--src/proto/grpc/testing/control.proto5
-rw-r--r--src/proto/grpc/testing/echo_messages.proto3
-rw-r--r--src/python/README.md100
-rw-r--r--src/python/grpcio/commands.py138
-rw-r--r--src/python/grpcio/grpc/framework/foundation/logging_pool.py7
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/python/grpcio/support.py44
-rw-r--r--src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py26
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py50
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py55
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py6
-rw-r--r--src/ruby/ext/grpc/rb_call.c7
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c2
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c3
-rwxr-xr-xsrc/zlib/gen_build_yaml.py1
132 files changed, 1961 insertions, 1373 deletions
diff --git a/src/boringssl/gen_build_yaml.py b/src/boringssl/gen_build_yaml.py
index b635ee4c13..7c7a57993f 100755
--- a/src/boringssl/gen_build_yaml.py
+++ b/src/boringssl/gen_build_yaml.py
@@ -82,6 +82,7 @@ class Grpc(object):
for f in files['ssl_headers'] + files['ssl_internal_headers'] + files['crypto_headers'] + files['crypto_internal_headers']
),
'boringssl': True,
+ 'defaults': 'boringssl',
},
{
'name': 'boringssl_test_util',
@@ -89,6 +90,7 @@ class Grpc(object):
'language': 'c++',
'secure': 'no',
'boringssl': True,
+ 'defaults': 'boringssl',
'src': [
map_dir(f)
for f in sorted(files['test_support'])
@@ -103,6 +105,7 @@ class Grpc(object):
'src': [map_dir(test)],
'vs_proj_dir': 'test/boringssl',
'boringssl': True,
+ 'defaults': 'boringssl',
'deps': [
'boringssl_test_util',
'boringssl',
@@ -120,6 +123,7 @@ class Grpc(object):
'src': [],
'vs_proj_dir': 'test/boringssl',
'boringssl': True,
+ 'defaults': 'boringssl',
'deps': [
'boringssl_%s_lib' % os.path.splitext(os.path.basename(test))[0],
'boringssl_test_util',
@@ -138,6 +142,7 @@ class Grpc(object):
'flaky': False,
'language': 'c++',
'boringssl': True,
+ 'defaults': 'boringssl',
'cpu_cost': 1.0
}
for test in files['tests']
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 3c8ca8ab45..d3d6224de2 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -112,18 +112,17 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string temp =
- "#include <grpc++/support/async_stream.h>\n"
- "#include <grpc++/impl/rpc_method.h>\n"
- "#include <grpc++/impl/proto_utils.h>\n"
- "#include <grpc++/impl/service_type.h>\n"
- "#include <grpc++/support/async_unary_call.h>\n"
- "#include <grpc++/support/status.h>\n"
- "#include <grpc++/support/stub_options.h>\n"
- "#include <grpc++/support/sync_stream.h>\n"
+ "#include <grpc++/impl/codegen/async_stream.h>\n"
+ "#include <grpc++/impl/codegen/async_unary_call.h>\n"
+ "#include <grpc++/impl/codegen/proto_utils.h>\n"
+ "#include <grpc++/impl/codegen/rpc_method.h>\n"
+ "#include <grpc++/impl/codegen/service_type.h>\n"
+ "#include <grpc++/impl/codegen/status.h>\n"
+ "#include <grpc++/impl/codegen/stub_options.h>\n"
+ "#include <grpc++/impl/codegen/sync_stream.h>\n"
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
- "class Channel;\n"
"class RpcService;\n"
"class ServerCompletionQueue;\n"
"class ServerContext;\n"
@@ -491,39 +490,186 @@ void PrintHeaderServerMethodAsync(
grpc_cpp_generator::ClassName(method->input_type(), true);
(*vars)["Response"] =
grpc_cpp_generator::ClassName(method->output_type(), true);
+ printer->Print(*vars, "template <class BaseClass>\n");
+ printer->Print(*vars,
+ "class WithAsyncMethod_$Method$ : public BaseClass {\n");
+ printer->Print(
+ " private:\n"
+ " void BaseClassMustBeDerivedFromService(Service *service) {}\n");
+ printer->Print(" public:\n");
+ printer->Indent();
+ printer->Print(*vars,
+ "WithAsyncMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodAsync($Idx$);\n"
+ "}\n");
+ printer->Print(*vars,
+ "~WithAsyncMethod_$Method$() GRPC_OVERRIDE {\n"
+ " BaseClassMustBeDerivedFromService(this);\n"
+ "}\n");
if (NoStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncUnary($Idx$, context, "
+ "request, response, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReader< $Request$>* reader, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncClientStreaming($Idx$, "
+ "context, reader, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE "
+ "{\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, $Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(
+ *vars,
+ " ::grpc::Service::RequestAsyncServerStreaming($Idx$, "
+ "context, request, writer, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) "
+ "GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ printer->Print(
+ *vars,
"void Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag);\n");
+ "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
+ printer->Print(*vars,
+ " ::grpc::Service::RequestAsyncBidiStreaming($Idx$, "
+ "context, stream, new_call_cq, notification_cq, tag);\n");
+ printer->Print("}\n");
+ }
+ printer->Outdent();
+ printer->Print(*vars, "};\n");
+}
+
+void PrintHeaderServerMethodGeneric(
+ grpc::protobuf::io::Printer *printer,
+ const grpc::protobuf::MethodDescriptor *method,
+ std::map<grpc::string, grpc::string> *vars) {
+ (*vars)["Method"] = method->name();
+ (*vars)["Request"] =
+ grpc_cpp_generator::ClassName(method->input_type(), true);
+ (*vars)["Response"] =
+ grpc_cpp_generator::ClassName(method->output_type(), true);
+ printer->Print(*vars, "template <class BaseClass>\n");
+ printer->Print(*vars,
+ "class WithGenericMethod_$Method$ : public BaseClass {\n");
+ printer->Print(
+ " private:\n"
+ " void BaseClassMustBeDerivedFromService(Service *service) {}\n");
+ printer->Print(" public:\n");
+ printer->Indent();
+ printer->Print(*vars,
+ "WithGenericMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodGeneric($Idx$);\n"
+ "}\n");
+ printer->Print(*vars,
+ "~WithGenericMethod_$Method$() GRPC_OVERRIDE {\n"
+ " BaseClassMustBeDerivedFromService(this);\n"
+ "}\n");
+ if (NoStreaming(method)) {
+ printer->Print(
+ *vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ } else if (ClientOnlyStreaming(method)) {
+ printer->Print(
+ *vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReader< $Request$>* reader, "
+ "$Response$* response) GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ } else if (ServerOnlyStreaming(method)) {
+ printer->Print(
+ *vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, const $Request$* request, "
+ "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE "
+ "{\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
+ } else if (BidiStreaming(method)) {
+ printer->Print(
+ *vars,
+ "// disable synchronous version of this method\n"
+ "::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream) "
+ "GRPC_FINAL GRPC_OVERRIDE {\n"
+ " abort();\n"
+ " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
+ "}\n");
}
+ printer->Outdent();
+ printer->Print(*vars, "};\n");
}
void PrintHeaderService(grpc::protobuf::io::Printer *printer,
@@ -557,14 +703,14 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
"class Stub GRPC_FINAL : public StubInterface"
" {\n public:\n");
printer->Indent();
- printer->Print("Stub(const std::shared_ptr< ::grpc::Channel>& channel);\n");
+ printer->Print("Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, true);
}
printer->Outdent();
printer->Print("\n private:\n");
printer->Indent();
- printer->Print("std::shared_ptr< ::grpc::Channel> channel_;\n");
+ printer->Print("std::shared_ptr< ::grpc::ChannelInterface> channel_;\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, false);
}
@@ -575,14 +721,14 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("};\n");
printer->Print(
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
- "::grpc::Channel>& channel, "
+ "::grpc::ChannelInterface>& channel, "
"const ::grpc::StubOptions& options = ::grpc::StubOptions());\n");
printer->Print("\n");
- // Server side - Synchronous
+ // Server side - base
printer->Print(
- "class Service : public ::grpc::SynchronousService {\n"
+ "class Service : public ::grpc::Service {\n"
" public:\n");
printer->Indent();
printer->Print("Service();\n");
@@ -590,26 +736,32 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderServerMethodSync(printer, service->method(i), vars);
}
- printer->Print("::grpc::RpcService* service() GRPC_OVERRIDE GRPC_FINAL;\n");
printer->Outdent();
- printer->Print(
- " private:\n"
- " std::unique_ptr< ::grpc::RpcService> service_;\n");
printer->Print("};\n");
// Server side - Asynchronous
- printer->Print(
- "class AsyncService GRPC_FINAL : public ::grpc::AsynchronousService {\n"
- " public:\n");
- printer->Indent();
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print("explicit AsyncService();\n");
- printer->Print("~AsyncService() {};\n");
for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
PrintHeaderServerMethodAsync(printer, service->method(i), vars);
}
- printer->Outdent();
- printer->Print("};\n");
+
+ printer->Print("typedef ");
+
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["method_name"] = service->method(i)->name();
+ printer->Print(*vars, "WithAsyncMethod_$method_name$<");
+ }
+ printer->Print("Service");
+ for (int i = 0; i < service->method_count(); ++i) {
+ printer->Print(" >");
+ }
+ printer->Print(" AsyncService;\n");
+
+ // Server side - Generic
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
+ PrintHeaderServerMethodGeneric(printer, service->method(i), vars);
+ }
printer->Outdent();
printer->Print("};\n");
@@ -623,6 +775,12 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
+ // Package string is empty or ends with a dot. It is used to fully qualify
+ // method names.
+ vars["Package"] = file->package();
+ if (!file->package().empty()) {
+ vars["Package"].append(".");
+ }
if (!params.services_namespace.empty()) {
vars["services_namespace"] = params.services_namespace;
@@ -702,13 +860,16 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
- printer.Print(vars, "#include <grpc++/channel.h>\n");
- printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
- printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
- printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
- printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
- printer.Print(vars, "#include <grpc++/support/async_stream.h>\n");
- printer.Print(vars, "#include <grpc++/support/sync_stream.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/async_stream.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/async_unary_call.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/channel_interface.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/client_unary_call.h>\n");
+ printer.Print(vars,
+ "#include <grpc++/impl/codegen/method_handler_impl.h>\n");
+ printer.Print(vars,
+ "#include <grpc++/impl/codegen/rpc_service_method.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/service_type.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/codegen/sync_stream.h>\n");
if (!file->package().empty()) {
std::vector<grpc::string> parts =
@@ -889,69 +1050,6 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
}
}
-void PrintSourceServerAsyncMethod(
- grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] =
- grpc_cpp_generator::ClassName(method->input_type(), true);
- (*vars)["Response"] =
- grpc_cpp_generator::ClassName(method->output_type(), true);
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestAsyncUnary($Idx$, context, "
- "request, response, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestClientStreaming($Idx$, "
- "context, reader, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "$Request$* request, "
- "::grpc::ServerAsyncWriter< $Response$>* writer, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(
- *vars,
- " AsynchronousService::RequestServerStreaming($Idx$, "
- "context, request, writer, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- } else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "void $ns$$Service$::AsyncService::Request$Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
- "::grpc::CompletionQueue* new_call_cq, "
- "::grpc::ServerCompletionQueue* notification_cq, void *tag) {\n");
- printer->Print(*vars,
- " AsynchronousService::RequestBidiStreaming($Idx$, "
- "context, stream, new_call_cq, notification_cq, tag);\n");
- printer->Print("}\n\n");
- }
-}
-
void PrintSourceService(grpc::protobuf::io::Printer *printer,
const grpc::protobuf::ServiceDescriptor *service,
std::map<grpc::string, grpc::string> *vars) {
@@ -967,7 +1065,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print(*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
- "const std::shared_ptr< ::grpc::Channel>& channel, "
+ "const std::shared_ptr< ::grpc::ChannelInterface>& channel, "
"const ::grpc::StubOptions& options) {\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
@@ -975,7 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
"}\n\n");
printer->Print(*vars,
"$ns$$Service$::Stub::Stub(const std::shared_ptr< "
- "::grpc::Channel>& channel)\n");
+ "::grpc::ChannelInterface>& channel)\n");
printer->Indent();
printer->Print(": channel_(channel)");
for (int i = 0; i < service->method_count(); ++i) {
@@ -1006,32 +1104,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceClientMethod(printer, service->method(i), vars);
}
- (*vars)["MethodCount"] = as_string(service->method_count());
- printer->Print(*vars,
- "$ns$$Service$::AsyncService::AsyncService() : "
- "::grpc::AsynchronousService("
- "$prefix$$Service$_method_names, $MethodCount$) "
- "{}\n\n");
-
- printer->Print(*vars,
- "$ns$$Service$::Service::Service() {\n"
- "}\n\n");
- printer->Print(*vars,
- "$ns$$Service$::Service::~Service() {\n"
- "}\n\n");
- for (int i = 0; i < service->method_count(); ++i) {
- (*vars)["Idx"] = as_string(i);
- PrintSourceServerMethod(printer, service->method(i), vars);
- PrintSourceServerAsyncMethod(printer, service->method(i), vars);
- }
- printer->Print(*vars,
- "::grpc::RpcService* $ns$$Service$::Service::service() {\n");
+ printer->Print(*vars, "$ns$$Service$::Service::Service() {\n");
printer->Indent();
- printer->Print(
- "if (service_) {\n"
- " return service_.get();\n"
- "}\n");
- printer->Print("service_ = std::unique_ptr< ::grpc::RpcService>(new ::grpc::RpcService());\n");
for (int i = 0; i < service->method_count(); ++i) {
const grpc::protobuf::MethodDescriptor *method = service->method(i);
(*vars)["Idx"] = as_string(i);
@@ -1043,7 +1117,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
if (NoStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
@@ -1053,7 +1127,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
@@ -1062,7 +1136,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
@@ -1071,7 +1145,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
- "service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
@@ -1079,9 +1153,15 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
- printer->Print("return service_.get();\n");
printer->Outdent();
- printer->Print("}\n\n");
+ printer->Print(*vars, "}\n\n");
+ printer->Print(*vars,
+ "$ns$$Service$::Service::~Service() {\n"
+ "}\n\n");
+ for (int i = 0; i < service->method_count(); ++i) {
+ (*vars)["Idx"] = as_string(i);
+ PrintSourceServerMethod(printer, service->method(i), vars);
+ }
}
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index c754ae299b..4fd5dfbecb 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -42,7 +42,6 @@
#include <tuple>
#include <vector>
-#include <grpc++/support/config.h>
#include "src/compiler/config.h"
#include "src/compiler/generator_helpers.h"
#include "src/compiler/python_generator.h"
diff --git a/src/core/census/context.c b/src/core/census/context.c
index cab58b653c..e60330de64 100644
--- a/src/core/census/context.c
+++ b/src/core/census/context.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,16 +31,500 @@
*
*/
-#include "src/core/census/context.h"
-
-#include <string.h>
#include <grpc/census.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+#include <string.h>
+#include "src/core/support/string.h"
+
+// Functions in this file support the public context API, including
+// encoding/decoding as part of context propagation across RPC's. The overall
+// requirements (in approximate priority order) for the
+// context representation:
+// 1. Efficient conversion to/from wire format
+// 2. Minimal bytes used on-wire
+// 3. Efficient context creation
+// 4. Efficient lookup of tag value for a key
+// 5. Efficient iteration over tags
+// 6. Minimal memory footprint
+//
+// Notes on tradeoffs/decisions:
+// * tag includes 1 byte length of key, as well as nil-terminating byte. These
+// are to aid in efficient parsing and the ability to directly return key
+// strings. This is more important than saving a single byte/tag on the wire.
+// * The wire encoding uses only single byte values. This eliminates the need
+// to handle endian-ness conversions. It also means there is a hard upper
+// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
+// * Keep all tag information (keys/values/flags) in a single memory buffer,
+// that can be directly copied to the wire.
+// * Binary tags share the same structure as, but are encoded separately from,
+// non-binary tags. This is primarily because non-binary tags are far more
+// likely to be repeated across multiple RPC calls, so are more efficiently
+// cached and compressed in any metadata schemes.
+
+// Structure representing a set of tags. Essentially a count of number of tags
+// present, and pointer to a chunk of memory that contains the per-tag details.
+struct tag_set {
+ int ntags; // number of tags.
+ int ntags_alloc; // ntags + number of deleted tags (total number of tags
+ // in all of kvm). This will always be == ntags, except during the process
+ // of building a new tag set.
+ size_t kvm_size; // number of bytes allocated for key/value storage.
+ size_t kvm_used; // number of bytes of used key/value memory
+ char *kvm; // key/value memory. Consists of repeated entries of:
+ // Offset Size Description
+ // 0 1 Key length, including trailing 0. (K)
+ // 1 1 Value length. (V)
+ // 2 1 Flags
+ // 3 K Key bytes
+ // 3 + K V Value bytes
+ //
+ // We refer to the first 3 entries as the 'tag header'. If extra values are
+ // introduced in the header, you will need to modify the TAG_HEADER_SIZE
+ // constant, the raw_tag structure (and everything that uses it) and the
+ // encode/decode functions appropriately.
+};
+
+// Number of bytes in tag header.
+#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1)
+// Offsets to tag header entries.
+#define KEY_LEN_OFFSET 0
+#define VALUE_LEN_OFFSET 1
+#define FLAG_OFFSET 2
+
+// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set.
+struct raw_tag {
+ uint8_t key_len;
+ uint8_t value_len;
+ uint8_t flags;
+ char *key;
+ char *value;
+};
+
+// Use a reserved flag bit for indication of deleted tag.
+#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
+#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
+
+// Primary (external) representation of a context. Composed of 3 underlying
+// tag_set structs, one for each of the binary/printable propagated tags, and
+// one for everything else. This is to efficiently support tag
+// encoding/decoding.
+struct census_context {
+ struct tag_set tags[3];
+ census_context_status status;
+};
+
+// Indices into the tags member of census_context
+#define PROPAGATED_TAGS 0
+#define PROPAGATED_BINARY_TAGS 1
+#define LOCAL_TAGS 2
+
+// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
+// extra bytes in the tag header (see encode/decode functions for usage: this
+// allows for future expansion of the tag header).
+static char *decode_tag(struct raw_tag *tag, char *header, int offset) {
+ tag->key_len = (uint8_t)(*header++);
+ tag->value_len = (uint8_t)(*header++);
+ tag->flags = (uint8_t)(*header++);
+ header += offset;
+ tag->key = header;
+ header += tag->key_len;
+ tag->value = header;
+ return header + tag->value_len;
+}
-/* Placeholder implementation only. */
+// Make a copy (in 'to') of an existing tag_set.
+static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
+ memcpy(to, from, sizeof(struct tag_set));
+ to->kvm = gpr_malloc(to->kvm_size);
+ memcpy(to->kvm, from->kvm, from->kvm_used);
+}
+
+// Delete a tag from a tag_set, if it exists (returns true if it did).
+static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
+ size_t key_len) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
+ struct raw_tag tag;
+ kvp = decode_tag(&tag, kvp, 0);
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
+ if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
+ *flags |= CENSUS_TAG_DELETED;
+ tags->ntags--;
+ return true;
+ }
+ }
+ return false;
+}
+
+// Delete a tag from a context, return true if it existed.
+static bool context_delete_tag(census_context *context, const census_tag *tag,
+ size_t key_len) {
+ return (
+ tag_set_delete_tag(&context->tags[LOCAL_TAGS], tag->key, key_len) ||
+ tag_set_delete_tag(&context->tags[PROPAGATED_TAGS], tag->key, key_len) ||
+ tag_set_delete_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag->key,
+ key_len));
+}
+
+// Add a tag to a tag_set. Return true on success, false if the tag could
+// not be added because of constraints on tag set size. This function should
+// not be called if the tag may already exist (in a non-deleted state) in
+// the tag_set, as that would result in two tags with the same key.
+static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
+ size_t key_len) {
+ if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
+ return false;
+ }
+ const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE;
+ if (tags->kvm_used + tag_size > tags->kvm_size) {
+ // allocate new memory if needed
+ tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
+ char *new_kvm = gpr_malloc(tags->kvm_size);
+ memcpy(new_kvm, tags->kvm, tags->kvm_used);
+ gpr_free(tags->kvm);
+ tags->kvm = new_kvm;
+ }
+ char *kvp = tags->kvm + tags->kvm_used;
+ *kvp++ = (char)key_len;
+ *kvp++ = (char)tag->value_len;
+ // ensure reserved flags are not used.
+ *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS |
+ CENSUS_TAG_BINARY));
+ memcpy(kvp, tag->key, key_len);
+ kvp += key_len;
+ memcpy(kvp, tag->value, tag->value_len);
+ tags->kvm_used += tag_size;
+ tags->ntags++;
+ tags->ntags_alloc++;
+ return true;
+}
+
+// Add/modify/delete a tag to/in a context. Caller must validate that tag key
+// etc. are valid.
+static void context_modify_tag(census_context *context, const census_tag *tag,
+ size_t key_len) {
+ // First delete the tag if it is already present.
+ bool deleted = context_delete_tag(context, tag, key_len);
+ // Determine if we need to add it back.
+ bool call_add = tag->value != NULL && tag->value_len != 0;
+ bool added = false;
+ if (call_add) {
+ if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
+ if (CENSUS_TAG_IS_BINARY(tag->flags)) {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_BINARY_TAGS], tag,
+ key_len);
+ } else {
+ added = tag_set_add_tag(&context->tags[PROPAGATED_TAGS], tag, key_len);
+ }
+ } else {
+ added = tag_set_add_tag(&context->tags[LOCAL_TAGS], tag, key_len);
+ }
+ }
+ if (deleted) {
+ if (call_add) {
+ context->status.n_modified_tags++;
+ } else {
+ context->status.n_deleted_tags++;
+ }
+ } else {
+ if (added) {
+ context->status.n_added_tags++;
+ } else {
+ context->status.n_ignored_tags++;
+ }
+ }
+}
+
+// Remove memory used for deleted tags from a tag set. Basic algorithm:
+// 1) Walk through tag set to find first deleted tag. Record where it is.
+// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
+// "over" the deleted tags
+// 3) repeat #1 and #2 until we have seen all tags
+// 4) if we are still looking for a not-deleted tag, then all the end portion
+// of the kvm is deleted. Just reduce the used amount of memory by the
+// appropriate amount.
+static void tag_set_flatten(struct tag_set *tags) {
+ if (tags->ntags == tags->ntags_alloc) return;
+ bool found_deleted = false; // found a deleted tag.
+ char *kvp = tags->kvm;
+ char *dbase = NULL; // record location of deleted tag
+ for (int i = 0; i < tags->ntags_alloc; i++) {
+ struct raw_tag tag;
+ char *next_kvp = decode_tag(&tag, kvp, 0);
+ if (found_deleted) {
+ if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
+ ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
+ GPR_ASSERT(reduce > 0);
+ ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
+ GPR_ASSERT(copy_size > 0);
+ memmove(dbase, kvp, (size_t)copy_size);
+ tags->kvm_used -= (size_t)reduce;
+ next_kvp -= reduce;
+ found_deleted = false;
+ }
+ } else {
+ if (CENSUS_TAG_IS_DELETED(tag.flags)) {
+ dbase = kvp;
+ found_deleted = true;
+ }
+ }
+ kvp = next_kvp;
+ }
+ if (found_deleted) {
+ GPR_ASSERT(dbase > tags->kvm);
+ tags->kvm_used = (size_t)(dbase - tags->kvm);
+ }
+ tags->ntags_alloc = tags->ntags;
+}
-size_t census_context_serialize(const census_context *context, char *buffer,
- size_t buf_size) {
- /* TODO(aveitch): implement serialization */
+census_context *census_context_create(const census_context *base,
+ const census_tag *tags, int ntags,
+ census_context_status const **status) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ // If we are given a base, copy it into our new tag set. Otherwise set it
+ // to zero/NULL everything.
+ if (base == NULL) {
+ memset(context, 0, sizeof(census_context));
+ } else {
+ tag_set_copy(&context->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
+ tag_set_copy(&context->tags[PROPAGATED_BINARY_TAGS],
+ &base->tags[PROPAGATED_BINARY_TAGS]);
+ tag_set_copy(&context->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
+ memset(&context->status, 0, sizeof(context->status));
+ }
+ // Walk over the additional tags and, for those that aren't invalid, modify
+ // the context to add/replace/delete as required.
+ for (int i = 0; i < ntags; i++) {
+ const census_tag *tag = &tags[i];
+ size_t key_len = strlen(tag->key) + 1;
+ // ignore the tag if it is too long/short.
+ if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN &&
+ tag->value_len <= CENSUS_MAX_TAG_KV_LEN) {
+ context_modify_tag(context, tag, key_len);
+ } else {
+ context->status.n_invalid_tags++;
+ }
+ }
+ // Remove any deleted tags, update status if needed, and return.
+ tag_set_flatten(&context->tags[PROPAGATED_TAGS]);
+ tag_set_flatten(&context->tags[PROPAGATED_BINARY_TAGS]);
+ tag_set_flatten(&context->tags[LOCAL_TAGS]);
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ context->status.n_propagated_binary_tags =
+ context->tags[PROPAGATED_BINARY_TAGS].ntags;
+ context->status.n_local_tags = context->tags[LOCAL_TAGS].ntags;
+ if (status) {
+ *status = &context->status;
+ }
+ return context;
+}
+
+const census_context_status *census_context_get_status(
+ const census_context *context) {
+ return &context->status;
+}
+
+void census_context_destroy(census_context *context) {
+ gpr_free(context->tags[PROPAGATED_TAGS].kvm);
+ gpr_free(context->tags[PROPAGATED_BINARY_TAGS].kvm);
+ gpr_free(context->tags[LOCAL_TAGS].kvm);
+ gpr_free(context);
+}
+
+void census_context_initialize_iterator(const census_context *context,
+ census_context_iterator *iterator) {
+ iterator->context = context;
+ iterator->index = 0;
+ if (context->tags[PROPAGATED_TAGS].ntags != 0) {
+ iterator->base = PROPAGATED_TAGS;
+ iterator->kvm = context->tags[PROPAGATED_TAGS].kvm;
+ } else if (context->tags[PROPAGATED_BINARY_TAGS].ntags != 0) {
+ iterator->base = PROPAGATED_BINARY_TAGS;
+ iterator->kvm = context->tags[PROPAGATED_BINARY_TAGS].kvm;
+ } else if (context->tags[LOCAL_TAGS].ntags != 0) {
+ iterator->base = LOCAL_TAGS;
+ iterator->kvm = context->tags[LOCAL_TAGS].kvm;
+ } else {
+ iterator->base = -1;
+ }
+}
+
+int census_context_next_tag(census_context_iterator *iterator,
+ census_tag *tag) {
+ if (iterator->base < 0) {
+ return 0;
+ }
+ struct raw_tag raw;
+ iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->value_len = raw.value_len;
+ tag->flags = raw.flags;
+ if (++iterator->index == iterator->context->tags[iterator->base].ntags) {
+ do {
+ if (iterator->base == LOCAL_TAGS) {
+ iterator->base = -1;
+ return 1;
+ }
+ } while (iterator->context->tags[++iterator->base].ntags == 0);
+ iterator->index = 0;
+ iterator->kvm = iterator->context->tags[iterator->base].kvm;
+ }
+ return 1;
+}
+
+// Find a tag in a tag_set by key. Return true if found, false otherwise.
+static bool tag_set_get_tag(const struct tag_set *tags, const char *key,
+ size_t key_len, census_tag *tag) {
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ struct raw_tag raw;
+ kvp = decode_tag(&raw, kvp, 0);
+ if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
+ tag->key = raw.key;
+ tag->value = raw.value;
+ tag->value_len = raw.value_len;
+ tag->flags = raw.flags;
+ return true;
+ }
+ }
+ return false;
+}
+
+int census_context_get_tag(const census_context *context, const char *key,
+ census_tag *tag) {
+ size_t key_len = strlen(key) + 1;
+ if (key_len == 1) {
+ return 0;
+ }
+ if (tag_set_get_tag(&context->tags[PROPAGATED_TAGS], key, key_len, tag) ||
+ tag_set_get_tag(&context->tags[PROPAGATED_BINARY_TAGS], key, key_len,
+ tag) ||
+ tag_set_get_tag(&context->tags[LOCAL_TAGS], key, key_len, tag)) {
+ return 1;
+ }
return 0;
}
+
+// Context encoding and decoding functions.
+//
+// Wire format for tag_set's on the wire:
+//
+// First, a tag set header:
+//
+// offset bytes description
+// 0 1 version number
+// 1 1 number of bytes in this header. This allows for future
+// expansion.
+// 2 1 number of bytes in each tag header.
+// 3 1 ntags value from tag set.
+//
+// This is followed by the key/value memory from struct tag_set.
+
+#define ENCODED_VERSION 0 // Version number
+#define ENCODED_HEADER_SIZE 4 // size of tag set header
+
+// Encode a tag set. Returns 0 if buffer is too small.
+static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
+ size_t buf_size) {
+ if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
+ return 0;
+ }
+ buf_size -= ENCODED_HEADER_SIZE;
+ *buffer++ = (char)ENCODED_VERSION;
+ *buffer++ = (char)ENCODED_HEADER_SIZE;
+ *buffer++ = (char)TAG_HEADER_SIZE;
+ *buffer++ = (char)tags->ntags;
+ if (tags->ntags == 0) {
+ return ENCODED_HEADER_SIZE;
+ }
+ memcpy(buffer, tags->kvm, tags->kvm_used);
+ return ENCODED_HEADER_SIZE + tags->kvm_used;
+}
+
+char *census_context_encode(const census_context *context, char *buffer,
+ size_t buf_size, size_t *print_buf_size,
+ size_t *bin_buf_size) {
+ *print_buf_size =
+ tag_set_encode(&context->tags[PROPAGATED_TAGS], buffer, buf_size);
+ if (*print_buf_size == 0) {
+ return NULL;
+ }
+ char *b_buffer = buffer + *print_buf_size;
+ *bin_buf_size = tag_set_encode(&context->tags[PROPAGATED_BINARY_TAGS],
+ b_buffer, buf_size - *print_buf_size);
+ if (*bin_buf_size == 0) {
+ return NULL;
+ }
+ return b_buffer;
+}
+
+// Decode a tag set.
+static void tag_set_decode(struct tag_set *tags, const char *buffer,
+ size_t size) {
+ uint8_t version = (uint8_t)(*buffer++);
+ uint8_t header_size = (uint8_t)(*buffer++);
+ uint8_t tag_header_size = (uint8_t)(*buffer++);
+ tags->ntags = tags->ntags_alloc = (int)(*buffer++);
+ if (tags->ntags == 0) {
+ tags->ntags_alloc = 0;
+ tags->kvm_size = 0;
+ tags->kvm_used = 0;
+ tags->kvm = NULL;
+ return;
+ }
+ if (header_size != ENCODED_HEADER_SIZE) {
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
+ buffer += (header_size - ENCODED_HEADER_SIZE);
+ }
+ tags->kvm_used = size - header_size;
+ tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN;
+ tags->kvm = gpr_malloc(tags->kvm_size);
+ if (tag_header_size != TAG_HEADER_SIZE) {
+ // something new in the tag information. I don't understand it, so
+ // don't copy it over.
+ GPR_ASSERT(version != ENCODED_VERSION);
+ GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
+ char *kvp = tags->kvm;
+ for (int i = 0; i < tags->ntags; i++) {
+ memcpy(kvp, buffer, TAG_HEADER_SIZE);
+ kvp += header_size;
+ struct raw_tag raw;
+ buffer =
+ decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE);
+ memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len);
+ kvp += raw.key_len + raw.value_len;
+ }
+ } else {
+ memcpy(tags->kvm, buffer, tags->kvm_used);
+ }
+}
+
+census_context *census_context_decode(const char *buffer, size_t size,
+ const char *bin_buffer, size_t bin_size) {
+ census_context *context = gpr_malloc(sizeof(census_context));
+ memset(&context->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
+ if (buffer == NULL) {
+ memset(&context->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set));
+ } else {
+ tag_set_decode(&context->tags[PROPAGATED_TAGS], buffer, size);
+ }
+ if (bin_buffer == NULL) {
+ memset(&context->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set));
+ } else {
+ tag_set_decode(&context->tags[PROPAGATED_BINARY_TAGS], bin_buffer,
+ bin_size);
+ }
+ memset(&context->status, 0, sizeof(context->status));
+ context->status.n_propagated_tags = context->tags[PROPAGATED_TAGS].ntags;
+ context->status.n_propagated_binary_tags =
+ context->tags[PROPAGATED_BINARY_TAGS].ntags;
+ // TODO(aveitch): check that BINARY flag is correct for each type.
+ return context;
+}
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index ee1f62f8c2..a8db32b9d5 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -91,7 +91,7 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c
index b7af714e0b..ce7ec09b89 100644
--- a/src/core/census/initialize.c
+++ b/src/core/census/initialize.c
@@ -37,9 +37,7 @@ static int features_enabled = CENSUS_FEATURE_NONE;
int census_initialize(int features) {
if (features_enabled != CENSUS_FEATURE_NONE) {
- return 1;
- }
- if (features == CENSUS_FEATURE_NONE) {
+ // Must have been a previous call to census_initialize; return error
return 1;
}
features_enabled = features;
diff --git a/src/core/census/placeholders.c b/src/core/census/placeholders.c
new file mode 100644
index 0000000000..5829cc9460
--- /dev/null
+++ b/src/core/census/placeholders.c
@@ -0,0 +1,114 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/census.h>
+
+#include <grpc/support/log.h>
+
+/* Placeholders for the pending APIs */
+
+census_tag_set *census_context_tag_set(census_context *context) {
+ (void)context;
+ abort();
+}
+
+int census_get_trace_record(census_trace_record *trace_record) {
+ (void)trace_record;
+ abort();
+}
+
+void census_record_values(census_context *context, census_value *values,
+ size_t nvalues) {
+ (void)context;
+ (void)values;
+ (void)nvalues;
+ abort();
+}
+
+void census_set_rpc_client_peer(census_context *context, const char *peer) {
+ (void)context;
+ (void)peer;
+ abort();
+}
+
+void census_trace_scan_end() { abort(); }
+
+int census_trace_scan_start(int consume) {
+ (void)consume;
+ abort();
+}
+
+const census_aggregation *census_view_aggregrations(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+census_view *census_view_create(uint32_t metric_id, const census_tag_set *tags,
+ const census_aggregation *aggregations,
+ size_t naggregations) {
+ (void)metric_id;
+ (void)tags;
+ (void)aggregations;
+ (void)naggregations;
+ abort();
+}
+
+const census_tag_set *census_view_tags(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+void census_view_delete(census_view *view) {
+ (void)view;
+ abort();
+}
+
+const census_view_data *census_view_get_data(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+size_t census_view_metric(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+size_t census_view_naggregations(const census_view *view) {
+ (void)view;
+ abort();
+}
+
+void census_view_reset(census_view *view) {
+ (void)view;
+ abort();
+}
diff --git a/src/core/census/tag_set.c b/src/core/census/tag_set.c
deleted file mode 100644
index 9b65a1dfa1..0000000000
--- a/src/core/census/tag_set.c
+++ /dev/null
@@ -1,535 +0,0 @@
-/*
- *
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc/census.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/port_platform.h>
-#include <grpc/support/useful.h>
-#include <stdbool.h>
-#include <string.h>
-#include "src/core/support/string.h"
-
-// Functions in this file support the public tag_set API, as well as
-// encoding/decoding tag_sets as part of context propagation across
-// RPC's. The overall requirements (in approximate priority order) for the
-// tag_set representations:
-// 1. Efficient conversion to/from wire format
-// 2. Minimal bytes used on-wire
-// 3. Efficient tag set creation
-// 4. Efficient lookup of value for a key
-// 5. Efficient lookup of value for an index (to support iteration)
-// 6. Minimal memory footprint
-//
-// Notes on tradeoffs/decisions:
-// * tag includes 1 byte length of key, as well as nil-terminating byte. These
-// are to aid in efficient parsing and the ability to directly return key
-// strings. This is more important than saving a single byte/tag on the wire.
-// * The wire encoding uses only single byte values. This eliminates the need
-// to handle endian-ness conversions. It also means there is a hard upper
-// limit of 255 for both CENSUS_MAX_TAG_KV_LEN and CENSUS_MAX_PROPAGATED_TAGS.
-// * Keep all tag information (keys/values/flags) in a single memory buffer,
-// that can be directly copied to the wire.14
-// * Binary tags share the same structure as, but are encoded separately from,
-// non-binary tags. This is primarily because non-binary tags are far more
-// likely to be repeated across multiple RPC calls, so are more efficiently
-// cached and compressed in any metadata schemes.
-// * all lengths etc. are restricted to one byte. This eliminates endian
-// issues.
-
-// Structure representing a set of tags. Essentially a count of number of tags
-// present, and pointer to a chunk of memory that contains the per-tag details.
-struct tag_set {
- int ntags; // number of tags.
- int ntags_alloc; // ntags + number of deleted tags (total number of tags
- // in all of kvm). This will always be == ntags, except during the process
- // of building a new tag set.
- size_t kvm_size; // number of bytes allocated for key/value storage.
- size_t kvm_used; // number of bytes of used key/value memory
- char *kvm; // key/value memory. Consists of repeated entries of:
- // Offset Size Description
- // 0 1 Key length, including trailing 0. (K)
- // 1 1 Value length. (V)
- // 2 1 Flags
- // 3 K Key bytes
- // 3 + K V Value bytes
- //
- // We refer to the first 3 entries as the 'tag header'. If extra values are
- // introduced in the header, you will need to modify the TAG_HEADER_SIZE
- // constant, the raw_tag structure (and everything that uses it) and the
- // encode/decode functions appropriately.
-};
-
-// Number of bytes in tag header.
-#define TAG_HEADER_SIZE 3 // key length (1) + value length (1) + flags (1)
-// Offsets to tag header entries.
-#define KEY_LEN_OFFSET 0
-#define VALUE_LEN_OFFSET 1
-#define FLAG_OFFSET 2
-
-// raw_tag represents the raw-storage form of a tag in the kvm of a tag_set.
-struct raw_tag {
- uint8_t key_len;
- uint8_t value_len;
- uint8_t flags;
- char *key;
- char *value;
-};
-
-// Use a reserved flag bit for indication of deleted tag.
-#define CENSUS_TAG_DELETED CENSUS_TAG_RESERVED
-#define CENSUS_TAG_IS_DELETED(flags) (flags & CENSUS_TAG_DELETED)
-
-// Primary (external) representation of a tag set. Composed of 3 underlying
-// tag_set structs, one for each of the binary/printable propagated tags, and
-// one for everything else. This is to efficiently support tag
-// encoding/decoding.
-struct census_tag_set {
- struct tag_set tags[3];
- census_tag_set_create_status status;
-};
-
-// Indices into the tags member of census_tag_set
-#define PROPAGATED_TAGS 0
-#define PROPAGATED_BINARY_TAGS 1
-#define LOCAL_TAGS 2
-
-// Extract a raw tag given a pointer (raw) to the tag header. Allow for some
-// extra bytes in the tag header (see encode/decode functions for usage: this
-// allows for future expansion of the tag header).
-static char *decode_tag(struct raw_tag *tag, char *header, int offset) {
- tag->key_len = (uint8_t)(*header++);
- tag->value_len = (uint8_t)(*header++);
- tag->flags = (uint8_t)(*header++);
- header += offset;
- tag->key = header;
- header += tag->key_len;
- tag->value = header;
- return header + tag->value_len;
-}
-
-// Make a copy (in 'to') of an existing tag_set.
-static void tag_set_copy(struct tag_set *to, const struct tag_set *from) {
- memcpy(to, from, sizeof(struct tag_set));
- to->kvm = gpr_malloc(to->kvm_size);
- memcpy(to->kvm, from->kvm, from->kvm_used);
-}
-
-// Delete a tag from a tag_set, if it exists (returns true if it did).
-static bool tag_set_delete_tag(struct tag_set *tags, const char *key,
- size_t key_len) {
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags_alloc; i++) {
- uint8_t *flags = (uint8_t *)(kvp + FLAG_OFFSET);
- struct raw_tag tag;
- kvp = decode_tag(&tag, kvp, 0);
- if (CENSUS_TAG_IS_DELETED(tag.flags)) continue;
- if ((key_len == tag.key_len) && (memcmp(key, tag.key, key_len) == 0)) {
- *flags |= CENSUS_TAG_DELETED;
- tags->ntags--;
- return true;
- }
- }
- return false;
-}
-
-// Delete a tag from a census_tag_set, return true if it existed.
-static bool cts_delete_tag(census_tag_set *tags, const census_tag *tag,
- size_t key_len) {
- return (tag_set_delete_tag(&tags->tags[LOCAL_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&tags->tags[PROPAGATED_TAGS], tag->key, key_len) ||
- tag_set_delete_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag->key,
- key_len));
-}
-
-// Add a tag to a tag_set. Return true on sucess, false if the tag could
-// not be added because of constraints on tag set size. This function should
-// not be called if the tag may already exist (in a non-deleted state) in
-// the tag_set, as that would result in two tags with the same key.
-static bool tag_set_add_tag(struct tag_set *tags, const census_tag *tag,
- size_t key_len) {
- if (tags->ntags == CENSUS_MAX_PROPAGATED_TAGS) {
- return false;
- }
- const size_t tag_size = key_len + tag->value_len + TAG_HEADER_SIZE;
- if (tags->kvm_used + tag_size > tags->kvm_size) {
- // allocate new memory if needed
- tags->kvm_size += 2 * CENSUS_MAX_TAG_KV_LEN + TAG_HEADER_SIZE;
- char *new_kvm = gpr_malloc(tags->kvm_size);
- memcpy(new_kvm, tags->kvm, tags->kvm_used);
- gpr_free(tags->kvm);
- tags->kvm = new_kvm;
- }
- char *kvp = tags->kvm + tags->kvm_used;
- *kvp++ = (char)key_len;
- *kvp++ = (char)tag->value_len;
- // ensure reserved flags are not used.
- *kvp++ = (char)(tag->flags & (CENSUS_TAG_PROPAGATE | CENSUS_TAG_STATS |
- CENSUS_TAG_BINARY));
- memcpy(kvp, tag->key, key_len);
- kvp += key_len;
- memcpy(kvp, tag->value, tag->value_len);
- tags->kvm_used += tag_size;
- tags->ntags++;
- tags->ntags_alloc++;
- return true;
-}
-
-// Add/modify/delete a tag to/in a census_tag_set. Caller must validate that
-// tag key etc. are valid.
-static void cts_modify_tag(census_tag_set *tags, const census_tag *tag,
- size_t key_len) {
- // First delete the tag if it is already present.
- bool deleted = cts_delete_tag(tags, tag, key_len);
- // Determine if we need to add it back.
- bool call_add = tag->value != NULL && tag->value_len != 0;
- bool added = false;
- if (call_add) {
- if (CENSUS_TAG_IS_PROPAGATED(tag->flags)) {
- if (CENSUS_TAG_IS_BINARY(tag->flags)) {
- added =
- tag_set_add_tag(&tags->tags[PROPAGATED_BINARY_TAGS], tag, key_len);
- } else {
- added = tag_set_add_tag(&tags->tags[PROPAGATED_TAGS], tag, key_len);
- }
- } else {
- added = tag_set_add_tag(&tags->tags[LOCAL_TAGS], tag, key_len);
- }
- }
- if (deleted) {
- if (call_add) {
- tags->status.n_modified_tags++;
- } else {
- tags->status.n_deleted_tags++;
- }
- } else {
- if (added) {
- tags->status.n_added_tags++;
- } else {
- tags->status.n_ignored_tags++;
- }
- }
-}
-
-// Remove memory used for deleted tags from the tag set. Basic algorithm:
-// 1) Walk through tag set to find first deleted tag. Record where it is.
-// 2) Find the next not-deleted tag. Copy all of kvm from there to the end
-// "over" the deleted tags
-// 3) repeat #1 and #2 until we have seen all tags
-// 4) if we are still looking for a not-deleted tag, then all the end portion
-// of the kvm is deleted. Just reduce the used amount of memory by the
-// appropriate amount.
-static void tag_set_flatten(struct tag_set *tags) {
- if (tags->ntags == tags->ntags_alloc) return;
- bool found_deleted = false; // found a deleted tag.
- char *kvp = tags->kvm;
- char *dbase = NULL; // record location of deleted tag
- for (int i = 0; i < tags->ntags_alloc; i++) {
- struct raw_tag tag;
- char *next_kvp = decode_tag(&tag, kvp, 0);
- if (found_deleted) {
- if (!CENSUS_TAG_IS_DELETED(tag.flags)) {
- ptrdiff_t reduce = kvp - dbase; // #bytes in deleted tags
- GPR_ASSERT(reduce > 0);
- ptrdiff_t copy_size = tags->kvm + tags->kvm_used - kvp;
- GPR_ASSERT(copy_size > 0);
- memmove(dbase, kvp, (size_t)copy_size);
- tags->kvm_used -= (size_t)reduce;
- next_kvp -= reduce;
- found_deleted = false;
- }
- } else {
- if (CENSUS_TAG_IS_DELETED(tag.flags)) {
- dbase = kvp;
- found_deleted = true;
- }
- }
- kvp = next_kvp;
- }
- if (found_deleted) {
- GPR_ASSERT(dbase > tags->kvm);
- tags->kvm_used = (size_t)(dbase - tags->kvm);
- }
- tags->ntags_alloc = tags->ntags;
-}
-
-census_tag_set *census_tag_set_create(
- const census_tag_set *base, const census_tag *tags, int ntags,
- census_tag_set_create_status const **status) {
- census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set));
- // If we are given a base, copy it into our new tag set. Otherwise set it
- // to zero/NULL everything.
- if (base == NULL) {
- memset(new_ts, 0, sizeof(census_tag_set));
- } else {
- tag_set_copy(&new_ts->tags[PROPAGATED_TAGS], &base->tags[PROPAGATED_TAGS]);
- tag_set_copy(&new_ts->tags[PROPAGATED_BINARY_TAGS],
- &base->tags[PROPAGATED_BINARY_TAGS]);
- tag_set_copy(&new_ts->tags[LOCAL_TAGS], &base->tags[LOCAL_TAGS]);
- memset(&new_ts->status, 0, sizeof(new_ts->status));
- }
- // Walk over the additional tags and, for those that aren't invalid, modify
- // the tag set to add/replace/delete as required.
- for (int i = 0; i < ntags; i++) {
- const census_tag *tag = &tags[i];
- size_t key_len = strlen(tag->key) + 1;
- // ignore the tag if it is too long/short.
- if (key_len != 1 && key_len <= CENSUS_MAX_TAG_KV_LEN &&
- tag->value_len <= CENSUS_MAX_TAG_KV_LEN) {
- cts_modify_tag(new_ts, tag, key_len);
- } else {
- new_ts->status.n_invalid_tags++;
- }
- }
- // Remove any deleted tags, update status if needed, and return.
- tag_set_flatten(&new_ts->tags[PROPAGATED_TAGS]);
- tag_set_flatten(&new_ts->tags[PROPAGATED_BINARY_TAGS]);
- tag_set_flatten(&new_ts->tags[LOCAL_TAGS]);
- new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags;
- new_ts->status.n_propagated_binary_tags =
- new_ts->tags[PROPAGATED_BINARY_TAGS].ntags;
- new_ts->status.n_local_tags = new_ts->tags[LOCAL_TAGS].ntags;
- if (status) {
- *status = &new_ts->status;
- }
- return new_ts;
-}
-
-const census_tag_set_create_status *census_tag_set_get_create_status(
- const census_tag_set *tags) {
- return &tags->status;
-}
-
-void census_tag_set_destroy(census_tag_set *tags) {
- gpr_free(tags->tags[PROPAGATED_TAGS].kvm);
- gpr_free(tags->tags[PROPAGATED_BINARY_TAGS].kvm);
- gpr_free(tags->tags[LOCAL_TAGS].kvm);
- gpr_free(tags);
-}
-
-// Initialize a tag set iterator. Must be called before first use of the
-// iterator.
-void census_tag_set_initialize_iterator(const census_tag_set *tags,
- census_tag_set_iterator *iterator) {
- iterator->tags = tags;
- iterator->index = 0;
- if (tags->tags[PROPAGATED_TAGS].ntags != 0) {
- iterator->base = PROPAGATED_TAGS;
- iterator->kvm = tags->tags[PROPAGATED_TAGS].kvm;
- } else if (tags->tags[PROPAGATED_BINARY_TAGS].ntags != 0) {
- iterator->base = PROPAGATED_BINARY_TAGS;
- iterator->kvm = tags->tags[PROPAGATED_BINARY_TAGS].kvm;
- } else if (tags->tags[LOCAL_TAGS].ntags != 0) {
- iterator->base = LOCAL_TAGS;
- iterator->kvm = tags->tags[LOCAL_TAGS].kvm;
- } else {
- iterator->base = -1;
- }
-}
-
-// Get the contents of the "next" tag in the tag set. If there are no more
-// tags in the tag set, returns 0 (and 'tag' contents will be unchanged),
-// otherwise returns 1. */
-int census_tag_set_next_tag(census_tag_set_iterator *iterator,
- census_tag *tag) {
- if (iterator->base < 0) {
- return 0;
- }
- struct raw_tag raw;
- iterator->kvm = decode_tag(&raw, iterator->kvm, 0);
- tag->key = raw.key;
- tag->value = raw.value;
- tag->value_len = raw.value_len;
- tag->flags = raw.flags;
- if (++iterator->index == iterator->tags->tags[iterator->base].ntags) {
- do {
- if (iterator->base == LOCAL_TAGS) {
- iterator->base = -1;
- return 1;
- }
- } while (iterator->tags->tags[++iterator->base].ntags == 0);
- iterator->index = 0;
- iterator->kvm = iterator->tags->tags[iterator->base].kvm;
- }
- return 1;
-}
-
-// Find a tag in a tag_set by key. Return true if found, false otherwise.
-static bool tag_set_get_tag_by_key(const struct tag_set *tags, const char *key,
- size_t key_len, census_tag *tag) {
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags; i++) {
- struct raw_tag raw;
- kvp = decode_tag(&raw, kvp, 0);
- if (key_len == raw.key_len && memcmp(raw.key, key, key_len) == 0) {
- tag->key = raw.key;
- tag->value = raw.value;
- tag->value_len = raw.value_len;
- tag->flags = raw.flags;
- return true;
- }
- }
- return false;
-}
-
-int census_tag_set_get_tag_by_key(const census_tag_set *tags, const char *key,
- census_tag *tag) {
- size_t key_len = strlen(key) + 1;
- if (key_len == 1) {
- return 0;
- }
- if (tag_set_get_tag_by_key(&tags->tags[PROPAGATED_TAGS], key, key_len, tag) ||
- tag_set_get_tag_by_key(&tags->tags[PROPAGATED_BINARY_TAGS], key, key_len,
- tag) ||
- tag_set_get_tag_by_key(&tags->tags[LOCAL_TAGS], key, key_len, tag)) {
- return 1;
- }
- return 0;
-}
-
-// tag_set encoding and decoding functions.
-//
-// Wire format for tag sets on the wire:
-//
-// First, a tag set header:
-//
-// offset bytes description
-// 0 1 version number
-// 1 1 number of bytes in this header. This allows for future
-// expansion.
-// 2 1 number of bytes in each tag header.
-// 3 1 ntags value from tag set.
-//
-// This is followed by the key/value memory from struct tag_set.
-
-#define ENCODED_VERSION 0 // Version number
-#define ENCODED_HEADER_SIZE 4 // size of tag set header
-
-// Encode a tag set. Returns 0 if buffer is too small.
-static size_t tag_set_encode(const struct tag_set *tags, char *buffer,
- size_t buf_size) {
- if (buf_size < ENCODED_HEADER_SIZE + tags->kvm_used) {
- return 0;
- }
- buf_size -= ENCODED_HEADER_SIZE;
- *buffer++ = (char)ENCODED_VERSION;
- *buffer++ = (char)ENCODED_HEADER_SIZE;
- *buffer++ = (char)TAG_HEADER_SIZE;
- *buffer++ = (char)tags->ntags;
- if (tags->ntags == 0) {
- return ENCODED_HEADER_SIZE;
- }
- memcpy(buffer, tags->kvm, tags->kvm_used);
- return ENCODED_HEADER_SIZE + tags->kvm_used;
-}
-
-char *census_tag_set_encode(const census_tag_set *tags, char *buffer,
- size_t buf_size, size_t *print_buf_size,
- size_t *bin_buf_size) {
- *print_buf_size =
- tag_set_encode(&tags->tags[PROPAGATED_TAGS], buffer, buf_size);
- if (*print_buf_size == 0) {
- return NULL;
- }
- char *b_buffer = buffer + *print_buf_size;
- *bin_buf_size = tag_set_encode(&tags->tags[PROPAGATED_BINARY_TAGS], b_buffer,
- buf_size - *print_buf_size);
- if (*bin_buf_size == 0) {
- return NULL;
- }
- return b_buffer;
-}
-
-// Decode a tag set.
-static void tag_set_decode(struct tag_set *tags, const char *buffer,
- size_t size) {
- uint8_t version = (uint8_t)(*buffer++);
- uint8_t header_size = (uint8_t)(*buffer++);
- uint8_t tag_header_size = (uint8_t)(*buffer++);
- tags->ntags = tags->ntags_alloc = (int)(*buffer++);
- if (tags->ntags == 0) {
- tags->ntags_alloc = 0;
- tags->kvm_size = 0;
- tags->kvm_used = 0;
- tags->kvm = NULL;
- return;
- }
- if (header_size != ENCODED_HEADER_SIZE) {
- GPR_ASSERT(version != ENCODED_VERSION);
- GPR_ASSERT(ENCODED_HEADER_SIZE < header_size);
- buffer += (header_size - ENCODED_HEADER_SIZE);
- }
- tags->kvm_used = size - header_size;
- tags->kvm_size = tags->kvm_used + CENSUS_MAX_TAG_KV_LEN;
- tags->kvm = gpr_malloc(tags->kvm_size);
- if (tag_header_size != TAG_HEADER_SIZE) {
- // something new in the tag information. I don't understand it, so
- // don't copy it over.
- GPR_ASSERT(version != ENCODED_VERSION);
- GPR_ASSERT(tag_header_size > TAG_HEADER_SIZE);
- char *kvp = tags->kvm;
- for (int i = 0; i < tags->ntags; i++) {
- memcpy(kvp, buffer, TAG_HEADER_SIZE);
- kvp += header_size;
- struct raw_tag raw;
- buffer =
- decode_tag(&raw, (char *)buffer, tag_header_size - TAG_HEADER_SIZE);
- memcpy(kvp, raw.key, (size_t)raw.key_len + raw.value_len);
- kvp += raw.key_len + raw.value_len;
- }
- } else {
- memcpy(tags->kvm, buffer, tags->kvm_used);
- }
-}
-
-census_tag_set *census_tag_set_decode(const char *buffer, size_t size,
- const char *bin_buffer, size_t bin_size) {
- census_tag_set *new_ts = gpr_malloc(sizeof(census_tag_set));
- memset(&new_ts->tags[LOCAL_TAGS], 0, sizeof(struct tag_set));
- if (buffer == NULL) {
- memset(&new_ts->tags[PROPAGATED_TAGS], 0, sizeof(struct tag_set));
- } else {
- tag_set_decode(&new_ts->tags[PROPAGATED_TAGS], buffer, size);
- }
- if (bin_buffer == NULL) {
- memset(&new_ts->tags[PROPAGATED_BINARY_TAGS], 0, sizeof(struct tag_set));
- } else {
- tag_set_decode(&new_ts->tags[PROPAGATED_BINARY_TAGS], bin_buffer, bin_size);
- }
- memset(&new_ts->status, 0, sizeof(new_ts->status));
- new_ts->status.n_propagated_tags = new_ts->tags[PROPAGATED_TAGS].ntags;
- new_ts->status.n_propagated_binary_tags =
- new_ts->tags[PROPAGATED_BINARY_TAGS].ntags;
- // TODO(aveitch): check that BINARY flag is correct for each type.
- return new_ts;
-}
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 487db1119a..0427ce0b8d 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/support/string.h"
+#include <grpc/census.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -119,10 +120,10 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
if (a == NULL) return 0;
for (i = 0; i < a->num_args; i++) {
if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) {
- return a->args[i].value.integer != 0;
+ return a->args[i].value.integer != 0 && census_enabled();
}
}
- return 0;
+ return census_enabled();
}
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a92a6ecaf2..7176c01b05 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -135,7 +135,7 @@ static void on_lb_policy_state_changed_locked(
}
static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
@@ -161,7 +161,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
}
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
@@ -191,7 +191,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -249,7 +250,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL);
if (op->bind_pollset != NULL) {
@@ -268,7 +269,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -310,15 +311,15 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
-static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
continue_picking_args *cpa = arg;
if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->connected_subchannel, cpa->on_ready)) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
gpr_free(cpa);
}
@@ -346,7 +347,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -497,7 +498,7 @@ typedef struct {
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 2c0b07d8bf..b1e7155773 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -79,7 +79,7 @@ typedef struct client_uchannel_channel_data {
typedef grpc_subchannel_call_holder call_data;
static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
channel_data *chand = arg;
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
@@ -105,7 +105,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 3de1ef8a35..3e7ca08fd2 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -153,7 +153,7 @@ static void process_send_initial_metadata(
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem);
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_reset_and_unref(&calld->slices);
@@ -183,7 +183,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, &calld->send_op);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 65cfb778bb..43eee046b8 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,7 +80,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
client_recv_filter_args a;
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index a10b105494..bb75323933 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -131,7 +131,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 1e8f4ba1b6..3ad9fd9efb 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,9 +43,9 @@
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
- int success);
+ bool success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
- int success);
+ bool success);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
@@ -166,7 +166,7 @@ retry:
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
-static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_subchannel_call_holder *holder = arg;
grpc_subchannel_call *call;
gpr_mu_lock(&holder->mu);
@@ -209,10 +209,11 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
+ NULL);
}
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
@@ -240,9 +241,10 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
+ NULL);
grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
- 0);
+ false, NULL);
}
holder->waiting_ops_count = 0;
}
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a11f..459bbebb68 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@ typedef struct {
} pick_first_lb_policy;
#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -121,7 +121,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
pp = next;
}
@@ -140,7 +140,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -209,7 +209,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -230,7 +230,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
@@ -268,19 +268,19 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(selected != NULL);
- gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
- grpc_exec_ctx_enqueue(exec_ctx,
- grpc_closure_create(destroy_subchannels, p), 1);
+ gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -327,7 +327,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -374,7 +374,7 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index d487456363..b1171c45b0 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -237,7 +237,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
@@ -263,7 +263,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -336,7 +336,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -376,7 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -428,7 +428,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
} else {
@@ -479,7 +479,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 28ca30b946..376b6b3d76 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -93,7 +93,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -182,7 +182,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 6343259246..68910ad975 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -98,7 +98,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -153,7 +153,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
*r->target_config = cfg;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
}
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 4924ca77d6..166738e768 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -420,7 +420,7 @@ static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2992da8b79..0a996a1e8b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -145,7 +145,7 @@ struct grpc_subchannel_call {
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- int iomgr_success);
+ bool iomgr_success);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -175,7 +175,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -198,7 +198,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -268,7 +268,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- 1);
+ true, NULL);
}
}
@@ -284,9 +284,13 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
- c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy((void *)c->filters, args->filters,
- sizeof(grpc_channel_filter *) * c->num_filters);
+ if (c->num_filters > 0) {
+ c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
+ memcpy((void *)c->filters, args->filters,
+ sizeof(grpc_channel_filter *) * c->num_filters);
+ } else {
+ c->filters = NULL;
+ }
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
grpc_pollset_set_init(&c->pollset_set);
@@ -337,7 +341,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -409,7 +413,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- int iomgr_success) {
+ bool iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -483,7 +487,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ if (c->num_filters > 0) {
+ memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+ }
memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
sizeof(*filters) * c->connecting_result.num_filters);
filters[num_filters - 1] = &grpc_connected_channel_filter;
@@ -519,7 +525,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
/* publish */
- GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref
@@ -583,7 +594,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
@@ -600,7 +611,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
@@ -636,7 +647,7 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- int success) {
+ bool success) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index b5cd8d8d2a..71237bb614 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -114,13 +114,13 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
gpr_free(req);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success);
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success);
static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) {
grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
internal_request *req = user_data;
size_t i;
@@ -147,7 +147,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) {
do_read(exec_ctx, req);
}
-static void done_write(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
internal_request *req = arg;
if (success) {
on_written(exec_ctx, req);
@@ -175,7 +175,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
start_write(exec_ctx, req);
}
-static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
internal_request *req = arg;
if (!req->ep) {
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index 7646a88ac5..3a96f7385f 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
}
void grpc_closure_list_add(grpc_closure_list *closure_list,
- grpc_closure *closure, int success) {
+ grpc_closure *closure, bool success) {
if (closure == NULL) return;
closure->final_data = (success != 0);
if (closure_list->head == NULL) {
@@ -54,7 +54,7 @@ void grpc_closure_list_add(grpc_closure_list *closure_list,
closure_list->tail = closure;
}
-int grpc_closure_list_empty(grpc_closure_list closure_list) {
+bool grpc_closure_list_empty(grpc_closure_list closure_list) {
return closure_list.head == NULL;
}
@@ -77,7 +77,7 @@ typedef struct {
grpc_closure wrapper;
} wrapped_closure;
-static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
wrapped_closure *wc = arg;
grpc_iomgr_cb_func cb = wc->cb;
void *cb_arg = wc->cb_arg;
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index 98ef91e1db..ea96c19c71 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
#include <grpc/support/port_platform.h>
+#include <stdbool.h>
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -54,7 +55,7 @@ typedef struct grpc_closure_list {
* \param success An indication on the state of the iomgr. On false, cleanup
* actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
- int success);
+ bool success);
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
@@ -83,13 +84,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
/** add \a closure to the end of \a list and set \a closure's success to \a
* success */
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
- int success);
+ bool success);
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
-int grpc_closure_list_empty(grpc_closure_list list);
+bool grpc_closure_list_empty(grpc_closure_list list);
/** return the next pointer for a queued closure list */
grpc_closure *grpc_closure_next(grpc_closure *closure);
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c
index 6059a6031c..1fd79f6eba 100644
--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,16 +37,16 @@
#include "src/core/profiling/timers.h"
-int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
- int did_something = 0;
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
+ bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
grpc_closure *c = exec_ctx->closure_list.head;
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
- int success = (int)(c->final_data & 1);
+ bool success = (bool)(c->final_data & 1);
grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1);
- did_something++;
+ did_something = true;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
c->cb(exec_ctx, c->cb_arg, success);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
@@ -62,11 +62,15 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- int success) {
+ bool success,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_add(&exec_ctx->closure_list, closure, success);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list) {
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index 43df488094..9a9b2e55fa 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -57,22 +57,29 @@ struct grpc_exec_ctx {
grpc_closure_list closure_list;
};
+/** A workqueue represents a list of work to be executed asynchronously.
+ Forward declared here to avoid a circular dependency with workqueue.h. */
+struct grpc_workqueue;
+typedef struct grpc_workqueue grpc_workqueue;
+
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
- * Returns 1 if work was performed, 0 otherwise. */
-int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
+ * Returns true if work was performed, false otherwise. */
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
/** Add a closure to be executed at the next flush/finish point */
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- int success);
+ bool success,
+ grpc_workqueue *offload_target_or_null);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list);
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null);
#endif
diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c
index 00c68f7828..f22d8f30ac 100644
--- a/src/core/iomgr/executor.c
+++ b/src/core/iomgr/executor.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -77,7 +77,7 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
}
gpr_mu_unlock(&g_executor.mu);
grpc_exec_ctx_flush(&exec_ctx);
@@ -112,7 +112,7 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_enqueue(grpc_closure *closure, int success) {
+void grpc_executor_enqueue(grpc_closure *closure, bool success) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_add(&g_executor.closures, closure, success);
@@ -133,7 +133,7 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h
index 6da446ae9c..aac057ddf5 100644
--- a/src/core/iomgr/executor.h
+++ b/src/core/iomgr/executor.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,7 @@ void grpc_executor_init();
/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
* thread */
-void grpc_executor_enqueue(grpc_closure *closure, int success);
+void grpc_executor_enqueue(grpc_closure *closure, bool success);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown();
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 89c938bc04..85eadd754b 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -218,7 +218,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
} else {
grpc_remove_fd_from_all_epoll_sets(fd->fd);
}
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}
int grpc_fd_wrapped_fd(grpc_fd *fd) {
@@ -273,7 +273,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -296,7 +296,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index d3868ce62c..96b6f81024 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -67,7 +67,7 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline,
return 0;
}
timeout = gpr_time_sub(deadline, now);
- return gpr_time_to_millis(gpr_time_add(
+ return (DWORD)gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
@@ -120,7 +120,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
info->has_pending_iocp = 1;
}
gpr_mu_unlock(&socket->state_mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
}
void grpc_iocp_init(void) {
@@ -179,13 +179,11 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
grpc_winsocket *socket, grpc_closure *closure,
grpc_winsocket_callback_info *info) {
- int run_now = 0;
GPR_ASSERT(info->closure == NULL);
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
- run_now = 1;
info->has_pending_iocp = 0;
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
} else {
info->closure = closure;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index d117485327..4acae2bb71 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -141,7 +141,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_status) {
+ bool iomgr_status) {
delayed_add *da = arg;
if (!grpc_fd_is_orphaned(da->fd)) {
@@ -154,7 +154,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
- grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
}
}
gpr_mu_unlock(&da->pollset->mu);
@@ -178,7 +178,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
GRPC_FD_REF(fd, "delayed_add");
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c325b634ae..a8e2e22977 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -243,7 +243,7 @@ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
pollset->vtable->finish_shutdown(pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -271,7 +271,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!grpc_pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
goto done;
}
/* Check alarms - these are a global resource so we just ping
@@ -365,7 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -381,7 +381,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!grpc_pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
}
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
!grpc_pollset_has_workers(pollset)) {
@@ -419,7 +419,8 @@ typedef struct grpc_unary_promote_args {
grpc_closure promotion_closure;
} grpc_unary_promote_args;
-static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
+static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
+ bool success) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index deb661548d..2e650ca939 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,15 +81,6 @@ static grpc_pollset_worker *pop_front_worker(
}
}
-static void push_back_worker(grpc_pollset_worker *root,
- grpc_pollset_worker_link_type type,
- grpc_pollset_worker *worker) {
- worker->links[type].next = root;
- worker->links[type].prev = worker->links[type].next->links[type].prev;
- worker->links[type].prev->links[type].next =
- worker->links[type].next->links[type].prev = worker;
-}
-
static void push_front_worker(grpc_pollset_worker *root,
grpc_pollset_worker_link_type type,
grpc_pollset_worker *worker) {
@@ -116,7 +107,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
} else {
pollset->on_shutdown = closure;
}
@@ -174,7 +165,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, true, NULL);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 555c74ce7e..c51745b918 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -152,7 +152,7 @@ done:
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 007c855d10..28c8661e73 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -135,7 +135,7 @@ done:
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h
index fe2be99145..8e3946a7d8 100644
--- a/src/core/iomgr/sockaddr_win32.h
+++ b/src/core/iomgr/sockaddr_win32.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,9 +38,4 @@
#include <ws2tcpip.h>
#include <mswsock.h>
-#ifdef __MINGW32__
-/* mingw seems to be missing that definition. */
-const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
-#endif
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d9d24ee9a3..c76c2e3b0f 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -91,7 +91,7 @@ error:
return 0;
}
-static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) {
+static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
int done;
async_connect *ac = acp;
if (grpc_tcp_trace) {
@@ -111,7 +111,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) {
}
}
-static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
+static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
@@ -206,7 +206,7 @@ finish:
gpr_free(ac->addr_str);
gpr_free(ac);
}
- grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
}
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
@@ -243,7 +243,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
addr_len = sizeof(addr4_copy);
}
if (!prepare_socket(addr, fd)) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
return;
}
@@ -259,14 +259,14 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
if (err >= 0) {
*ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
goto done;
}
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index e5691b7e12..689c6f7b10 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -74,7 +74,7 @@ static void async_connect_unlock_and_cleanup(async_connect *ac) {
}
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
/* If the alarm didn't occur, it got cancelled. */
@@ -84,7 +84,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) {
async_connect_unlock_and_cleanup(ac);
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, int from_iocp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
@@ -215,7 +215,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_exec_ctx_enqueue(exec_ctx, on_done, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, on_done, false, NULL);
}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 4fa8ca8c71..048e907441 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -100,9 +100,9 @@ typedef struct {
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success);
+ bool success);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success);
+ bool success);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -247,7 +247,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success) {
+ bool success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
@@ -273,7 +273,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
}
}
@@ -360,7 +360,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
}
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success) {
+ bool success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
flush_result status;
grpc_closure *cb;
@@ -407,7 +407,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
return;
}
tcp->outgoing_buffer = buf;
@@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb;
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
}
GPR_TIMER_END("tcp_write", 0);
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index adf14aeb59..5e07f8261c 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -160,7 +160,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
}
gpr_mu_destroy(&s->mu);
@@ -174,7 +174,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
-static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
+static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
+ bool success) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
@@ -317,7 +318,7 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
@@ -602,7 +603,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 8ee8149f25..ce930b8f41 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -119,7 +119,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
}
/* Now that the accepts have been aborted, we can destroy the sockets.
@@ -173,7 +173,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
@@ -311,7 +311,7 @@ failure:
}
/* Event manager callback when reads are ready. */
-static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
@@ -531,7 +531,7 @@ int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index,
for (sp = s->head; sp && port_index != 0; sp = sp->next, --port_index)
;
if (sp) {
- return _open_osfhandle(sp->socket->socket, 0);
+ return _open_osfhandle((intptr_t)sp->socket->socket, 0);
} else {
return -1;
}
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index cc7f7ff8d2..038e4158c8 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -138,15 +138,12 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
grpc_tcp *tcp = tcpp;
grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
gpr_slice sub;
- gpr_slice *slice = NULL;
- size_t nslices = 0;
grpc_winsocket_callback_info *info = &socket->read_info;
- int do_abort = 0;
if (success) {
if (socket->read_info.wsa_error != 0 && !tcp->shutting_down) {
@@ -187,7 +184,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
@@ -211,7 +208,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL);
return;
}
@@ -224,7 +221,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL);
return;
}
}
@@ -233,12 +230,11 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
+static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
grpc_closure *cb;
- int do_abort = 0;
gpr_mu_lock(&tcp->mu);
cb = tcp->write_cb;
@@ -277,7 +273,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
@@ -305,9 +301,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- int ok = 0;
+ bool ok = false;
if (status == 0) {
- ok = 1;
+ ok = true;
GPR_ASSERT(bytes_sent == tcp->write_slices->length);
} else {
if (socket->read_info.wsa_error != WSAECONNRESET) {
@@ -317,7 +313,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
- grpc_exec_ctx_enqueue(exec_ctx, cb, ok);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL);
return;
}
@@ -334,7 +330,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
}
diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c
index 24d6204e07..a33d8f63a0 100644
--- a/src/core/iomgr/timer.c
+++ b/src/core/iomgr/timer.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -224,7 +224,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -290,7 +290,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success);
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index a1a6b04cad..fe006c603c 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -425,15 +425,5 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
gpr_mu_unlock(&s->mu);
}
-/* TODO(rjshade): Add a test for this method. */
-void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
- const struct sockaddr *peer_address) {
- ssize_t rc;
- rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address));
- if (rc < 0) {
- gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
- }
-}
-
#endif
#endif
diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h
index de5736c426..73a21c80ab 100644
--- a/src/core/iomgr/udp_server.h
+++ b/src/core/iomgr/udp_server.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -72,12 +72,4 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
-/* Write the contents of buffer to the underlying UDP socket. */
-/*
-void grpc_udp_server_write(grpc_udp_server *s,
- const char *buffer,
- int buf_len,
- const struct sockaddr* to);
- */
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 714536233c..36dd133468 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -47,9 +47,7 @@
#include "src/core/iomgr/workqueue_windows.h"
#endif
-/** A workqueue represents a list of work to be executed asynchronously. */
-struct grpc_workqueue;
-typedef struct grpc_workqueue grpc_workqueue;
+/* grpc_workqueue is forward declared in exec_ctx.h */
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index d2a1c34612..da11df67ef 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,7 @@
#include "src/core/iomgr/fd_posix.h"
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success);
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) {
char name[32];
@@ -110,7 +110,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_unlock(&workqueue->mu);
}
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_workqueue *workqueue = arg;
if (!success) {
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 8b56c57645..afba0079f5 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -793,7 +793,7 @@ static void md_only_test_destruct(grpc_call_credentials *creds) {
}
static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
- void *user_data, int success) {
+ void *user_data, bool success) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
@@ -812,7 +812,7 @@ static void md_only_test_get_request_metadata(
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
grpc_executor_enqueue(
- grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1);
+ grpc_closure_create(on_simulated_token_fetch_done, cb_arg), true);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 5385e41130..f3ac14568a 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -86,7 +86,7 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int s) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
grpc_pollset_destroy(p);
}
@@ -157,7 +157,7 @@ static grpc_call_credentials *create_default_creds_from_path(char *creds_path) {
if (grpc_auth_json_key_is_valid(&key)) {
result =
grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
- key, grpc_max_auth_token_lifetime);
+ key, grpc_max_auth_token_lifetime());
goto end;
}
diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c
index 364b765396..a8b2fef629 100644
--- a/src/core/security/handshake.c
+++ b/src/core/security/handshake.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,10 +61,10 @@ typedef struct {
} grpc_security_handshake;
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *setup, int success);
+ void *setup, bool success);
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
- int success);
+ bool success);
static void security_connector_remove_handshake(grpc_security_handshake *h) {
grpc_security_connector_handshake_list *node;
@@ -198,7 +198,8 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
}
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, int success) {
+ void *handshake,
+ bool success) {
grpc_security_handshake *h = handshake;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -265,7 +266,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
/* If handshake is NULL, the handshake is done. */
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, int success) {
+ void *handshake, bool success) {
grpc_security_handshake *h = handshake;
/* Make sure that write is OK. */
diff --git a/src/core/security/json_token.c b/src/core/security/json_token.c
index 4d4bc4baad..762f02989a 100644
--- a/src/core/security/json_token.c
+++ b/src/core/security/json_token.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -49,7 +49,13 @@
/* --- Constants. --- */
/* 1 hour max. */
-const gpr_timespec grpc_max_auth_token_lifetime = {3600, 0, GPR_TIMESPAN};
+gpr_timespec grpc_max_auth_token_lifetime() {
+ gpr_timespec out;
+ out.tv_sec = 3600;
+ out.tv_nsec = 0;
+ out.clock_type = GPR_TIMESPAN;
+ return out;
+}
#define GRPC_JWT_RSA_SHA256_ALGORITHM "RS256"
#define GRPC_JWT_TYPE "JWT"
@@ -211,9 +217,9 @@ static char *encoded_jwt_claim(const grpc_auth_json_key *json_key,
gpr_timespec expiration = gpr_time_add(now, token_lifetime);
char now_str[GPR_LTOA_MIN_BUFSIZE];
char expiration_str[GPR_LTOA_MIN_BUFSIZE];
- if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime) > 0) {
+ if (gpr_time_cmp(token_lifetime, grpc_max_auth_token_lifetime()) > 0) {
gpr_log(GPR_INFO, "Cropping token lifetime to maximum allowed value.");
- expiration = gpr_time_add(now, grpc_max_auth_token_lifetime);
+ expiration = gpr_time_add(now, grpc_max_auth_token_lifetime());
}
int64_ttoa(now.tv_sec, now_str);
int64_ttoa(expiration.tv_sec, expiration_str);
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 9b87e268c6..d11c43be20 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -126,7 +126,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur,
}
static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
- int success) {
+ bool success) {
if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -137,11 +137,11 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
}
}
ep->read_buffer = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success);
+ grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success, NULL);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
unsigned i;
uint8_t keep_looping = 0;
tsi_result result = TSI_OK;
@@ -315,7 +315,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index 61336a1057..bdccbabfea 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -61,6 +61,14 @@ static const char *installed_roots_path =
INSTALL_PREFIX "/share/grpc/roots.pem";
#endif
+/* -- Overridden default roots. -- */
+
+static grpc_ssl_roots_override_callback ssl_roots_override_cb = NULL;
+
+void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) {
+ ssl_roots_override_cb = cb;
+}
+
/* -- Cipher suites. -- */
/* Defines the cipher suites that we accept by default. All these cipher suites
@@ -595,23 +603,44 @@ static grpc_security_connector_vtable ssl_channel_vtable = {
static grpc_security_connector_vtable ssl_server_vtable = {
ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer};
-static gpr_slice default_pem_root_certs;
+static gpr_slice compute_default_pem_root_certs_once(void) {
+ gpr_slice result = gpr_empty_slice();
-static void init_default_pem_root_certs(void) {
/* First try to load the roots from the environment. */
char *default_root_certs_path =
gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR);
- if (default_root_certs_path == NULL) {
- default_pem_root_certs = gpr_empty_slice();
- } else {
- default_pem_root_certs = gpr_load_file(default_root_certs_path, 0, NULL);
+ if (default_root_certs_path != NULL) {
+ result = gpr_load_file(default_root_certs_path, 0, NULL);
gpr_free(default_root_certs_path);
}
+ /* Try overridden roots if needed. */
+ grpc_ssl_roots_override_result ovrd_res = GRPC_SSL_ROOTS_OVERRIDE_FAIL;
+ if (GPR_SLICE_IS_EMPTY(result) && ssl_roots_override_cb != NULL) {
+ char *pem_root_certs = NULL;
+ ovrd_res = ssl_roots_override_cb(&pem_root_certs);
+ if (ovrd_res == GRPC_SSL_ROOTS_OVERRIDE_OK) {
+ GPR_ASSERT(pem_root_certs != NULL);
+ result = gpr_slice_new(pem_root_certs, strlen(pem_root_certs), gpr_free);
+ }
+ }
+
/* Fall back to installed certs if needed. */
- if (GPR_SLICE_IS_EMPTY(default_pem_root_certs)) {
- default_pem_root_certs = gpr_load_file(installed_roots_path, 0, NULL);
+ if (GPR_SLICE_IS_EMPTY(result) &&
+ ovrd_res != GRPC_SSL_ROOTS_OVERRIDE_FAIL_PERMANENTLY) {
+ result = gpr_load_file(installed_roots_path, 0, NULL);
}
+ return result;
+}
+
+static gpr_slice default_pem_root_certs;
+
+static void init_default_pem_root_certs(void) {
+ default_pem_root_certs = compute_default_pem_root_certs_once();
+}
+
+gpr_slice grpc_get_default_ssl_roots_for_testing(void) {
+ return compute_default_pem_root_certs_once();
}
size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs) {
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index 2b734109b3..39df7821f0 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -209,6 +209,9 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
/* Gets the default ssl roots. */
size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs);
+/* Exposed for TESTING ONLY!. */
+gpr_slice grpc_get_default_ssl_roots_for_testing(void);
+
/* Config for ssl servers. */
typedef struct {
unsigned char **pem_private_keys;
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index d5c8c54369..4c78711387 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -153,7 +153,7 @@ static void on_md_processing_done(
}
static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- int success) {
+ bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 08713fceaf..ee3443c828 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -142,7 +142,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
on_accept, state);
}
-static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
+static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
grpc_server_secure_state *state = statep;
if (state->destroy_callback != NULL) {
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
diff --git a/src/core/statistics/census_init.c b/src/core/statistics/census_init.c
index e6306f5e6f..b6a962f228 100644
--- a/src/core/statistics/census_init.c
+++ b/src/core/statistics/census_init.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/support/env_win32.c b/src/core/support/env_win32.c
index 6b1ff102b0..10258283ba 100644
--- a/src/core/support/env_win32.c
+++ b/src/core/support/env_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,7 +38,12 @@
#include "src/core/support/env.h"
#include "src/core/support/string.h"
+#ifdef __MINGW32__
+errno_t getenv_s(size_t *size_needed, char *buffer, size_t size,
+ const char *varname);
+#else
#include <stdlib.h>
+#endif
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -47,14 +52,17 @@
char *gpr_getenv(const char *name) {
size_t size;
char *result = NULL;
- char *duplicated;
errno_t err;
- err = _dupenv_s(&result, &size, name);
- if (err) return NULL;
- duplicated = gpr_strdup(result);
- free(result);
- return duplicated;
+ err = getenv_s(&size, NULL, 0, name);
+ if (err || (size == 0)) return NULL;
+ result = gpr_malloc(size);
+ err = getenv_s(&size, result, size, name);
+ if (err) {
+ gpr_free(result);
+ return NULL;
+ }
+ return result;
}
void gpr_setenv(const char *name, const char *value) {
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index 40adcd1b50..e18e667fe5 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -109,13 +109,13 @@ void gpr_default_log(gpr_log_func_args *args) {
fflush(stderr);
}
-char *gpr_format_message(DWORD messageid) {
+char *gpr_format_message(int messageid) {
LPTSTR tmessage;
char *message;
DWORD status = FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c
index 914ba8771c..3b1f702cf1 100644
--- a/src/core/support/string_win32.c
+++ b/src/core/support/string_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -85,8 +85,8 @@ LPTSTR
gpr_char_to_tchar(LPCSTR input) {
LPTSTR ret;
int needed = MultiByteToWideChar(CP_UTF8, 0, input, -1, NULL, 0);
- if (needed == 0) return NULL;
- ret = gpr_malloc(needed * sizeof(TCHAR));
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed * sizeof(TCHAR));
MultiByteToWideChar(CP_UTF8, 0, input, -1, ret, needed);
return ret;
}
@@ -95,8 +95,8 @@ LPSTR
gpr_tchar_to_char(LPCTSTR input) {
LPSTR ret;
int needed = WideCharToMultiByte(CP_UTF8, 0, input, -1, NULL, 0, NULL, NULL);
- if (needed == 0) return NULL;
- ret = gpr_malloc(needed);
+ if (needed <= 0) return NULL;
+ ret = gpr_malloc((unsigned)needed);
WideCharToMultiByte(CP_UTF8, 0, input, -1, ret, needed, NULL, NULL);
return ret;
}
diff --git a/src/core/support/subprocess_windows.c b/src/core/support/subprocess_windows.c
new file mode 100644
index 0000000000..d48c5437f0
--- /dev/null
+++ b/src/core/support/subprocess_windows.c
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WINDOWS_SUBPROCESS
+
+#include <windows.h>
+#include <string.h>
+#include <tchar.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/subprocess.h>
+#include "src/core/support/string.h"
+#include "src/core/support/string_win32.h"
+
+struct gpr_subprocess {
+ PROCESS_INFORMATION pi;
+ int joined;
+ int interrupted;
+};
+
+const char *gpr_subprocess_binary_extension() { return ".exe"; }
+
+gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
+ gpr_subprocess *r;
+
+ STARTUPINFO si;
+ PROCESS_INFORMATION pi;
+
+ char *args = gpr_strjoin_sep(argv, argc, " ", NULL);
+ TCHAR *args_tchar;
+
+ args_tchar = gpr_char_to_tchar(args);
+ gpr_free(args);
+
+ memset(&si, 0, sizeof(si));
+ si.cb = sizeof(si);
+ memset(&pi, 0, sizeof(pi));
+
+ if (!CreateProcess(NULL, args_tchar, NULL, NULL, FALSE,
+ CREATE_NEW_PROCESS_GROUP, NULL, NULL, &si, &pi)) {
+ gpr_free(args_tchar);
+ return NULL;
+ }
+ gpr_free(args_tchar);
+
+ r = gpr_malloc(sizeof(gpr_subprocess));
+ memset(r, 0, sizeof(*r));
+ r->pi = pi;
+ return r;
+}
+
+void gpr_subprocess_destroy(gpr_subprocess *p) {
+ if (p) {
+ if (!p->joined) {
+ gpr_subprocess_interrupt(p);
+ gpr_subprocess_join(p);
+ }
+ if (p->pi.hProcess) {
+ CloseHandle(p->pi.hProcess);
+ }
+ if (p->pi.hThread) {
+ CloseHandle(p->pi.hThread);
+ }
+ gpr_free(p);
+ }
+}
+
+int gpr_subprocess_join(gpr_subprocess *p) {
+ DWORD dwExitCode;
+ if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) {
+ if (dwExitCode == STILL_ACTIVE) {
+ if (WaitForSingleObject(p->pi.hProcess, INFINITE) == WAIT_OBJECT_0) {
+ p->joined = 1;
+ goto getExitCode;
+ }
+ return -1; // failed to join
+ } else {
+ goto getExitCode;
+ }
+ } else {
+ return -1; // failed to get exit code
+ }
+
+getExitCode:
+ if (p->interrupted) {
+ return 0;
+ }
+ if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) {
+ return dwExitCode;
+ } else {
+ return -1; // failed to get exit code
+ }
+}
+
+void gpr_subprocess_interrupt(gpr_subprocess *p) {
+ DWORD dwExitCode;
+ if (GetExitCodeProcess(p->pi.hProcess, &dwExitCode)) {
+ if (dwExitCode == STILL_ACTIVE) {
+ gpr_log(GPR_INFO, "sending ctrl-break");
+ GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, p->pi.dwProcessId);
+ p->joined = 1;
+ p->interrupted = 1;
+ }
+ }
+ return;
+}
+
+#endif /* GPR_WINDOWS_SUBPROCESS */
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index 51a082b29e..84d412a75f 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -94,7 +94,11 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
if (now_ms >= deadline_ms) {
timeout = 1;
} else {
- timeout_max_ms = (DWORD)min(deadline_ms - now_ms, INFINITE - 1);
+ if ((deadline_ms - now_ms) >= INFINITE) {
+ timeout_max_ms = INFINITE - 1;
+ } else {
+ timeout_max_ms = (DWORD)(deadline_ms - now_ms);
+ }
timeout = (SleepConditionVariableCS(cv, &mu->cs, timeout_max_ms) == 0 &&
GetLastError() == ERROR_TIMEOUT);
}
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index 2bed0f6a9c..8af957e6f4 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,9 +37,12 @@
#ifdef GPR_WIN32
+#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <src/core/support/time_precise.h>
#include <sys/timeb.h>
+#include <process.h>
+#include <limits.h>
#include "src/core/support/block_annotate.h"
@@ -50,11 +53,12 @@ void gpr_time_init(void) {
LARGE_INTEGER frequency;
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&g_start_time);
- g_time_scale = 1.0 / frequency.QuadPart;
+ g_time_scale = 1.0 / (double)frequency.QuadPart;
}
gpr_timespec gpr_now(gpr_clock_type clock) {
gpr_timespec now_tv;
+ LONGLONG diff;
struct _timeb now_tb;
LARGE_INTEGER timestamp;
double now_dbl;
@@ -68,10 +72,14 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
case GPR_CLOCK_MONOTONIC:
case GPR_CLOCK_PRECISE:
QueryPerformanceCounter(&timestamp);
- now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale;
+ diff = timestamp.QuadPart - g_start_time.QuadPart;
+ now_dbl = (double)diff * g_time_scale;
now_tv.tv_sec = (int64_t)now_dbl;
now_tv.tv_nsec = (int32_t)((now_dbl - (double)now_tv.tv_sec) * 1e9);
break;
+ case GPR_TIMESPAN:
+ abort();
+ break;
}
return now_tv;
}
@@ -79,7 +87,7 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
void gpr_sleep_until(gpr_timespec until) {
gpr_timespec now;
gpr_timespec delta;
- DWORD sleep_millis;
+ int64_t sleep_millis;
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
@@ -91,9 +99,10 @@ void gpr_sleep_until(gpr_timespec until) {
delta = gpr_time_sub(until, now);
sleep_millis =
- (DWORD)delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ delta.tv_sec * GPR_MS_PER_SEC + delta.tv_nsec / GPR_NS_PER_MS;
+ GPR_ASSERT((sleep_millis >= 0) && (sleep_millis <= INT_MAX));
GRPC_SCHEDULING_START_BLOCKING_REGION;
- Sleep(sleep_millis);
+ Sleep((DWORD)sleep_millis);
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
}
diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c
index 7c47dd56f8..d753023ca9 100644
--- a/src/core/surface/alarm.c
+++ b/src/core/surface/alarm.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -48,7 +48,7 @@ struct grpc_alarm {
static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
grpc_cq_completion *c) {}
-static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_alarm *alarm = arg;
grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, success,
do_nothing_end_completion, NULL, &alarm->completion);
@@ -65,7 +65,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_cq_begin_op(cq);
+ grpc_cq_begin_op(cq, tag);
grpc_exec_ctx_finish(&exec_ctx);
return alarm;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 880666bb38..9495e748b5 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -229,9 +229,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
- int success);
+ bool success);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success);
+ bool success);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask,
@@ -351,7 +351,7 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
size_t i;
int ii;
grpc_call *c = call;
@@ -688,13 +688,13 @@ typedef struct cancel_closure {
grpc_status_code status;
} cancel_closure;
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
cancel_closure *cc = ccp;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
gpr_free(cc);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
grpc_transport_stream_op op;
cancel_closure *cc = ccp;
memset(&op, 0, sizeof(op));
@@ -721,7 +721,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
cc->call = c;
cc->status = status;
GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
return GRPC_CALL_OK;
}
@@ -757,7 +757,7 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
@@ -934,7 +934,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
+ grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -974,7 +974,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -993,7 +993,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
@@ -1066,7 +1066,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
}
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 001692376f..12d8ebceb9 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,7 +80,7 @@ struct grpc_channel {
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
-static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success);
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
@@ -268,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
}
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_channel *channel = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 10f5c4da4d..2dd4fce26b 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -165,11 +165,11 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
}
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 1);
}
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 0);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 49083f0870..4d4337d288 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,11 +80,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector_unref(exec_ctx, arg);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
index b4ce282787..983f1c8a66 100644
--- a/src/core/surface/channel_ping.c
+++ b/src/core/surface/channel_ping.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,7 +53,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(arg);
}
-static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
ping_result *pr = arg;
grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr,
&pr->completion_storage);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c0db9c508a..75298eb795 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -86,7 +86,7 @@ static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
- int success);
+ bool success);
void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
@@ -169,7 +169,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 19cea4c4f6..e3ab70dba7 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -117,8 +117,10 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
- /* Only initialize census if noone else has. */
- if (census_enabled() == CENSUS_FEATURE_NONE) {
+ /* Only initialize census if no one else has and some features are
+ * available. */
+ if (census_enabled() == CENSUS_FEATURE_NONE &&
+ census_supported() != CENSUS_FEATURE_NONE) {
if (census_initialize(census_supported())) { /* enable all features. */
gpr_log(GPR_ERROR, "Could not initialize census.");
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a60e9d20da..705996cad3 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -78,8 +78,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 552a570713..dd1441ad67 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -128,14 +128,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector *c = arg;
grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
c->connecting_endpoint,
on_secure_handshake_done, c);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 79db13810a..42cffccb4c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -260,7 +260,7 @@ struct shutdown_cleanup_args {
};
static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
struct shutdown_cleanup_args *a = arg;
gpr_slice_unref(a->slice);
gpr_free(a);
@@ -313,7 +313,7 @@ static void request_matcher_destroy(request_matcher *rm) {
gpr_stack_lockfree_destroy(rm->requests);
}
-static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
+static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -328,7 +328,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
}
}
@@ -392,7 +392,7 @@ static void orphan_channel(channel_data *chand) {
}
static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
- int success) {
+ bool success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
@@ -407,7 +407,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
maybe_finish_shutdown(exec_ctx, chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true,
+ NULL);
}
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -420,7 +421,7 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
@@ -569,7 +570,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
gpr_timespec op_deadline;
@@ -609,7 +610,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
if (success) {
@@ -620,7 +621,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -653,7 +654,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
}
static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
@@ -779,9 +780,7 @@ grpc_server *grpc_server_create_from_filters(
const grpc_channel_filter **filters, size_t filter_count,
const grpc_channel_args *args) {
size_t i;
- /* TODO(census): restore this once we finalize census filter etc.
- int census_enabled = grpc_channel_args_is_census_enabled(args); */
- int census_enabled = 0;
+ int census_enabled = grpc_channel_args_is_census_enabled(args);
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -987,7 +986,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
}
static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
- int success) {
+ bool success) {
grpc_server *server = s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
@@ -1142,7 +1141,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true,
+ NULL);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
@@ -1231,7 +1231,7 @@ done:
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, int success);
+ void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
@@ -1317,7 +1317,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- int success) {
+ bool success) {
requested_call *rc = prc;
grpc_call *call = *rc->call;
grpc_call_element *elem =
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 6e21d2dcd7..ce970dfe73 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -82,7 +82,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index f30093e06b..5e37e80948 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,9 +43,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[3];
size_t num_filters = 0;
filters[num_filters++] = &grpc_compress_filter;
- if (grpc_channel_args_is_census_enabled(args)) {
- filters[num_filters++] = &grpc_server_census_filter;
- }
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
return grpc_server_create_from_filters(filters, num_filters, args);
}
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index aada18e07e..262a13f184 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/core/transport/chttp2/hpack_encoder.c b/src/core/transport/chttp2/hpack_encoder.c
index 89a80d896c..f30f574d06 100644
--- a/src/core/transport/chttp2/hpack_encoder.c
+++ b/src/core/transport/chttp2/hpack_encoder.c
@@ -283,7 +283,7 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c,
len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1);
GRPC_CHTTP2_WRITE_VARINT(key_index, 2, 0x40,
add_tiny_header_data(st, len_pfx), len_pfx);
- GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, 0x00,
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
add_header_data(st, gpr_slice_ref(value_slice));
}
@@ -300,7 +300,7 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c,
len_val_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)len_val, 1);
GRPC_CHTTP2_WRITE_VARINT(key_index, 4, 0x00,
add_tiny_header_data(st, len_pfx), len_pfx);
- GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, 0x00,
+ GRPC_CHTTP2_WRITE_VARINT((uint32_t)len_val, 1, huffman_prefix,
add_tiny_header_data(st, len_val_len), len_val_len);
add_header_data(st, gpr_slice_ref(value_slice));
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a8262b7af2..c611496e7e 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_CHTTP2_INTERNAL_H
#include <assert.h>
+#include <stdbool.h>
#include "src/core/iomgr/endpoint.h"
#include "src/core/transport/chttp2/frame.h"
@@ -67,6 +68,9 @@ typedef enum {
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
+ /* streams waiting for the outgoing window in the writing path, they will be
+ * merged to the stalled list or writable list under transport lock. */
+ GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT,
/** streams that are waiting to start because there are too many concurrent
streams on the connection */
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -488,7 +492,7 @@ void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing, int success);
+ void *transport_writing, bool success);
void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
@@ -504,11 +508,11 @@ void grpc_chttp2_publish_reads(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_parsing *parsing);
-/** Get a writable stream
- returns non-zero if there was a stream available */
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
+/** Get a writable stream
+ returns non-zero if there was a stream available */
int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing,
@@ -560,9 +564,12 @@ int grpc_chttp2_list_pop_check_read_ops(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-void grpc_chttp2_list_add_stalled_by_transport(
+void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
+void grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing, bool is_window_available);
+
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 273a513e2f..2f31a47cb3 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -313,12 +313,27 @@ int grpc_chttp2_list_pop_check_read_ops(
return r;
}
-void grpc_chttp2_list_add_stalled_by_transport(
+void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
STREAM_FROM_WRITING(stream_writing),
- GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
+}
+
+void grpc_chttp2_list_flush_writing_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ bool is_window_available) {
+ grpc_chttp2_stream *stream;
+ grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
+ while (stream_list_pop(transport, &stream,
+ GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) {
+ if (is_window_available) {
+ grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
+ } else {
+ stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ }
+ }
}
int grpc_chttp2_list_pop_stalled_by_transport(
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index fdad05b5fb..095883c66d 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -130,8 +130,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- stream_writing);
+ grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
+ stream_writing);
}
}
if (stream_global->send_trailing_metadata) {
@@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes(
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL);
}
}
@@ -273,8 +273,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
stream_writing->sent_message = 1;
}
} else if (transport_writing->outgoing_window == 0) {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- stream_writing);
+ grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
+ stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
@@ -312,8 +312,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
/* do nothing - already reffed */
}
} else {
- grpc_chttp2_list_add_stalled_by_transport(transport_writing,
- stream_writing);
+ grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
+ stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
} else {
@@ -329,6 +329,10 @@ void grpc_chttp2_cleanup_writing(
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
+ bool is_window_available = transport_writing->outgoing_window > 0;
+
+ grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
+ is_window_available);
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 05b25fd8b0..9298573c7f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -86,14 +86,14 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
- int iomgr_success_ignored);
+ bool iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value);
/** Endpoint callback to process incoming data */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
+static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
@@ -183,7 +183,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -602,7 +602,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
t->parsing_active)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
- grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
prevent_endpoint_shutdown(t);
}
check_read_ops(exec_ctx, &t->global);
@@ -631,7 +631,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing_ptr, int success) {
+ void *transport_writing_ptr, bool success) {
grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
grpc_chttp2_stream_global *stream_global;
@@ -669,7 +669,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
- int iomgr_success_ignored) {
+ bool iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("writing_action", 0);
grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
@@ -759,7 +759,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->final_data |= 1;
}
if (closure->final_data < 2) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL);
}
*pclosure = NULL;
}
@@ -777,7 +777,7 @@ static int contains_non_ok_status(
return 0;
}
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
+static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {}
static void perform_stream_op_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
@@ -934,7 +934,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -951,7 +951,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
lock(t);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
@@ -1022,11 +1022,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);
GPR_ASSERT(*stream_global->recv_message != NULL);
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
+ NULL);
stream_global->recv_message_ready = NULL;
} else if (stream_global->published_trailing_metadata) {
*stream_global->recv_message = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
+ NULL);
stream_global->recv_message_ready = NULL;
}
}
@@ -1336,7 +1338,7 @@ static void read_error_locked(grpc_exec_ctx *exec_ctx,
}
/* tcp read callback */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
+static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
size_t i;
int keep_reading = 0;
grpc_chttp2_transport *t = tp;
@@ -1523,7 +1525,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
unlock(exec_ctx, bs->transport);
return 1;
} else if (bs->failed) {
- grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL);
unlock(exec_ctx, bs->transport);
return 0;
} else {
@@ -1552,7 +1554,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&bs->transport->mu);
if (bs->on_next != NULL) {
*bs->next = slice;
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL);
bs->on_next = NULL;
} else {
gpr_slice_buffer_add(&bs->slices, slice);
@@ -1567,7 +1569,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
if (from_parsing_thread) {
gpr_mu_lock(&bs->transport->mu);
}
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
bs->on_next = NULL;
bs->failed = 1;
if (from_parsing_thread) {
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 3c3fd4671d..87765b9799 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -78,7 +78,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
} else {
success = 0;
}
- grpc_exec_ctx_enqueue(exec_ctx, w->notify, success);
+ grpc_exec_ctx_enqueue(exec_ctx, w->notify, success, NULL);
gpr_free(w);
}
gpr_free(tracker->name);
@@ -109,7 +109,7 @@ int grpc_connectivity_state_notify_on_state_change(
if (current == NULL) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == notify) {
- grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL);
tracker->watchers = w->next;
gpr_free(w);
return 0;
@@ -117,7 +117,7 @@ int grpc_connectivity_state_notify_on_state_change(
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
- grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL);
w->next = w->next->next;
gpr_free(rm_candidate);
return 0;
@@ -128,7 +128,7 @@ int grpc_connectivity_state_notify_on_state_change(
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_exec_ctx_enqueue(exec_ctx, notify, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, true, NULL);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -158,7 +158,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
- grpc_exec_ctx_enqueue(exec_ctx, w->notify, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, w->notify, true, NULL);
gpr_free(w);
}
}
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 2ab978be46..08d685668c 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, true, NULL);
}
}
@@ -125,8 +125,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
@@ -150,7 +150,7 @@ typedef struct {
grpc_closure closure;
} close_message_data;
-static void free_message(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) {
+static void free_message(grpc_exec_ctx *exec_ctx, void *p, bool iomgr_success) {
close_message_data *cmd = p;
gpr_slice_unref(cmd->message);
if (cmd->then_call != NULL) {
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f94f0ae76e..f5cac77adc 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,16 +81,29 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
+ /** Send initial metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_initial_metadata;
+
+ /** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;
+ /** Send message data to the peer, from the provided byte stream. */
grpc_byte_stream *send_message;
+ /** Receive initial metadata from the stream, into provided metadata batch. */
grpc_metadata_batch *recv_initial_metadata;
+
+ /** Receive message data from the stream, into provided byte stream. */
grpc_byte_stream **recv_message;
+ /** Should be enqueued when one message is ready to be processed. */
grpc_closure *recv_message_ready;
+
+ /** Receive trailing metadata from the stream, into provided metadata batch.
+ */
grpc_metadata_batch *recv_trailing_metadata;
+ /** Should be enqueued when all requested operations (excluding recv_message
+ which has its own closure) in a given batch have been completed. */
grpc_closure *on_complete;
/** If != GRPC_STATUS_OK, cancel this stream */
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index c7974d655b..ae20392d11 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,24 +35,29 @@
#include <memory>
-#include <grpc/grpc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/slice.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
-#include <grpc++/security/credentials.h>
#include <grpc++/impl/call.h>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/rpc_method.h>
+#include <grpc++/security/credentials.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
- : host_(host), c_channel_(channel) {}
+ : host_(host), c_channel_(channel) {
+ g_gli_initializer.summon();
+}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 2aa532808c..73147fd7bb 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,6 +33,7 @@
#include <grpc++/client_context.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
@@ -48,6 +49,7 @@ namespace grpc {
class DefaultGlobalClientCallbacks GRPC_FINAL
: public ClientContext::GlobalCallbacks {
public:
+ ~DefaultGlobalClientCallbacks() GRPC_OVERRIDE {}
void DefaultConstructor(ClientContext* context) GRPC_OVERRIDE {}
void Destructor(ClientContext* context) GRPC_OVERRIDE {}
};
diff --git a/src/cpp/client/create_channel.cc b/src/cpp/client/create_channel.cc
index 3bbca807d3..fdaa28ffef 100644
--- a/src/cpp/client/create_channel.cc
+++ b/src/cpp/client/create_channel.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,6 +36,7 @@
#include <grpc++/channel.h>
#include <grpc++/create_channel.h>
+#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/channel_arguments.h>
#include "src/cpp/client/create_channel_internal.h"
@@ -53,7 +54,8 @@ std::shared_ptr<Channel> CreateCustomChannel(
const grpc::string& target,
const std::shared_ptr<ChannelCredentials>& creds,
const ChannelArguments& args) {
- GrpcLibrary init_lib; // We need to call init in case of a bad creds.
+ internal::GrpcLibrary
+ init_lib; // We need to call init in case of a bad creds.
ChannelArguments cp_args = args;
std::ostringstream user_agent_prefix;
user_agent_prefix << "grpc-c++/" << grpc_version_string();
diff --git a/src/cpp/client/credentials.cc b/src/cpp/client/credentials.cc
index 0c08db11a9..6fb620b0ea 100644
--- a/src/cpp/client/credentials.cc
+++ b/src/cpp/client/credentials.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,12 +31,18 @@
*
*/
+#include <grpc++/impl/grpc_library.h>
#include <grpc++/security/credentials.h>
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
+ChannelCredentials::ChannelCredentials() { g_gli_initializer.summon(); }
+
ChannelCredentials::~ChannelCredentials() {}
+CallCredentials::CallCredentials() { g_gli_initializer.summon(); }
+
CallCredentials::~CallCredentials() {}
} // namespace grpc
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 96ae25b761..074dae7ca7 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,13 @@
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
+SecureChannelCredentials::SecureChannelCredentials(
+ grpc_channel_credentials* c_creds)
+ : c_creds_(c_creds) {
+ g_gli_initializer.summon();
+}
+
std::shared_ptr<grpc::Channel> SecureChannelCredentials::CreateChannel(
const string& target, const grpc::ChannelArguments& args) {
grpc_channel_args channel_args;
@@ -51,6 +58,12 @@ std::shared_ptr<grpc::Channel> SecureChannelCredentials::CreateChannel(
nullptr));
}
+SecureCallCredentials::SecureCallCredentials(grpc_call_credentials* c_creds)
+ : c_creds_(c_creds) {
+ internal::GrpcLibraryInitializer gli_initializer;
+ gli_initializer.summon();
+}
+
bool SecureCallCredentials::ApplyToCall(grpc_call* call) {
return grpc_call_set_credentials(call, c_creds_) == GRPC_CALL_OK;
}
diff --git a/src/cpp/client/secure_credentials.h b/src/cpp/client/secure_credentials.h
index cef59292dd..9e84102154 100644
--- a/src/cpp/client/secure_credentials.h
+++ b/src/cpp/client/secure_credentials.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,11 +45,8 @@ namespace grpc {
class SecureChannelCredentials GRPC_FINAL : public ChannelCredentials {
public:
- explicit SecureChannelCredentials(grpc_channel_credentials* c_creds)
- : c_creds_(c_creds) {}
- ~SecureChannelCredentials() GRPC_OVERRIDE {
- grpc_channel_credentials_release(c_creds_);
- }
+ explicit SecureChannelCredentials(grpc_channel_credentials* c_creds);
+ ~SecureChannelCredentials() { grpc_channel_credentials_release(c_creds_); }
grpc_channel_credentials* GetRawCreds() { return c_creds_; }
std::shared_ptr<grpc::Channel> CreateChannel(
@@ -62,11 +59,8 @@ class SecureChannelCredentials GRPC_FINAL : public ChannelCredentials {
class SecureCallCredentials GRPC_FINAL : public CallCredentials {
public:
- explicit SecureCallCredentials(grpc_call_credentials* c_creds)
- : c_creds_(c_creds) {}
- ~SecureCallCredentials() GRPC_OVERRIDE {
- grpc_call_credentials_release(c_creds_);
- }
+ explicit SecureCallCredentials(grpc_call_credentials* c_creds);
+ ~SecureCallCredentials() { grpc_call_credentials_release(c_creds_); }
grpc_call_credentials* GetRawCreds() { return c_creds_; }
bool ApplyToCall(grpc_call* call) GRPC_OVERRIDE;
diff --git a/src/core/census/context.h b/src/cpp/codegen/grpc_library.cc
index 700bcf86cf..48acec3f3d 100644
--- a/src/core/census/context.h
+++ b/src/cpp/codegen/grpc_library.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,17 +31,10 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
-#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
+#include <grpc++/impl/codegen/grpc_library.h>
-#include <grpc/census.h>
+namespace grpc {
-#define GRPC_CENSUS_MAX_ON_THE_WIRE_TAG_BYTES 2048
+GrpcLibraryInterface *g_glip = nullptr;
-/* census_context is the in-memory representation of information needed to
- * maintain tracing, RPC statistics and resource usage information. */
-struct census_context {
- census_tag_set *tags; /* Opaque data structure for census tags. */
-};
-
-#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */
+} // namespace grpc
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc
index 1f0f04175e..807a67df24 100644
--- a/src/cpp/common/alarm.cc
+++ b/src/cpp/common/alarm.cc
@@ -1,5 +1,5 @@
/*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -30,13 +30,18 @@
*
*/
-#include <grpc/grpc.h>
#include <grpc++/alarm.h>
+#include <grpc++/completion_queue.h>
+#include <grpc++/impl/grpc_library.h>
+#include <grpc/grpc.h>
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
Alarm::Alarm(CompletionQueue* cq, gpr_timespec deadline, void* tag)
- : alarm_(grpc_alarm_create(cq->cq(), deadline, tag)) {}
+ : alarm_(grpc_alarm_create(cq->cq(), deadline, tag)) {
+ g_gli_initializer.summon();
+}
Alarm::~Alarm() { grpc_alarm_destroy(alarm_); }
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index a175beb452..4f76dfff1d 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -1,5 +1,5 @@
/*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,13 +34,17 @@
#include <memory>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/grpc_library.h>
+#include <grpc++/support/time.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc++/support/time.h>
namespace grpc {
+static internal::GrpcLibraryInitializer g_gli_initializer;
CompletionQueue::CompletionQueue() {
+ g_gli_initializer.summon();
cq_ = grpc_completion_queue_create(nullptr);
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 878775bbee..0d31140924 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,16 +35,19 @@
#include <utility>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/impl/codegen/completion_queue_tag.h>
+#include <grpc++/impl/grpc_library.h>
+#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
-#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
+#include <grpc++/server_context.h>
#include <grpc++/support/time.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/profiling/timers.h"
#include "src/cpp/server/thread_pool_interface.h"
@@ -53,17 +56,17 @@ namespace grpc {
class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
public:
+ ~DefaultGlobalCallbacks() GRPC_OVERRIDE {}
void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
};
-static Server::GlobalCallbacks* g_callbacks = nullptr;
+static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
static void InitGlobalCallbacks() {
if (g_callbacks == nullptr) {
- static DefaultGlobalCallbacks default_global_callbacks;
- g_callbacks = &default_global_callbacks;
+ g_callbacks.reset(new DefaultGlobalCallbacks());
}
}
@@ -234,12 +237,12 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
}
- void Run() {
+ void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
- g_callbacks->PreSynchronousRequest(&ctx_);
+ global_callbacks->PreSynchronousRequest(&ctx_);
method_->handler()->RunHandler(MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_, call_.max_message_size()));
- g_callbacks->PostSynchronousRequest(&ctx_);
+ global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
@@ -275,6 +278,7 @@ static grpc_server* CreateServer(const ChannelArguments& args) {
return grpc_server_create(&channel_args, nullptr);
}
+static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
int max_message_size, const ChannelArguments& args)
: max_message_size_(max_message_size),
@@ -286,7 +290,9 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
server_(CreateServer(args)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
+ g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
+ global_callbacks_ = g_callbacks;
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
@@ -296,6 +302,8 @@ Server::~Server() {
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
+ } else if (!started_) {
+ cq_.Shutdown();
}
}
void* got_tag;
@@ -311,39 +319,34 @@ Server::~Server() {
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
GPR_ASSERT(g_callbacks == nullptr);
GPR_ASSERT(callbacks != nullptr);
- g_callbacks = callbacks;
+ g_callbacks.reset(callbacks);
}
-bool Server::RegisterService(const grpc::string* host, RpcService* service) {
- for (int i = 0; i < service->GetMethodCount(); ++i) {
- RpcServiceMethod* method = service->GetMethod(i);
+bool Server::RegisterService(const grpc::string* host, Service* service) {
+ bool has_async_methods = service->has_async_methods();
+ if (has_async_methods) {
+ GPR_ASSERT(service->server_ == nullptr &&
+ "Can only register an asynchronous service against one server.");
+ service->server_ = this;
+ }
+ for (auto it = service->methods_.begin(); it != service->methods_.end();
+ ++it) {
+ if (it->get() == nullptr) { // Handled by generic service if any.
+ continue;
+ }
+ RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);
- if (!tag) {
+ if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
return false;
}
- sync_methods_->emplace_back(method, tag);
- }
- return true;
-}
-
-bool Server::RegisterAsyncService(const grpc::string* host,
- AsynchronousService* service) {
- GPR_ASSERT(service->server_ == nullptr &&
- "Can only register an asynchronous service against one server.");
- service->server_ = this;
- service->request_args_ = new void* [service->method_count_];
- for (size_t i = 0; i < service->method_count_; ++i) {
- void* tag = grpc_server_register_method(server_, service->method_names_[i],
- host ? host->c_str() : nullptr);
- if (!tag) {
- gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
- service->method_names_[i]);
- return false;
+ if (method->handler() == nullptr) {
+ method->set_server_tag(tag);
+ } else {
+ sync_methods_->emplace_back(method, tag);
}
- service->request_args_[i] = tag;
}
return true;
}
@@ -439,8 +442,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GPR_ASSERT(GRPC_CALL_OK == result);
}
-Server::BaseAsyncRequest::BaseAsyncRequest(
- Server* server, ServerContext* context,
+ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
+ ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize)
: server_(server),
@@ -453,9 +456,8 @@ Server::BaseAsyncRequest::BaseAsyncRequest(
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
-Server::BaseAsyncRequest::~BaseAsyncRequest() {}
-
-bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
if (*status) {
for (size_t i = 0; i < initial_metadata_array_.count; i++) {
context_->client_metadata_.insert(
@@ -469,7 +471,7 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
grpc_metadata_array_destroy(&initial_metadata_array_);
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size_);
+ Call call(call_, server_, call_cq_, server_->max_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -482,22 +484,22 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
return true;
}
-Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
- Server* server, ServerContext* context,
+ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
-void Server::RegisteredAsyncRequest::IssueRequest(
+void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
- server_->server_, registered_method, &call_, &context_->deadline_,
+ server_->server(), registered_method, &call_, &context_->deadline_,
&initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
this);
}
-Server::GenericAsyncRequest::GenericAsyncRequest(
- Server* server, GenericServerContext* context,
+ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
+ ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
@@ -505,12 +507,13 @@ Server::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server_, &call_, &call_details_,
+ grpc_server_request_call(server->server(), &call_, &call_details_,
&initial_metadata_array_, call_cq->cq(),
notification_cq->cq(), this);
}
-bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
+ bool* status) {
// TODO(yangg) remove the copy here.
if (*status) {
static_cast<GenericServerContext*>(context_)->method_ =
@@ -569,7 +572,7 @@ void Server::RunRpc() {
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run();
+ cd.Run(global_callbacks_);
}
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 26c0724a30..a8c188e5a5 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@
namespace grpc {
ServerBuilder::ServerBuilder()
- : max_message_size_(-1), generic_service_(nullptr), thread_pool_(nullptr) {
+ : max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
}
@@ -53,24 +53,13 @@ std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
return std::unique_ptr<ServerCompletionQueue>(cq);
}
-void ServerBuilder::RegisterService(SynchronousService* service) {
- services_.emplace_back(new NamedService<RpcService>(service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
- async_services_.emplace_back(new NamedService<AsynchronousService>(service));
+void ServerBuilder::RegisterService(Service* service) {
+ services_.emplace_back(new NamedService(service));
}
void ServerBuilder::RegisterService(const grpc::string& addr,
- SynchronousService* service) {
- services_.emplace_back(
- new NamedService<RpcService>(addr, service->service()));
-}
-
-void ServerBuilder::RegisterAsyncService(const grpc::string& addr,
- AsynchronousService* service) {
- async_services_.emplace_back(
- new NamedService<AsynchronousService>(addr, service));
+ Service* service) {
+ services_.emplace_back(new NamedService(addr, service));
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
@@ -96,14 +85,14 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr,
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- bool thread_pool_owned = false;
- if (!async_services_.empty() && !services_.empty()) {
- gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now");
- return nullptr;
- }
- if (!thread_pool_ && !services_.empty()) {
- thread_pool_ = CreateDefaultThreadPool();
- thread_pool_owned = true;
+ std::unique_ptr<ThreadPoolInterface> thread_pool;
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_synchronous_methods()) {
+ if (thread_pool == nullptr) {
+ thread_pool.reset(CreateDefaultThreadPool());
+ break;
+ }
+ }
}
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
@@ -115,7 +104,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
- new Server(thread_pool_, thread_pool_owned, max_message_size_, args));
+ new Server(thread_pool.release(), true, max_message_size_, args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
@@ -126,15 +115,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
- for (auto service = async_services_.begin(); service != async_services_.end();
- service++) {
- if (!server->RegisterAsyncService((*service)->host.get(),
- (*service)->service)) {
- return nullptr;
- }
- }
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
+ } else {
+ for (auto it = services_.begin(); it != services_.end(); ++it) {
+ if ((*it)->service->has_generic_methods()) {
+ gpr_log(GPR_ERROR,
+ "Some methods were marked generic but there is no "
+ "generic service registered.");
+ return nullptr;
+ }
+ }
}
for (auto port = ports_.begin(); port != ports_.end(); port++) {
int r = server->AddListeningPort(port->addr, port->creds.get());
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index b3a74c7fce..3732c1f090 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -33,9 +33,11 @@
#include <grpc++/server_context.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/sync.h>
#include <grpc++/support/time.h>
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 2952f94b24..3a2318d1a6 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -31,8 +31,8 @@
*
*/
-#include <grpc/byte_buffer_reader.h>
#include <grpc++/support/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
namespace grpc {
@@ -84,8 +84,10 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf)
: buffer_(grpc_byte_buffer_copy(buf.buffer_)) {}
ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
- Clear(); // first remove existing data
- buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ Clear(); // first remove existing data
+ if (buf.buffer_) {
+ buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ }
return *this;
}
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 4c6d50356c..65813909de 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -1,6 +1,6 @@
#region Copyright notice and license
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 7a34bf3fd6..e423545ef8 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -49,6 +49,10 @@ endlocal
@rem copy resulting nuget packages to artifacts directory
xcopy /Y /I *.nupkg ..\..\artifacts\
+@rem create a zipfile with the artifacts as well
+powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::CreateFromDirectory('..\..\artifacts', 'csharp_nugets.zip');"
+xcopy /Y /I csharp_nugets.zip ..\..\artifacts\
+
goto :EOF
:error
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index c306292c04..ee703fdc91 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -69,7 +69,7 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
return scope.Escape(Nan::Null());
}
size_t length = grpc_byte_buffer_length(buffer);
- char *result = reinterpret_cast<char *>(calloc(length, sizeof(char)));
+ char *result = new char[length];
size_t offset = 0;
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index a2b8e0d22b..654c5aed09 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -112,8 +112,8 @@ void InitCallErrorConstants(Local<Object> exports) {
Nan::Set(exports, Nan::New("callError").ToLocalChecked(), call_error);
Local<Value> OK(Nan::New<Uint32, uint32_t>(GRPC_CALL_OK));
Nan::Set(call_error, Nan::New("OK").ToLocalChecked(), OK);
- Local<Value> ERROR(Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR));
- Nan::Set(call_error, Nan::New("ERROR").ToLocalChecked(), ERROR);
+ Local<Value> CALL_ERROR(Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR));
+ Nan::Set(call_error, Nan::New("ERROR").ToLocalChecked(), CALL_ERROR);
Local<Value> NOT_ON_SERVER(
Nan::New<Uint32, uint32_t>(GRPC_CALL_ERROR_NOT_ON_SERVER));
Nan::Set(call_error, Nan::New("NOT_ON_SERVER").ToLocalChecked(),
diff --git a/src/node/index.js b/src/node/index.js
index 0d1a7fd887..7eacdc67b1 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@ var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./src/grpc_extension');
/**
* Load a gRPC object from an existing ProtoBuf.Reflect object.
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index ba61e52ba1..e48acd48f5 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -76,7 +76,7 @@ function unaryCall(call, callback) {
*/
function streamingCall(call) {
call.on('data', function(value) {
- var payload = {body: zeroBuffer(value.repsonse_size)};
+ var payload = {body: zeroBuffer(value.response_size)};
call.write({payload: payload});
});
call.on('end', function() {
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 8841ae13c3..99ae32127e 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -99,7 +99,7 @@ exports.runServer = function runServer(call) {
var stats;
switch (request.argtype) {
case 'setup':
- server = new BenchmarkServer(request.setup.host, request.setup.port,
+ server = new BenchmarkServer('[::]', request.setup.port,
request.setup.security_params);
server.start();
stats = server.mark();
diff --git a/src/node/src/client.js b/src/node/src/client.js
index d57826781d..b5247a69ee 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
var common = require('./common');
diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js
index dcbfac18f4..710ab6d879 100644
--- a/src/node/src/credentials.js
+++ b/src/node/src/credentials.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,7 +61,7 @@
'use strict';
-var grpc = require('bindings')('grpc_node.node');
+var grpc = require('./grpc_extension');
var CallCredentials = grpc.CallCredentials;
diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js
new file mode 100644
index 0000000000..6a8fe2c03c
--- /dev/null
+++ b/src/node/src/grpc_extension.js
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+var binary = require('node-pre-gyp');
+var path = require('path');
+var binding_path =
+ binary.find(path.resolve(path.join(__dirname, '../../../package.json')));
+var binding = require(binding_path);
+
+module.exports = binding;
diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js
index fef79f959e..51a9f8a216 100644
--- a/src/node/src/metadata.js
+++ b/src/node/src/metadata.js
@@ -49,7 +49,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
diff --git a/src/node/src/server.js b/src/node/src/server.js
index ceaa9f5d1f..e5aadcd565 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
var common = require('./common');
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index f1f86b35db..2300096d03 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* Helper function to return an absolute deadline given a relative timeout in
diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js
index 7163a5fb5e..c0ae2b769a 100644
--- a/src/node/test/channel_test.js
+++ b/src/node/test/channel_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index b17cd339cb..712c70706d 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* List of all status names
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 0f6c5941c4..353c6c761d 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 592f47e145..71a9647184 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,7 +36,7 @@
var assert = require('assert');
var fs = require('fs');
var path = require('path');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
describe('server', function() {
describe('constructor', function() {
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index fc765ed731..530f1f7749 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -952,6 +952,7 @@ describe('Call propagation', function() {
describe('Cancellation', function() {
it('With a unary call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.unary = function(parent, callback) {
client.unary(parent.request, function(err, value) {
try {
@@ -969,12 +970,11 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.unary({}, function(err, value) {
- done();
- });
+ call = proxy_client.unary({}, function(err, value) { done(); });
});
it('With a client stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.clientStream = function(parent, callback) {
client.clientStream(function(err, value) {
try {
@@ -992,12 +992,11 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.clientStream(function(err, value) {
- done();
- });
+ call = proxy_client.clientStream(function(err, value) { done(); });
});
it('With a server stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null,
{parent: parent});
@@ -1013,13 +1012,14 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.serverStream({});
+ call = proxy_client.serverStream({});
call.on('error', function(err) {
done();
});
});
it('With a bidi stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
child.on('error', function(err) {
@@ -1034,7 +1034,7 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.bidiStream();
+ call = proxy_client.bidiStream();
call.on('error', function(err) {
done();
});
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index f5b738203d..4a6df910a7 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -31,7 +31,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL'
- s.version = '1.0'
+ s.version = '2.0'
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.'
# Adapted from the homepage:
s.description = <<-DESC
@@ -67,7 +67,7 @@ Pod::Spec.new do |s|
s.authors = 'Adam Langley', 'David Benjamin', 'Matt Braithwaite'
s.source = { :git => 'https://boringssl.googlesource.com/boringssl',
- :tag => 'version_for_cocoapods_1.0' }
+ :tag => 'version_for_cocoapods_2.0' }
s.source_files = 'ssl/*.{h,c}',
'ssl/**/*.{h,c}',
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 0784ebf91c..2f352e652f 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,7 @@ enum ClientType {
enum ServerType {
SYNC_SERVER = 0;
ASYNC_SERVER = 1;
+ ASYNC_GENERIC_SERVER = 2;
}
enum RpcType {
@@ -126,8 +127,6 @@ message ClientArgs {
message ServerConfig {
ServerType server_type = 1;
SecurityParams security_params = 2;
- // Host on which to listen.
- string host = 3;
// Port on which to listen. Zero means pick unused port.
int32 port = 4;
// Only for async server. Number of threads used to serve the requests.
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index f01d645af7..d05a35548d 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -1,5 +1,5 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@ message RequestParams {
int32 response_message_length = 6;
bool echo_peer = 7;
string expected_client_identity = 8; // will force check_auth_context.
+ bool skip_cancelled_check = 9;
}
message EchoRequest {
diff --git a/src/python/README.md b/src/python/README.md
deleted file mode 100644
index 8b167659ff..0000000000
--- a/src/python/README.md
+++ /dev/null
@@ -1,100 +0,0 @@
-gRPC Python
-=========
-The Python facility of gRPC.
-
-Status
--------
-Beta : Core behavior well-used and proven; bugs lurk off the beaten path.
-
-PREREQUISITES
--------------
-- python-virtualenv, python-pip, python3-pip, python-dev, python3-dev
-- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
-
-INSTALLATION
--------------
-
-**Linux (Debian):**
-
-Add [Debian jessie-backports][] to your `sources.list` file. Example:
-
-```sh
-echo "deb http://http.debian.net/debian jessie-backports main" | \
-sudo tee -a /etc/apt/sources.list
-```
-
-Install the gRPC Debian package
-
-```sh
-sudo apt-get update
-sudo apt-get install libgrpc-dev
-```
-
-Install the gRPC Python module
-
-```sh
-sudo pip install grpcio
-```
-
-**Mac OS X**
-
-Install [homebrew][]. Run the following command to install gRPC Python.
-```sh
-$ curl -fsSL https://goo.gl/getgrpc | bash -s python
-```
-This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
-
-EXAMPLES
---------
-Please read our online documentation for a [Quick Start][] and a [detailed example][]
-
-BUILDING FROM SOURCE
----------------------
-- Clone this repository
-
-- Install tox
-```
-$ sudo pip install tox
-```
-
-- Initialize the git submodules
-```
-$ git submodule update --init
-```
-
-- Make the libraries
-```
-$ make
-```
-
-- Use build_python.sh to build the Python code and install it into a virtual environment
-```
-$ CONFIG=opt tools/run_tests/build_python.sh
-```
-
-TESTING
--------
-
-- Use run_python.sh to run gRPC as it was installed into the virtual environment
-```
-$ CONFIG=opt PYVER=2.7 tools/run_tests/run_python.sh
-```
-
-PACKAGING
----------
-
-- Install packaging dependencies
-```
-$ pip install setuptools twine
-```
-
-- Push to PyPI
-```
-$ ../../tools/distrib/python/submit.py
-```
-
-[homebrew]:http://brew.sh
-[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
-[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
-[detailed example]:http://www.grpc.io/docs/installation/python.html
-[Debian jessie-backports]:http://backports.debian.org/Instructions/
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 29b506abe0..98ad2e571d 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -30,15 +30,22 @@
"""Provides distutils command classes for the GRPC Python setup process."""
import distutils
+import glob
import os
import os.path
+import platform
import re
+import shutil
import subprocess
import sys
+import traceback
import setuptools
+from setuptools.command import bdist_egg
from setuptools.command import build_ext
from setuptools.command import build_py
+from setuptools.command import easy_install
+from setuptools.command import install
from setuptools.command import test
import support
@@ -58,6 +65,129 @@ class CommandError(Exception):
"""Simple exception class for GRPC custom commands."""
+# TODO(atash): Remove this once PyPI has better Linux bdist support. See
+# https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+def _get_linux_bdist_egg(decorated_basename, target_egg_basename):
+ """Returns a string path to a .egg file for Linux to install.
+
+ If we can retrieve a pre-compiled egg from online, uses it. Else, emits a
+ warning and builds from source.
+ """
+ # Break import style to ensure that setup.py has had a chance to install the
+ # relevant package eggs.
+ from six.moves.urllib import request
+ decorated_path = decorated_basename + '.egg'
+ try:
+ url = (
+ 'https://storage.googleapis.com/grpc-precompiled-binaries/'
+ 'python/{target}'
+ .format(target=decorated_path))
+ egg_data = request.urlopen(url).read()
+ except IOError as error:
+ raise CommandError(
+ '{}\n\nCould not find the bdist egg {}: {}'
+ .format(traceback.format_exc(), decorated_path, error.message))
+ # Our chosen local egg path.
+ egg_path = target_egg_basename + '.egg'
+ try:
+ with open(egg_path, 'w') as egg_file:
+ egg_file.write(egg_data)
+ except IOError as error:
+ raise CommandError(
+ '{}\n\nCould not write grpcio egg: {}'
+ .format(traceback.format_exc(), error.message))
+ return egg_path
+
+
+class EggNameMixin(object):
+
+ def egg_name(self, with_custom):
+ """
+ Args:
+ with_custom: Boolean describing whether or not to decorate the egg name
+ with custom gRPC-specific target information.
+ """
+ egg_command = self.get_finalized_command('bdist_egg')
+ base = os.path.splitext(os.path.basename(egg_command.egg_output))[0]
+ if with_custom:
+ flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4'
+ return '{base}-{flavor}'.format(base=base, flavor=flavor)
+ else:
+ return base
+
+
+class Install(install.install, EggNameMixin):
+ """Custom Install command for gRPC Python.
+
+ This is for bdist shims and whatever else we might need a custom install
+ command for.
+ """
+
+ user_options = install.install.user_options + [
+ # TODO(atash): remove this once manylinux gets on PyPI. See
+ # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+ ('use-linux-bdist', None,
+ 'Whether to retrieve a binary for Linux instead of building from '
+ 'source.'),
+ ]
+
+ def initialize_options(self):
+ install.install.initialize_options(self)
+ self.use_linux_bdist = False
+
+ def finalize_options(self):
+ install.install.finalize_options(self)
+
+ def run(self):
+ if self.use_linux_bdist:
+ try:
+ egg_path = _get_linux_bdist_egg(self.egg_name(True),
+ self.egg_name(False))
+ except CommandError as error:
+ sys.stderr.write(
+ '\nWARNING: Failed to acquire grpcio prebuilt binary:\n'
+ '{}.\n\n'.format(error.message))
+ raise
+ try:
+ self._run_bdist_retrieval_install(egg_path)
+ except Exception as error:
+ # if anything else happens (and given how there's no way to really know
+ # what's happening in setuptools here, I mean *anything*), warn the user
+ # and fall back to building from source.
+ sys.stderr.write(
+ '{}\nWARNING: Failed to install grpcio prebuilt binary.\n\n'
+ .format(traceback.format_exc()))
+ install.install.run(self)
+ else:
+ install.install.run(self)
+
+ # TODO(atash): Remove this once PyPI has better Linux bdist support. See
+ # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+ def _run_bdist_retrieval_install(self, bdist_egg):
+ easy_install = self.distribution.get_command_class('easy_install')
+ easy_install_command = easy_install(
+ self.distribution, args='x', root=self.root, record=self.record,
+ )
+ easy_install_command.ensure_finalized()
+ easy_install_command.always_copy_from = '.'
+ easy_install_command.package_index.scan(glob.glob('*.egg'))
+ arguments = [bdist_egg]
+ if setuptools.bootstrap_install_from:
+ args.insert(0, setuptools.bootstrap_install_from)
+ easy_install_command.args = arguments
+ easy_install_command.run()
+ setuptools.bootstrap_install_from = None
+
+
+class BdistEggCustomName(bdist_egg.bdist_egg, EggNameMixin):
+ """Thin wrapper around the bdist_egg command to build with our custom name."""
+
+ def run(self):
+ bdist_egg.bdist_egg.run(self)
+ target = os.path.join(self.dist_dir, '{}.egg'.format(self.egg_name(True)))
+ shutil.move(self.get_outputs()[0], target)
+
+
class SphinxDocumentation(setuptools.Command):
"""Command to generate documentation via sphinx."""
@@ -190,11 +320,11 @@ class BuildExt(build_ext.build_ext):
extension.extra_link_args += list(BuildExt.LINK_OPTIONS[compiler])
try:
build_ext.build_ext.build_extensions(self)
- except KeyboardInterrupt:
- raise
except Exception as error:
- support.diagnose_build_ext_error(self, error)
- raise CommandError("Failed `build_ext` step.")
+ formatted_exception = traceback.format_exc()
+ support.diagnose_build_ext_error(self, error, formatted_exception)
+ raise CommandError(
+ "Failed `build_ext` step:\n{}".format(formatted_exception))
class Gather(setuptools.Command):
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 7c7a6eebfc..f82c7f7fba 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,7 +29,6 @@
"""A thread pool that logs exceptions raised by tasks executed within it."""
-import functools
import logging
from concurrent import futures
@@ -37,12 +36,12 @@ from concurrent import futures
def _wrap(behavior):
"""Wraps an arbitrary callable behavior in exception-logging."""
- @functools.wraps(behavior)
def _wrapping(*args, **kwargs):
try:
return behavior(*args, **kwargs)
except Exception as e:
- logging.exception('Unexpected exception from task run in logging pool!')
+ logging.exception(
+ 'Unexpected exception from %s executed in logging pool!', behavior)
raise
return _wrapping
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 98ab1ff7f0..9b77613776 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -60,6 +60,7 @@ CORE_SOURCE_FILES = [
'src/core/support/string_posix.c',
'src/core/support/string_win32.c',
'src/core/support/subprocess_posix.c',
+ 'src/core/support/subprocess_windows.c',
'src/core/support/sync.c',
'src/core/support/sync_posix.c',
'src/core/support/sync_win32.c',
@@ -170,6 +171,7 @@ CORE_SOURCE_FILES = [
'src/core/json/json_reader.c',
'src/core/json/json_string.c',
'src/core/json/json_writer.c',
+ 'src/core/surface/alarm.c',
'src/core/surface/api_trace.c',
'src/core/surface/byte_buffer.c',
'src/core/surface/byte_buffer_reader.c',
@@ -221,7 +223,7 @@ CORE_SOURCE_FILES = [
'src/core/census/context.c',
'src/core/census/initialize.c',
'src/core/census/operation.c',
- 'src/core/census/tag_set.c',
+ 'src/core/census/placeholders.c',
'src/core/census/tracing.c',
'src/boringssl/err_data.c',
'third_party/boringssl/crypto/aes/aes.c',
diff --git a/src/python/grpcio/support.py b/src/python/grpcio/support.py
index bbc509653d..33244eb388 100644
--- a/src/python/grpcio/support.py
+++ b/src/python/grpcio/support.py
@@ -76,16 +76,40 @@ def _expect_compile(compiler, source_string, error_message):
"Diagnostics found a compilation environment issue:\n{}"
.format(error_message))
-def diagnose_build_ext_error(build_ext, error):
- {
- errors.CompileError: diagnose_compile_error
- }[type(error)](build_ext, error)
-
def diagnose_compile_error(build_ext, error):
- """Attempt to run a few test files through the compiler to see if we can
- diagnose the reason for the compile failure."""
+ """Attempt to diagnose an error during compilation."""
for c_check, message in C_CHECKS.items():
_expect_compile(build_ext.compiler, c_check, message)
- raise commands.CommandError(
- "\n\nWe could not diagnose your build failure. Please file an issue at "
- "http://www.github.com/grpc/grpc with `[Python install]` in the title.")
+ python_sources = [
+ source for source in build_ext.get_source_files()
+ if source.startswith('./src/python') and source.endswith('c')
+ ]
+ for source in python_sources:
+ if not os.path.isfile(source):
+ raise commands.CommandError(
+ ("Diagnostics found a missing Python extension source file:\n{}\n\n"
+ "This is usually because the Cython sources haven't been transpiled "
+ "into C yet and you're building from source.\n"
+ "Try setting the environment variable "
+ "`GRPC_PYTHON_BUILD_WITH_CYTHON=1` when invoking `setup.py` or "
+ "when using `pip`, e.g.:\n\n"
+ "pip install -rrequirements.txt\n"
+ "GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install .")
+ .format(source)
+ )
+
+
+_ERROR_DIAGNOSES = {
+ errors.CompileError: diagnose_compile_error
+}
+
+def diagnose_build_ext_error(build_ext, error, formatted):
+ diagnostic = _ERROR_DIAGNOSES.get(type(error))
+ if diagnostic is None:
+ raise commands.CommandError(
+ "\n\nWe could not diagnose your build failure. Please file an issue at "
+ "http://www.github.com/grpc/grpc with `[Python install]` in the title."
+ "\n\n{}".format(formatted))
+ else:
+ diagnostic(build_ext, error)
+
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
index 452802da6a..0521e1c102 100644
--- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
+++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,6 +29,7 @@
"""Tests for grpc.framework.foundation.logging_pool."""
+import threading
import unittest
from grpc.framework.foundation import logging_pool
@@ -36,6 +37,21 @@ from grpc.framework.foundation import logging_pool
_POOL_SIZE = 16
+class _CallableObject(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._passed_values = []
+
+ def __call__(self, value):
+ with self._lock:
+ self._passed_values.append(value)
+
+ def passed_values(self):
+ with self._lock:
+ return tuple(self._passed_values)
+
+
class LoggingPoolTest(unittest.TestCase):
def testUpAndDown(self):
@@ -59,6 +75,14 @@ class LoggingPoolTest(unittest.TestCase):
self.assertIsNotNone(raised_exception)
+ def testCallableObjectExecuted(self):
+ callable_object = _CallableObject()
+ passed_object = object()
+ with logging_pool.pool(_POOL_SIZE) as pool:
+ future = pool.submit(callable_object, passed_object)
+ self.assertIsNone(future.result())
+ self.assertSequenceEqual((passed_object,), callable_object.passed_values())
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 3bcefa601d..c8a3a1bc74 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -30,9 +30,12 @@
"""Test code for the Face layer of RPC Framework."""
import abc
+import itertools
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index fc8daa992f..1d36a931e8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -31,8 +31,10 @@
import abc
import contextlib
+import itertools
import threading
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import logging_pool
@@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
@@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
- def testParallelInvocations(self):
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.PARALLELISM)
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
def testCancelledUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
index 2e444ff09d..42a7f4e3b8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver):
def unary_response(self):
with self._condition:
if self._abortion is not None:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
elif len(self._responses) != 1:
raise AssertionError(
'%d responses received, not exactly one!', len(self._responses))
@@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver):
if self._abortion is None:
return list(self._responses)
else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
def abortion(self):
with self._condition:
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 43adafb73f..07864ebf59 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -525,12 +525,13 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
- uint write_flag;
+ unsigned write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
+static void grpc_run_batch_stack_init(run_batch_stack *st,
+ unsigned write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
@@ -696,7 +697,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error err;
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
- uint write_flag = 0;
+ unsigned write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 4d719d7541..ebcc6592c2 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -79,6 +79,7 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) {
static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
VALUE exception_object) {
VALUE result = rb_hash_new();
+ (void)args;
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
/* Currently only gives the exception class name. It should be possible get
more details */
@@ -132,6 +133,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata(
}
static void grpc_rb_call_credentials_plugin_destroy(void *state) {
+ (void)state;
// Not sure what needs to be done here
}
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 95af091317..516f0bdad2 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -102,6 +102,7 @@ static void grpc_rb_event_queue_destroy() {
static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL;
+ (void)param;
gpr_mu_lock(&event_queue.mu);
while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
gpr_cv_wait(&event_queue.cv,
@@ -117,6 +118,7 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
}
static void grpc_rb_event_unblocking_func(void *arg) {
+ (void)arg;
gpr_mu_lock(&event_queue.mu);
event_queue.abort = true;
gpr_cv_signal(&event_queue.cv);
@@ -127,6 +129,7 @@ static void grpc_rb_event_unblocking_func(void *arg) {
* events */
static VALUE grpc_rb_event_thread(VALUE arg) {
grpc_rb_event *event;
+ (void)arg;
while(true) {
event = (grpc_rb_event*)rb_thread_call_without_gvl(
grpc_rb_wait_for_event_no_gil, NULL,
diff --git a/src/zlib/gen_build_yaml.py b/src/zlib/gen_build_yaml.py
index 8d6064b10a..4bd557367a 100755
--- a/src/zlib/gen_build_yaml.py
+++ b/src/zlib/gen_build_yaml.py
@@ -54,6 +54,7 @@ try:
out['libs'] = [{
'name': 'z',
'zlib': True,
+ 'defaults': 'zlib',
'build': 'private',
'language': 'c',
'secure': 'no',