aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc221
-rw-r--r--src/compiler/csharp_generator.cc56
-rw-r--r--src/compiler/objective_c_generator.cc108
-rw-r--r--src/compiler/ruby_generator.cc87
-rw-r--r--src/core/channel/client_channel.c1
-rw-r--r--src/core/iomgr/fd_posix.c96
-rw-r--r--src/core/iomgr/fd_posix.h28
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c2
-rw-r--r--src/core/iomgr/pollset_posix.c6
-rw-r--r--src/core/surface/lame_client.c1
-rwxr-xr-xsrc/php/bin/run_gen_code_test.sh4
11 files changed, 361 insertions, 249 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index acac1475f7..b0d2b5d229 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -86,23 +86,25 @@ grpc::string FilenameIdentifier(const grpc::string &filename) {
grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
-
- vars["filename"] = file->name();
- vars["filename_identifier"] = FilenameIdentifier(file->name());
- vars["filename_base"] = grpc_generator::StripProto(file->name());
-
- printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
- printer.Print(vars, "// source: $filename$\n");
- printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n");
- printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n");
- printer.Print(vars, "\n");
- printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
- printer.Print(vars, "\n");
-
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+
+ vars["filename"] = file->name();
+ vars["filename_identifier"] = FilenameIdentifier(file->name());
+ vars["filename_base"] = grpc_generator::StripProto(file->name());
+
+ printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
+ printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars, "// source: $filename$\n");
+ printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n");
+ printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n");
+ printer.Print(vars, "\n");
+ printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
+ printer.Print(vars, "\n");
+ }
return output;
}
@@ -626,100 +628,108 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
-
- if (!params.services_namespace.empty()) {
- vars["services_namespace"] = params.services_namespace;
- printer.Print(vars, "\nnamespace $services_namespace$ {\n\n");
- }
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+
+ if (!params.services_namespace.empty()) {
+ vars["services_namespace"] = params.services_namespace;
+ printer.Print(vars, "\nnamespace $services_namespace$ {\n\n");
+ }
- for (int i = 0; i < file->service_count(); ++i) {
- PrintHeaderService(&printer, file->service(i), &vars);
- printer.Print("\n");
- }
+ for (int i = 0; i < file->service_count(); ++i) {
+ PrintHeaderService(&printer, file->service(i), &vars);
+ printer.Print("\n");
+ }
- if (!params.services_namespace.empty()) {
- printer.Print(vars, "} // namespace $services_namespace$\n\n");
+ if (!params.services_namespace.empty()) {
+ printer.Print(vars, "} // namespace $services_namespace$\n\n");
+ }
}
-
return output;
}
grpc::string GetHeaderEpilogue(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
-
- vars["filename"] = file->name();
- vars["filename_identifier"] = FilenameIdentifier(file->name());
-
- if (!file->package().empty()) {
- std::vector<grpc::string> parts =
- grpc_generator::tokenize(file->package(), ".");
-
- for (auto part = parts.rbegin(); part != parts.rend(); part++) {
- vars["part"] = *part;
- printer.Print(vars, "} // namespace $part$\n");
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+
+ vars["filename"] = file->name();
+ vars["filename_identifier"] = FilenameIdentifier(file->name());
+
+ if (!file->package().empty()) {
+ std::vector<grpc::string> parts =
+ grpc_generator::tokenize(file->package(), ".");
+
+ for (auto part = parts.rbegin(); part != parts.rend(); part++) {
+ vars["part"] = *part;
+ printer.Print(vars, "} // namespace $part$\n");
+ }
+ printer.Print(vars, "\n");
}
+
printer.Print(vars, "\n");
+ printer.Print(vars, "#endif // GRPC_$filename_identifier$__INCLUDED\n");
}
-
- printer.Print(vars, "\n");
- printer.Print(vars, "#endif // GRPC_$filename_identifier$__INCLUDED\n");
-
return output;
}
grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
-
- vars["filename"] = file->name();
- vars["filename_base"] = grpc_generator::StripProto(file->name());
-
- printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
- printer.Print(vars, "// source: $filename$\n\n");
- printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
- printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n");
- printer.Print(vars, "\n");
-
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+
+ vars["filename"] = file->name();
+ vars["filename_base"] = grpc_generator::StripProto(file->name());
+
+ printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
+ printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars, "// source: $filename$\n\n");
+ printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
+ printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n");
+ printer.Print(vars, "\n");
+ }
return output;
}
grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
const Parameters &param) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
-
- printer.Print(vars, "#include <grpc++/async_unary_call.h>\n");
- printer.Print(vars, "#include <grpc++/channel_interface.h>\n");
- printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
- printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
- printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
- printer.Print(vars, "#include <grpc++/stream.h>\n");
-
- if (!file->package().empty()) {
- std::vector<grpc::string> parts =
- grpc_generator::tokenize(file->package(), ".");
-
- for (auto part = parts.begin(); part != parts.end(); part++) {
- vars["part"] = *part;
- printer.Print(vars, "namespace $part$ {\n");
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+
+ printer.Print(vars, "#include <grpc++/async_unary_call.h>\n");
+ printer.Print(vars, "#include <grpc++/channel_interface.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
+ printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
+ printer.Print(vars, "#include <grpc++/stream.h>\n");
+
+ if (!file->package().empty()) {
+ std::vector<grpc::string> parts =
+ grpc_generator::tokenize(file->package(), ".");
+
+ for (auto part = parts.begin(); part != parts.end(); part++) {
+ vars["part"] = *part;
+ printer.Print(vars, "namespace $part$ {\n");
+ }
}
- }
-
- printer.Print(vars, "\n");
+ printer.Print(vars, "\n");
+ }
return output;
}
@@ -1077,26 +1087,29 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
- // Package string is empty or ends with a dot. It is used to fully qualify
- // method names.
- vars["Package"] = file->package();
- if (!file->package().empty()) {
- vars["Package"].append(".");
- }
- if (!params.services_namespace.empty()) {
- vars["ns"] = params.services_namespace + "::";
- vars["prefix"] = params.services_namespace;
- } else {
- vars["ns"] = "";
- vars["prefix"] = "";
- }
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ grpc::protobuf::io::Printer printer(&output_stream, '$');
+ std::map<grpc::string, grpc::string> vars;
+ // Package string is empty or ends with a dot. It is used to fully qualify
+ // method names.
+ vars["Package"] = file->package();
+ if (!file->package().empty()) {
+ vars["Package"].append(".");
+ }
+ if (!params.services_namespace.empty()) {
+ vars["ns"] = params.services_namespace + "::";
+ vars["prefix"] = params.services_namespace;
+ } else {
+ vars["ns"] = "";
+ vars["prefix"] = "";
+ }
- for (int i = 0; i < file->service_count(); ++i) {
- PrintSourceService(&printer, file->service(i), &vars);
- printer.Print("\n");
+ for (int i = 0; i < file->service_count(); ++i) {
+ PrintSourceService(&printer, file->service(i), &vars);
+ printer.Print("\n");
+ }
}
return output;
}
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index 5dd078b303..ccb0b688b6 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -474,35 +474,39 @@ void GenerateService(Printer* out, const ServiceDescriptor *service) {
grpc::string GetServices(const FileDescriptor *file) {
grpc::string output;
- StringOutputStream output_stream(&output);
- Printer out(&output_stream, '$');
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
- // Don't write out any output if there no services, to avoid empty service
- // files being generated for proto files that don't declare any.
- if (file->service_count() == 0) {
- return output;
- }
+ StringOutputStream output_stream(&output);
+ Printer out(&output_stream, '$');
+
+ // Don't write out any output if there no services, to avoid empty service
+ // files being generated for proto files that don't declare any.
+ if (file->service_count() == 0) {
+ return output;
+ }
- // Write out a file header.
- out.Print("// Generated by the protocol buffer compiler. DO NOT EDIT!\n");
- out.Print("// source: $filename$\n", "filename", file->name());
- out.Print("#region Designer generated code\n");
- out.Print("\n");
- out.Print("using System;\n");
- out.Print("using System.Threading;\n");
- out.Print("using System.Threading.Tasks;\n");
- out.Print("using Grpc.Core;\n");
- // TODO(jtattermusch): add using for protobuf message classes
- out.Print("\n");
-
- out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file));
- out.Indent();
- for (int i = 0; i < file->service_count(); i++) {
- GenerateService(&out, file->service(i));
+ // Write out a file header.
+ out.Print("// Generated by the protocol buffer compiler. DO NOT EDIT!\n");
+ out.Print("// source: $filename$\n", "filename", file->name());
+ out.Print("#region Designer generated code\n");
+ out.Print("\n");
+ out.Print("using System;\n");
+ out.Print("using System.Threading;\n");
+ out.Print("using System.Threading.Tasks;\n");
+ out.Print("using Grpc.Core;\n");
+ // TODO(jtattermusch): add using for protobuf message classes
+ out.Print("\n");
+
+ out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file));
+ out.Indent();
+ for (int i = 0; i < file->service_count(); i++) {
+ GenerateService(&out, file->service(i));
+ }
+ out.Outdent();
+ out.Print("}\n");
+ out.Print("#endregion\n");
}
- out.Outdent();
- out.Print("}\n");
- out.Print("#endregion\n");
return output;
}
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc
index 8f35302bee..1bf0254f5b 100644
--- a/src/compiler/objective_c_generator.cc
+++ b/src/compiler/objective_c_generator.cc
@@ -176,65 +176,71 @@ void PrintMethodImplementations(Printer *printer,
string GetHeader(const ServiceDescriptor *service, const string prefix) {
string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- Printer printer(&output_stream, '$');
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ Printer printer(&output_stream, '$');
- printer.Print("@protocol GRXWriteable;\n");
- printer.Print("@protocol GRXWriter;\n\n");
-
- map<string, string> vars = {{"service_name", service->name()},
- {"prefix", prefix}};
- printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n");
-
- for (int i = 0; i < service->method_count(); i++) {
- PrintMethodDeclarations(&printer, service->method(i), vars);
+ printer.Print("@protocol GRXWriteable;\n");
+ printer.Print("@protocol GRXWriter;\n\n");
+
+ map<string, string> vars = {{"service_name", service->name()},
+ {"prefix", prefix}};
+ printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n");
+
+ for (int i = 0; i < service->method_count(); i++) {
+ PrintMethodDeclarations(&printer, service->method(i), vars);
+ }
+ printer.Print("@end\n\n");
+
+ printer.Print("// Basic service implementation, over gRPC, that only does"
+ " marshalling and parsing.\n");
+ printer.Print(vars, "@interface $prefix$$service_name$ :"
+ " ProtoService<$prefix$$service_name$>\n");
+ printer.Print("- (instancetype)initWithHost:(NSString *)host"
+ " NS_DESIGNATED_INITIALIZER;\n");
+ printer.Print("@end\n");
}
- printer.Print("@end\n\n");
-
- printer.Print("// Basic service implementation, over gRPC, that only does"
- " marshalling and parsing.\n");
- printer.Print(vars, "@interface $prefix$$service_name$ :"
- " ProtoService<$prefix$$service_name$>\n");
- printer.Print("- (instancetype)initWithHost:(NSString *)host"
- " NS_DESIGNATED_INITIALIZER;\n");
- printer.Print("@end\n");
return output;
}
string GetSource(const ServiceDescriptor *service, const string prefix) {
string output;
- grpc::protobuf::io::StringOutputStream output_stream(&output);
- Printer printer(&output_stream, '$');
-
- map<string, string> vars = {{"service_name", service->name()},
- {"package", service->file()->package()},
- {"prefix", prefix}};
-
- printer.Print(vars,
- "static NSString *const kPackageName = @\"$package$\";\n");
- printer.Print(vars,
- "static NSString *const kServiceName = @\"$service_name$\";\n\n");
-
- printer.Print(vars, "@implementation $prefix$$service_name$\n\n");
-
- printer.Print("// Designated initializer\n");
- printer.Print("- (instancetype)initWithHost:(NSString *)host {\n");
- printer.Print(" return (self = [super initWithHost:host"
- " packageName:kPackageName serviceName:kServiceName]);\n");
- printer.Print("}\n\n");
- printer.Print("// Override superclass initializer to disallow different"
- " package and service names.\n");
- printer.Print("- (instancetype)initWithHost:(NSString *)host\n");
- printer.Print(" packageName:(NSString *)packageName\n");
- printer.Print(" serviceName:(NSString *)serviceName {\n");
- printer.Print(" return [self initWithHost:host];\n");
- printer.Print("}\n\n\n");
-
- for (int i = 0; i < service->method_count(); i++) {
- PrintMethodImplementations(&printer, service->method(i), vars);
- }
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+ grpc::protobuf::io::StringOutputStream output_stream(&output);
+ Printer printer(&output_stream, '$');
+
+ map<string, string> vars = {{"service_name", service->name()},
+ {"package", service->file()->package()},
+ {"prefix", prefix}};
- printer.Print("@end\n");
+ printer.Print(vars,
+ "static NSString *const kPackageName = @\"$package$\";\n");
+ printer.Print(vars,
+ "static NSString *const kServiceName = @\"$service_name$\";\n\n");
+
+ printer.Print(vars, "@implementation $prefix$$service_name$\n\n");
+
+ printer.Print("// Designated initializer\n");
+ printer.Print("- (instancetype)initWithHost:(NSString *)host {\n");
+ printer.Print(" return (self = [super initWithHost:host"
+ " packageName:kPackageName serviceName:kServiceName]);\n");
+ printer.Print("}\n\n");
+ printer.Print("// Override superclass initializer to disallow different"
+ " package and service names.\n");
+ printer.Print("- (instancetype)initWithHost:(NSString *)host\n");
+ printer.Print(" packageName:(NSString *)packageName\n");
+ printer.Print(" serviceName:(NSString *)serviceName {\n");
+ printer.Print(" return [self initWithHost:host];\n");
+ printer.Print("}\n\n\n");
+
+ for (int i = 0; i < service->method_count(); i++) {
+ PrintMethodImplementations(&printer, service->method(i), vars);
+ }
+
+ printer.Print("@end\n");
+ }
return output;
}
diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc
index a0bb92848b..299137519f 100644
--- a/src/compiler/ruby_generator.cc
+++ b/src/compiler/ruby_generator.cc
@@ -119,49 +119,52 @@ void PrintService(const ServiceDescriptor *service, const grpc::string &package,
grpc::string GetServices(const FileDescriptor *file) {
grpc::string output;
- StringOutputStream output_stream(&output);
- Printer out(&output_stream, '$');
-
- // Don't write out any output if there no services, to avoid empty service
- // files being generated for proto files that don't declare any.
- if (file->service_count() == 0) {
- return output;
- }
-
- // Write out a file header.
- std::map<grpc::string, grpc::string> header_comment_vars = ListToDict(
- {"file.name", file->name(), "file.package", file->package(), });
- out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n");
- out.Print(header_comment_vars,
- "# Source: $file.name$ for package '$file.package$'\n");
-
- out.Print("\n");
- out.Print("require 'grpc'\n");
- // Write out require statemment to import the separately generated file
- // that defines the messages used by the service. This is generated by the
- // main ruby plugin.
- std::map<grpc::string, grpc::string> dep_vars =
- ListToDict({"dep.name", MessagesRequireName(file), });
- out.Print(dep_vars, "require '$dep.name$'\n");
-
- // Write out services within the modules
- out.Print("\n");
- std::vector<grpc::string> modules = Split(file->package(), '.');
- for (size_t i = 0; i < modules.size(); ++i) {
- std::map<grpc::string, grpc::string> module_vars =
- ListToDict({"module.name", CapitalizeFirst(modules[i]), });
- out.Print(module_vars, "module $module.name$\n");
- out.Indent();
+ {
+ // Scope the output stream so it closes and finalizes output to the string.
+
+ StringOutputStream output_stream(&output);
+ Printer out(&output_stream, '$');
+
+ // Don't write out any output if there no services, to avoid empty service
+ // files being generated for proto files that don't declare any.
+ if (file->service_count() == 0) {
+ return output;
+ }
+
+ // Write out a file header.
+ std::map<grpc::string, grpc::string> header_comment_vars = ListToDict(
+ {"file.name", file->name(), "file.package", file->package(), });
+ out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n");
+ out.Print(header_comment_vars,
+ "# Source: $file.name$ for package '$file.package$'\n");
+
+ out.Print("\n");
+ out.Print("require 'grpc'\n");
+ // Write out require statemment to import the separately generated file
+ // that defines the messages used by the service. This is generated by the
+ // main ruby plugin.
+ std::map<grpc::string, grpc::string> dep_vars =
+ ListToDict({"dep.name", MessagesRequireName(file), });
+ out.Print(dep_vars, "require '$dep.name$'\n");
+
+ // Write out services within the modules
+ out.Print("\n");
+ std::vector<grpc::string> modules = Split(file->package(), '.');
+ for (size_t i = 0; i < modules.size(); ++i) {
+ std::map<grpc::string, grpc::string> module_vars =
+ ListToDict({"module.name", CapitalizeFirst(modules[i]), });
+ out.Print(module_vars, "module $module.name$\n");
+ out.Indent();
+ }
+ for (int i = 0; i < file->service_count(); ++i) {
+ auto service = file->service(i);
+ PrintService(service, file->package(), &out);
+ }
+ for (size_t i = 0; i < modules.size(); ++i) {
+ out.Outdent();
+ out.Print("end\n");
+ }
}
- for (int i = 0; i < file->service_count(); ++i) {
- auto service = file->service(i);
- PrintService(service, file->package(), &out);
- }
- for (size_t i = 0; i < modules.size(); ++i) {
- out.Outdent();
- out.Print("end\n");
- }
-
return output;
}
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 78f8d06d89..42e242ae81 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -144,6 +144,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send(op->send_user_data, 0);
}
if (op->recv_ops) {
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d2d4..63615ea25f 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) {
gpr_atm_rel_store(&r->writest, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
- r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
r->freelist_next = NULL;
+ r->read_watcher = r->write_watcher = NULL;
return r;
}
@@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void wake_watchers(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ } else if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ } else if (fd->write_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
gpr_mu_lock(&fd->watcher_mu);
- for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
- watcher = watcher->next) {
+ maybe_wake_one_watcher_locked(fd);
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
grpc_pollset_force_kick(watcher->pollset);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +184,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
fd->on_done_user_data = user_data;
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
+ wake_all_watchers(fd);
unref_by(fd, 2); /* drop the reference */
}
@@ -204,7 +226,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
notify_on function. */
- wake_watchers(fd);
+ maybe_wake_one_watcher(fd);
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +312,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *watcher) {
+ gpr_uint32 mask = 0;
/* keep track of pollers that have requested our events, in case they change
*/
grpc_fd_ref(fd);
gpr_mu_lock(&fd->watcher_mu);
- watcher->next = &fd->watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
watcher->pollset = pollset;
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
- return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+ return mask;
}
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->fd->watcher_mu);
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ gpr_mu_lock(&fd->watcher_mu);
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ kick = kick || !got_read;
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ kick = kick || !got_write;
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
- grpc_fd_unref(watcher->fd);
+ grpc_fd_unref(fd);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index be21f2b55f..cfc533b7f5 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -66,8 +66,32 @@ struct grpc_fd {
gpr_mu set_state_mu;
gpr_atm shutdown;
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It denotes the fd's interest in whether to read poll or write poll
+ or both or neither on this fd.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If at a later time there becomes need of a poller to poll, one of
+ the inactive pollers may be kicked out of their poll loops to take
+ that responsibility. */
gpr_mu watcher_mu;
- grpc_fd_watcher watcher_root;
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
gpr_atm readst;
gpr_atm writest;
@@ -103,7 +127,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *rec);
/* Complete polling previously started with grpc_fd_begin_poll */
-void grpc_fd_end_poll(grpc_fd_watcher *rec);
+void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write);
/* Return 1 if this fd is orphaned, 0 otherwise */
int grpc_fd_is_orphaned(grpc_fd *fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 25b7cfda1a..4d36107ab0 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) {
pollset_hdr *h;
h = pollset->data.ptr;
for (i = 1; i < h->pfd_count; i++) {
- grpc_fd_end_poll(&h->watchers[i]);
+ grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN, h->pfds[i].revents & POLLOUT);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index f496ac5bfa..826c792990 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -420,10 +420,12 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
- r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
+ /* poll fd count (argument 2) is shortened by one if we have no events
+ to poll on - such that it only includes the kicker */
+ r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout);
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
- grpc_fd_end_poll(&fd_watcher);
+ grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT);
if (r < 0) {
if (errno != EINTR) {
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 3186292a02..a3b0b2672b 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -55,6 +55,7 @@ static void lame_start_transport_op(grpc_call_element *elem,
channel_data *chand = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
op->on_done_send(op->send_user_data, 0);
}
if (op->recv_ops) {
diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh
index 79abbe6cf8..4882a2b846 100755
--- a/src/php/bin/run_gen_code_test.sh
+++ b/src/php/bin/run_gen_code_test.sh
@@ -29,9 +29,9 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cd $(dirname $0)
-GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \
+GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
-d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
../tests/generated_code/GeneratedCodeTest.php
-GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \
+GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
-d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
../tests/generated_code/GeneratedCodeWithCallbackTest.php