diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/cpp_generator.cc | 221 | ||||
-rw-r--r-- | src/compiler/csharp_generator.cc | 56 | ||||
-rw-r--r-- | src/compiler/objective_c_generator.cc | 108 | ||||
-rw-r--r-- | src/compiler/ruby_generator.cc | 87 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 96 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 28 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 6 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 1 | ||||
-rwxr-xr-x | src/php/bin/run_gen_code_test.sh | 4 |
11 files changed, 361 insertions, 249 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index acac1475f7..b0d2b5d229 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -86,23 +86,25 @@ grpc::string FilenameIdentifier(const grpc::string &filename) { grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - - vars["filename"] = file->name(); - vars["filename_identifier"] = FilenameIdentifier(file->name()); - vars["filename_base"] = grpc_generator::StripProto(file->name()); - - printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); - printer.Print(vars, "// If you make any local change, they will be lost.\n"); - printer.Print(vars, "// source: $filename$\n"); - printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n"); - printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n"); - printer.Print(vars, "\n"); - printer.Print(vars, "#include \"$filename_base$.pb.h\"\n"); - printer.Print(vars, "\n"); - + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + + vars["filename"] = file->name(); + vars["filename_identifier"] = FilenameIdentifier(file->name()); + vars["filename_base"] = grpc_generator::StripProto(file->name()); + + printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); + printer.Print(vars, "// If you make any local change, they will be lost.\n"); + printer.Print(vars, "// source: $filename$\n"); + printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n"); + printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n"); + printer.Print(vars, "\n"); + printer.Print(vars, "#include \"$filename_base$.pb.h\"\n"); + printer.Print(vars, "\n"); + } return output; } @@ -626,100 +628,108 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer, grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - - if (!params.services_namespace.empty()) { - vars["services_namespace"] = params.services_namespace; - printer.Print(vars, "\nnamespace $services_namespace$ {\n\n"); - } + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + + if (!params.services_namespace.empty()) { + vars["services_namespace"] = params.services_namespace; + printer.Print(vars, "\nnamespace $services_namespace$ {\n\n"); + } - for (int i = 0; i < file->service_count(); ++i) { - PrintHeaderService(&printer, file->service(i), &vars); - printer.Print("\n"); - } + for (int i = 0; i < file->service_count(); ++i) { + PrintHeaderService(&printer, file->service(i), &vars); + printer.Print("\n"); + } - if (!params.services_namespace.empty()) { - printer.Print(vars, "} // namespace $services_namespace$\n\n"); + if (!params.services_namespace.empty()) { + printer.Print(vars, "} // namespace $services_namespace$\n\n"); + } } - return output; } grpc::string GetHeaderEpilogue(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - - vars["filename"] = file->name(); - vars["filename_identifier"] = FilenameIdentifier(file->name()); - - if (!file->package().empty()) { - std::vector<grpc::string> parts = - grpc_generator::tokenize(file->package(), "."); - - for (auto part = parts.rbegin(); part != parts.rend(); part++) { - vars["part"] = *part; - printer.Print(vars, "} // namespace $part$\n"); + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + + vars["filename"] = file->name(); + vars["filename_identifier"] = FilenameIdentifier(file->name()); + + if (!file->package().empty()) { + std::vector<grpc::string> parts = + grpc_generator::tokenize(file->package(), "."); + + for (auto part = parts.rbegin(); part != parts.rend(); part++) { + vars["part"] = *part; + printer.Print(vars, "} // namespace $part$\n"); + } + printer.Print(vars, "\n"); } + printer.Print(vars, "\n"); + printer.Print(vars, "#endif // GRPC_$filename_identifier$__INCLUDED\n"); } - - printer.Print(vars, "\n"); - printer.Print(vars, "#endif // GRPC_$filename_identifier$__INCLUDED\n"); - return output; } grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - - vars["filename"] = file->name(); - vars["filename_base"] = grpc_generator::StripProto(file->name()); - - printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); - printer.Print(vars, "// If you make any local change, they will be lost.\n"); - printer.Print(vars, "// source: $filename$\n\n"); - printer.Print(vars, "#include \"$filename_base$.pb.h\"\n"); - printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n"); - printer.Print(vars, "\n"); - + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + + vars["filename"] = file->name(); + vars["filename_base"] = grpc_generator::StripProto(file->name()); + + printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); + printer.Print(vars, "// If you make any local change, they will be lost.\n"); + printer.Print(vars, "// source: $filename$\n\n"); + printer.Print(vars, "#include \"$filename_base$.pb.h\"\n"); + printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n"); + printer.Print(vars, "\n"); + } return output; } grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, const Parameters ¶m) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - - printer.Print(vars, "#include <grpc++/async_unary_call.h>\n"); - printer.Print(vars, "#include <grpc++/channel_interface.h>\n"); - printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n"); - printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n"); - printer.Print(vars, "#include <grpc++/impl/service_type.h>\n"); - printer.Print(vars, "#include <grpc++/stream.h>\n"); - - if (!file->package().empty()) { - std::vector<grpc::string> parts = - grpc_generator::tokenize(file->package(), "."); - - for (auto part = parts.begin(); part != parts.end(); part++) { - vars["part"] = *part; - printer.Print(vars, "namespace $part$ {\n"); + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + + printer.Print(vars, "#include <grpc++/async_unary_call.h>\n"); + printer.Print(vars, "#include <grpc++/channel_interface.h>\n"); + printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n"); + printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n"); + printer.Print(vars, "#include <grpc++/impl/service_type.h>\n"); + printer.Print(vars, "#include <grpc++/stream.h>\n"); + + if (!file->package().empty()) { + std::vector<grpc::string> parts = + grpc_generator::tokenize(file->package(), "."); + + for (auto part = parts.begin(); part != parts.end(); part++) { + vars["part"] = *part; + printer.Print(vars, "namespace $part$ {\n"); + } } - } - - printer.Print(vars, "\n"); + printer.Print(vars, "\n"); + } return output; } @@ -1077,26 +1087,29 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - // Package string is empty or ends with a dot. It is used to fully qualify - // method names. - vars["Package"] = file->package(); - if (!file->package().empty()) { - vars["Package"].append("."); - } - if (!params.services_namespace.empty()) { - vars["ns"] = params.services_namespace + "::"; - vars["prefix"] = params.services_namespace; - } else { - vars["ns"] = ""; - vars["prefix"] = ""; - } + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; + // Package string is empty or ends with a dot. It is used to fully qualify + // method names. + vars["Package"] = file->package(); + if (!file->package().empty()) { + vars["Package"].append("."); + } + if (!params.services_namespace.empty()) { + vars["ns"] = params.services_namespace + "::"; + vars["prefix"] = params.services_namespace; + } else { + vars["ns"] = ""; + vars["prefix"] = ""; + } - for (int i = 0; i < file->service_count(); ++i) { - PrintSourceService(&printer, file->service(i), &vars); - printer.Print("\n"); + for (int i = 0; i < file->service_count(); ++i) { + PrintSourceService(&printer, file->service(i), &vars); + printer.Print("\n"); + } } return output; } diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 5dd078b303..ccb0b688b6 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -474,35 +474,39 @@ void GenerateService(Printer* out, const ServiceDescriptor *service) { grpc::string GetServices(const FileDescriptor *file) { grpc::string output; - StringOutputStream output_stream(&output); - Printer out(&output_stream, '$'); + { + // Scope the output stream so it closes and finalizes output to the string. - // Don't write out any output if there no services, to avoid empty service - // files being generated for proto files that don't declare any. - if (file->service_count() == 0) { - return output; - } + StringOutputStream output_stream(&output); + Printer out(&output_stream, '$'); + + // Don't write out any output if there no services, to avoid empty service + // files being generated for proto files that don't declare any. + if (file->service_count() == 0) { + return output; + } - // Write out a file header. - out.Print("// Generated by the protocol buffer compiler. DO NOT EDIT!\n"); - out.Print("// source: $filename$\n", "filename", file->name()); - out.Print("#region Designer generated code\n"); - out.Print("\n"); - out.Print("using System;\n"); - out.Print("using System.Threading;\n"); - out.Print("using System.Threading.Tasks;\n"); - out.Print("using Grpc.Core;\n"); - // TODO(jtattermusch): add using for protobuf message classes - out.Print("\n"); - - out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file)); - out.Indent(); - for (int i = 0; i < file->service_count(); i++) { - GenerateService(&out, file->service(i)); + // Write out a file header. + out.Print("// Generated by the protocol buffer compiler. DO NOT EDIT!\n"); + out.Print("// source: $filename$\n", "filename", file->name()); + out.Print("#region Designer generated code\n"); + out.Print("\n"); + out.Print("using System;\n"); + out.Print("using System.Threading;\n"); + out.Print("using System.Threading.Tasks;\n"); + out.Print("using Grpc.Core;\n"); + // TODO(jtattermusch): add using for protobuf message classes + out.Print("\n"); + + out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file)); + out.Indent(); + for (int i = 0; i < file->service_count(); i++) { + GenerateService(&out, file->service(i)); + } + out.Outdent(); + out.Print("}\n"); + out.Print("#endregion\n"); } - out.Outdent(); - out.Print("}\n"); - out.Print("#endregion\n"); return output; } diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 8f35302bee..1bf0254f5b 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -176,65 +176,71 @@ void PrintMethodImplementations(Printer *printer, string GetHeader(const ServiceDescriptor *service, const string prefix) { string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - Printer printer(&output_stream, '$'); + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + Printer printer(&output_stream, '$'); - printer.Print("@protocol GRXWriteable;\n"); - printer.Print("@protocol GRXWriter;\n\n"); - - map<string, string> vars = {{"service_name", service->name()}, - {"prefix", prefix}}; - printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n"); - - for (int i = 0; i < service->method_count(); i++) { - PrintMethodDeclarations(&printer, service->method(i), vars); + printer.Print("@protocol GRXWriteable;\n"); + printer.Print("@protocol GRXWriter;\n\n"); + + map<string, string> vars = {{"service_name", service->name()}, + {"prefix", prefix}}; + printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n"); + + for (int i = 0; i < service->method_count(); i++) { + PrintMethodDeclarations(&printer, service->method(i), vars); + } + printer.Print("@end\n\n"); + + printer.Print("// Basic service implementation, over gRPC, that only does" + " marshalling and parsing.\n"); + printer.Print(vars, "@interface $prefix$$service_name$ :" + " ProtoService<$prefix$$service_name$>\n"); + printer.Print("- (instancetype)initWithHost:(NSString *)host" + " NS_DESIGNATED_INITIALIZER;\n"); + printer.Print("@end\n"); } - printer.Print("@end\n\n"); - - printer.Print("// Basic service implementation, over gRPC, that only does" - " marshalling and parsing.\n"); - printer.Print(vars, "@interface $prefix$$service_name$ :" - " ProtoService<$prefix$$service_name$>\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host" - " NS_DESIGNATED_INITIALIZER;\n"); - printer.Print("@end\n"); return output; } string GetSource(const ServiceDescriptor *service, const string prefix) { string output; - grpc::protobuf::io::StringOutputStream output_stream(&output); - Printer printer(&output_stream, '$'); - - map<string, string> vars = {{"service_name", service->name()}, - {"package", service->file()->package()}, - {"prefix", prefix}}; - - printer.Print(vars, - "static NSString *const kPackageName = @\"$package$\";\n"); - printer.Print(vars, - "static NSString *const kServiceName = @\"$service_name$\";\n\n"); - - printer.Print(vars, "@implementation $prefix$$service_name$\n\n"); - - printer.Print("// Designated initializer\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host {\n"); - printer.Print(" return (self = [super initWithHost:host" - " packageName:kPackageName serviceName:kServiceName]);\n"); - printer.Print("}\n\n"); - printer.Print("// Override superclass initializer to disallow different" - " package and service names.\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host\n"); - printer.Print(" packageName:(NSString *)packageName\n"); - printer.Print(" serviceName:(NSString *)serviceName {\n"); - printer.Print(" return [self initWithHost:host];\n"); - printer.Print("}\n\n\n"); - - for (int i = 0; i < service->method_count(); i++) { - PrintMethodImplementations(&printer, service->method(i), vars); - } + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + Printer printer(&output_stream, '$'); + + map<string, string> vars = {{"service_name", service->name()}, + {"package", service->file()->package()}, + {"prefix", prefix}}; - printer.Print("@end\n"); + printer.Print(vars, + "static NSString *const kPackageName = @\"$package$\";\n"); + printer.Print(vars, + "static NSString *const kServiceName = @\"$service_name$\";\n\n"); + + printer.Print(vars, "@implementation $prefix$$service_name$\n\n"); + + printer.Print("// Designated initializer\n"); + printer.Print("- (instancetype)initWithHost:(NSString *)host {\n"); + printer.Print(" return (self = [super initWithHost:host" + " packageName:kPackageName serviceName:kServiceName]);\n"); + printer.Print("}\n\n"); + printer.Print("// Override superclass initializer to disallow different" + " package and service names.\n"); + printer.Print("- (instancetype)initWithHost:(NSString *)host\n"); + printer.Print(" packageName:(NSString *)packageName\n"); + printer.Print(" serviceName:(NSString *)serviceName {\n"); + printer.Print(" return [self initWithHost:host];\n"); + printer.Print("}\n\n\n"); + + for (int i = 0; i < service->method_count(); i++) { + PrintMethodImplementations(&printer, service->method(i), vars); + } + + printer.Print("@end\n"); + } return output; } diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc index a0bb92848b..299137519f 100644 --- a/src/compiler/ruby_generator.cc +++ b/src/compiler/ruby_generator.cc @@ -119,49 +119,52 @@ void PrintService(const ServiceDescriptor *service, const grpc::string &package, grpc::string GetServices(const FileDescriptor *file) { grpc::string output; - StringOutputStream output_stream(&output); - Printer out(&output_stream, '$'); - - // Don't write out any output if there no services, to avoid empty service - // files being generated for proto files that don't declare any. - if (file->service_count() == 0) { - return output; - } - - // Write out a file header. - std::map<grpc::string, grpc::string> header_comment_vars = ListToDict( - {"file.name", file->name(), "file.package", file->package(), }); - out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n"); - out.Print(header_comment_vars, - "# Source: $file.name$ for package '$file.package$'\n"); - - out.Print("\n"); - out.Print("require 'grpc'\n"); - // Write out require statemment to import the separately generated file - // that defines the messages used by the service. This is generated by the - // main ruby plugin. - std::map<grpc::string, grpc::string> dep_vars = - ListToDict({"dep.name", MessagesRequireName(file), }); - out.Print(dep_vars, "require '$dep.name$'\n"); - - // Write out services within the modules - out.Print("\n"); - std::vector<grpc::string> modules = Split(file->package(), '.'); - for (size_t i = 0; i < modules.size(); ++i) { - std::map<grpc::string, grpc::string> module_vars = - ListToDict({"module.name", CapitalizeFirst(modules[i]), }); - out.Print(module_vars, "module $module.name$\n"); - out.Indent(); + { + // Scope the output stream so it closes and finalizes output to the string. + + StringOutputStream output_stream(&output); + Printer out(&output_stream, '$'); + + // Don't write out any output if there no services, to avoid empty service + // files being generated for proto files that don't declare any. + if (file->service_count() == 0) { + return output; + } + + // Write out a file header. + std::map<grpc::string, grpc::string> header_comment_vars = ListToDict( + {"file.name", file->name(), "file.package", file->package(), }); + out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n"); + out.Print(header_comment_vars, + "# Source: $file.name$ for package '$file.package$'\n"); + + out.Print("\n"); + out.Print("require 'grpc'\n"); + // Write out require statemment to import the separately generated file + // that defines the messages used by the service. This is generated by the + // main ruby plugin. + std::map<grpc::string, grpc::string> dep_vars = + ListToDict({"dep.name", MessagesRequireName(file), }); + out.Print(dep_vars, "require '$dep.name$'\n"); + + // Write out services within the modules + out.Print("\n"); + std::vector<grpc::string> modules = Split(file->package(), '.'); + for (size_t i = 0; i < modules.size(); ++i) { + std::map<grpc::string, grpc::string> module_vars = + ListToDict({"module.name", CapitalizeFirst(modules[i]), }); + out.Print(module_vars, "module $module.name$\n"); + out.Indent(); + } + for (int i = 0; i < file->service_count(); ++i) { + auto service = file->service(i); + PrintService(service, file->package(), &out); + } + for (size_t i = 0; i < modules.size(); ++i) { + out.Outdent(); + out.Print("end\n"); + } } - for (int i = 0; i < file->service_count(); ++i) { - auto service = file->service(i); - PrintService(service, file->package(), &out); - } - for (size_t i = 0; i < modules.size(); ++i) { - out.Outdent(); - out.Print("end\n"); - } - return output; } diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 78f8d06d89..42e242ae81 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -144,6 +144,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_ops) { + grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); op->on_done_send(op->send_user_data, 0); } if (op->recv_ops) { diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 9c8133d2d4..63615ea25f 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) { gpr_atm_rel_store(&r->writest, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; - r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; + r->inactive_watcher_root.next = r->inactive_watcher_root.prev = + &r->inactive_watcher_root; r->freelist_next = NULL; + r->read_watcher = r->write_watcher = NULL; return r; } @@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void wake_watchers(grpc_fd *fd) { - grpc_fd_watcher *watcher; +static void maybe_wake_one_watcher_locked(grpc_fd *fd) { + if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { + grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset); + } else if (fd->read_watcher) { + grpc_pollset_force_kick(fd->read_watcher->pollset); + } else if (fd->write_watcher) { + grpc_pollset_force_kick(fd->write_watcher->pollset); + } +} + +static void maybe_wake_one_watcher(grpc_fd *fd) { gpr_mu_lock(&fd->watcher_mu); - for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root; - watcher = watcher->next) { + maybe_wake_one_watcher_locked(fd); + gpr_mu_unlock(&fd->watcher_mu); +} + +static void wake_all_watchers(grpc_fd *fd) { + grpc_fd_watcher *watcher; + for (watcher = fd->inactive_watcher_root.next; + watcher != &fd->inactive_watcher_root; watcher = watcher->next) { grpc_pollset_force_kick(watcher->pollset); } - gpr_mu_unlock(&fd->watcher_mu); + if (fd->read_watcher) { + grpc_pollset_force_kick(fd->read_watcher->pollset); + } + if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { + grpc_pollset_force_kick(fd->write_watcher->pollset); + } } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { @@ -162,7 +184,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { fd->on_done_user_data = user_data; shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ - wake_watchers(fd); + wake_all_watchers(fd); unref_by(fd, 2); /* drop the reference */ } @@ -204,7 +226,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, set_ready call. NOTE: we don't have an ABA problem here, since we should never have concurrent calls to the same notify_on function. */ - wake_watchers(fd); + maybe_wake_one_watcher(fd); return; } /* swap was unsuccessful due to an intervening set_ready call. @@ -290,29 +312,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *watcher) { + gpr_uint32 mask = 0; /* keep track of pollers that have requested our events, in case they change */ grpc_fd_ref(fd); gpr_mu_lock(&fd->watcher_mu); - watcher->next = &fd->watcher_root; - watcher->prev = watcher->next->prev; - watcher->next->prev = watcher->prev->next = watcher; + /* if there is nobody polling for read, but we need to, then start doing so */ + if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + fd->read_watcher = watcher; + mask |= read_mask; + } + /* if there is nobody polling for write, but we need to, then start doing so + */ + if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + fd->write_watcher = watcher; + mask |= write_mask; + } + /* if not polling, remember this watcher in case we need someone to later */ + if (mask == 0) { + watcher->next = &fd->inactive_watcher_root; + watcher->prev = watcher->next->prev; + watcher->next->prev = watcher->prev->next = watcher; + } watcher->pollset = pollset; watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); - return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) | - (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0); + return mask; } -void grpc_fd_end_poll(grpc_fd_watcher *watcher) { - gpr_mu_lock(&watcher->fd->watcher_mu); - watcher->next->prev = watcher->prev; - watcher->prev->next = watcher->next; - gpr_mu_unlock(&watcher->fd->watcher_mu); +void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { + int was_polling = 0; + int kick = 0; + grpc_fd *fd = watcher->fd; + + gpr_mu_lock(&fd->watcher_mu); + if (watcher == fd->read_watcher) { + /* remove read watcher, kick if we still need a read */ + was_polling = 1; + kick = kick || !got_read; + fd->read_watcher = NULL; + } + if (watcher == fd->write_watcher) { + /* remove write watcher, kick if we still need a write */ + was_polling = 1; + kick = kick || !got_write; + fd->write_watcher = NULL; + } + if (!was_polling) { + /* remove from inactive list */ + watcher->next->prev = watcher->prev; + watcher->prev->next = watcher->next; + } + if (kick) { + maybe_wake_one_watcher_locked(fd); + } + gpr_mu_unlock(&fd->watcher_mu); - grpc_fd_unref(watcher->fd); + grpc_fd_unref(fd); } void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index be21f2b55f..cfc533b7f5 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -66,8 +66,32 @@ struct grpc_fd { gpr_mu set_state_mu; gpr_atm shutdown; + /* The watcher list. + + The following watcher related fields are protected by watcher_mu. + + An fd_watcher is an ephemeral object created when an fd wants to + begin polling, and destroyed after the poll. + + It denotes the fd's interest in whether to read poll or write poll + or both or neither on this fd. + + If a watcher is asked to poll for reads or writes, the read_watcher + or write_watcher fields are set respectively. A watcher may be asked + to poll for both, in which case both fields will be set. + + read_watcher and write_watcher may be NULL if no watcher has been + asked to poll for reads or writes. + + If an fd_watcher is not asked to poll for reads or writes, it's added + to a linked list of inactive watchers, rooted at inactive_watcher_root. + If at a later time there becomes need of a poller to poll, one of + the inactive pollers may be kicked out of their poll loops to take + that responsibility. */ gpr_mu watcher_mu; - grpc_fd_watcher watcher_root; + grpc_fd_watcher inactive_watcher_root; + grpc_fd_watcher *read_watcher; + grpc_fd_watcher *write_watcher; gpr_atm readst; gpr_atm writest; @@ -103,7 +127,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll */ -void grpc_fd_end_poll(grpc_fd_watcher *rec); +void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write); /* Return 1 if this fd is orphaned, 0 otherwise */ int grpc_fd_is_orphaned(grpc_fd *fd); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 25b7cfda1a..4d36107ab0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -98,7 +98,7 @@ static void end_polling(grpc_pollset *pollset) { pollset_hdr *h; h = pollset->data.ptr; for (i = 1; i < h->pfd_count; i++) { - grpc_fd_end_poll(&h->watchers[i]); + grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN, h->pfds[i].revents & POLLOUT); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index f496ac5bfa..826c792990 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -420,10 +420,12 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); - r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); + /* poll fd count (argument 2) is shortened by one if we have no events + to poll on - such that it only includes the kicker */ + r = poll(pfd, GPR_ARRAY_SIZE(pfd) - (pfd[1].events == 0), timeout); GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); - grpc_fd_end_poll(&fd_watcher); + grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, pfd[1].revents & POLLOUT); if (r < 0) { if (errno != EINTR) { diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3186292a02..a3b0b2672b 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -55,6 +55,7 @@ static void lame_start_transport_op(grpc_call_element *elem, channel_data *chand = elem->channel_data; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->send_ops) { + grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); op->on_done_send(op->send_user_data, 0); } if (op->recv_ops) { diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh index 79abbe6cf8..4882a2b846 100755 --- a/src/php/bin/run_gen_code_test.sh +++ b/src/php/bin/run_gen_code_test.sh @@ -29,9 +29,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. cd $(dirname $0) -GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \ +GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ ../tests/generated_code/GeneratedCodeTest.php -GRPC_TEST_HOST=localhost:7070 php -d extension_dir=../ext/grpc/modules/ \ +GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ ../tests/generated_code/GeneratedCodeWithCallbackTest.php |