aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-03-26 23:12:14 -0700
committerGravatar Yang Gao <yangg@google.com>2015-03-26 23:12:14 -0700
commit48bbd000eb9f7e4247446ef1d4f7f9dc26319550 (patch)
tree7d78f480b64cd6a73890ff0852b9bd725012154e /src
parent166f9d00cec5f74ef996dd3fec7025f2b031275c (diff)
parentf9b6335b2c0c064903d26a631a6ee3ac19f37aa2 (diff)
merge upstream and resolve conflict
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc122
-rw-r--r--src/compiler/cpp_generator.h17
-rw-r--r--src/compiler/cpp_plugin.cc32
-rw-r--r--src/compiler/generator_helpers.h20
-rw-r--r--src/compiler/python_generator.cc172
-rw-r--r--src/compiler/python_generator.h30
-rw-r--r--src/compiler/python_plugin.cc58
-rw-r--r--src/compiler/ruby_generator.cc48
-rw-r--r--src/compiler/ruby_generator.h10
-rw-r--r--src/compiler/ruby_generator_helpers-inl.h12
-rw-r--r--src/compiler/ruby_generator_map-inl.h13
-rw-r--r--src/compiler/ruby_generator_string-inl.h43
-rw-r--r--src/compiler/ruby_plugin.cc30
-rw-r--r--src/core/channel/http_server_filter.c3
-rw-r--r--src/core/iomgr/iocp_windows.c34
-rw-r--r--src/core/iomgr/iocp_windows.h1
-rw-r--r--src/core/iomgr/iomgr.c11
-rw-r--r--src/core/iomgr/sockaddr_win32.h5
-rw-r--r--src/core/iomgr/socket_windows.c7
-rw-r--r--src/core/iomgr/socket_windows.h6
-rw-r--r--src/core/iomgr/tcp_server_windows.c3
-rw-r--r--src/core/iomgr/tcp_windows.c4
-rw-r--r--[-rwxr-xr-x]src/core/support/cpu_iphone.c (renamed from src/php/ext/grpc/event.h)30
-rw-r--r--src/core/support/cpu_posix.c2
-rw-r--r--src/core/support/env_win32.c15
-rw-r--r--src/core/support/file_win32.c2
-rw-r--r--src/core/support/log_win32.c8
-rw-r--r--src/core/support/string_win32.c6
-rw-r--r--src/core/support/sync_win32.c1
-rw-r--r--src/core/support/time_win32.c4
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/call.h9
-rw-r--r--src/core/surface/call_log_batch.c121
-rw-r--r--src/core/surface/init.c3
-rw-r--r--src/core/tsi/ssl_transport_security.c4
-rw-r--r--src/cpp/client/insecure_credentials.cc2
-rw-r--r--src/cpp/client/secure_credentials.cc2
-rw-r--r--src/cpp/common/call.cc11
-rw-r--r--src/cpp/common/completion_queue.cc17
-rw-r--r--src/cpp/server/secure_server_credentials.cc9
-rw-r--r--src/cpp/server/server.cc7
-rw-r--r--src/cpp/server/server_builder.cc24
-rw-r--r--src/cpp/server/thread_pool.cc45
-rw-r--r--src/cpp/server/thread_pool.h2
-rw-r--r--src/cpp/util/time.cc6
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs6
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs4
-rw-r--r--src/csharp/Grpc.Core/Server.cs8
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs4
-rw-r--r--src/node/examples/qps_test.js136
-rw-r--r--src/node/index.js8
-rw-r--r--src/node/interop/interop_client.js12
-rw-r--r--src/node/src/client.js53
-rw-r--r--src/node/src/common.js23
-rw-r--r--src/node/src/server.js99
-rw-r--r--src/node/test/surface_test.js53
-rw-r--r--src/php/ext/grpc/byte_buffer.c5
-rw-r--r--src/php/ext/grpc/call.c534
-rw-r--r--src/php/ext/grpc/call.h17
-rw-r--r--src/php/ext/grpc/channel.c23
-rw-r--r--src/php/ext/grpc/completion_queue.c170
-rwxr-xr-xsrc/php/ext/grpc/completion_queue.h62
-rwxr-xr-xsrc/php/ext/grpc/config.m42
-rw-r--r--src/php/ext/grpc/event.c150
-rw-r--r--src/php/ext/grpc/php_grpc.c36
-rw-r--r--src/php/ext/grpc/server.c76
-rwxr-xr-xsrc/php/ext/grpc/server.h1
-rw-r--r--src/php/lib/Grpc/AbstractCall.php79
-rwxr-xr-xsrc/php/lib/Grpc/AbstractSurfaceActiveCall.php98
-rwxr-xr-xsrc/php/lib/Grpc/ActiveCall.php123
-rwxr-xr-xsrc/php/lib/Grpc/BaseStub.php31
-rw-r--r--[-rwxr-xr-x]src/php/lib/Grpc/BidiStreamingCall.php (renamed from src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php)38
-rw-r--r--[-rwxr-xr-x]src/php/lib/Grpc/ClientStreamingCall.php (renamed from src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php)31
-rw-r--r--[-rwxr-xr-x]src/php/lib/Grpc/ServerStreamingCall.php (renamed from src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php)41
-rw-r--r--[-rwxr-xr-x]src/php/lib/Grpc/UnaryCall.php (renamed from src/php/lib/Grpc/SimpleSurfaceActiveCall.php)32
-rwxr-xr-xsrc/php/tests/interop/interop_client.php7
-rwxr-xr-xsrc/php/tests/unit_tests/CallTest.php62
-rwxr-xr-xsrc/php/tests/unit_tests/CompletionQueueTest.php46
-rwxr-xr-xsrc/php/tests/unit_tests/EndToEndTest.php186
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php197
-rwxr-xr-xsrc/ruby/Rakefile53
-rw-r--r--src/ruby/lib/grpc/generic/service.rb17
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb4
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb8
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb42
88 files changed, 1785 insertions, 1811 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index d5004624d4..0a84c73520 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -111,7 +111,8 @@ bool HasBidiStreaming(const grpc::protobuf::FileDescriptor *file) {
}
} // namespace
-grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file) {
+grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params) {
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/service_type.h>\n"
@@ -158,7 +159,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file) {
return temp;
}
-grpc::string GetSourceIncludes() {
+grpc::string GetSourceIncludes(const Parameters &param) {
return "#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/channel_interface.h>\n"
"#include <grpc++/impl/client_unary_call.h>\n"
@@ -353,16 +354,27 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("};\n");
}
-grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file) {
+grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params) {
grpc::string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
+ if (!params.services_namespace.empty()) {
+ vars["services_namespace"] = params.services_namespace;
+ printer.Print(vars, "\nnamespace $services_namespace$ {\n\n");
+ }
+
for (int i = 0; i < file->service_count(); ++i) {
PrintHeaderService(&printer, file->service(i), &vars);
printer.Print("\n");
}
+
+ if (!params.services_namespace.empty()) {
+ printer.Print(vars, "} // namespace $services_namespace$\n\n");
+ }
+
return output;
}
@@ -376,18 +388,18 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(*vars,
- "::grpc::Status $Service$::Stub::$Method$("
+ "::grpc::Status $ns$$Service$::Stub::$Method$("
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
" return ::grpc::BlockingUnaryCall(channel(),"
- "::grpc::RpcMethod($Service$_method_names[$Idx$]), "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), "
"context, request, response);\n"
"}\n\n");
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
- "$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
+ "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
@@ -395,32 +407,32 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>>(new "
"::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($Service$_method_names[$Idx$]), "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$]), "
"context, request, tag));\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientWriter< $Request$>> "
- "$Service$::Stub::$Method$("
+ "$ns$$Service$::Stub::$Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientWriter< "
"$Request$>>(new ::grpc::ClientWriter< $Request$>("
"channel(),"
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, response));\n"
"}\n\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>> "
- "$Service$::Stub::Async$Method$("
+ "$ns$$Service$::Stub::Async$Method$("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncWriter< "
"$Request$>>(new ::grpc::ClientAsyncWriter< $Request$>("
"channel(), cq, "
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, response, tag));\n"
"}\n\n");
@@ -428,26 +440,26 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientReader< $Response$>> "
- "$Service$::Stub::$Method$("
+ "$ns$$Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$& request) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientReader< "
"$Response$>>(new ::grpc::ClientReader< $Response$>("
"channel(),"
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request));\n"
"}\n\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncReader< $Response$>> "
- "$Service$::Stub::Async$Method$("
+ "$ns$$Service$::Stub::Async$Method$("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncReader< "
"$Response$>>(new ::grpc::ClientAsyncReader< $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request, tag));\n"
"}\n\n");
@@ -455,27 +467,27 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>> "
- "$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n");
+ "$ns$$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientReaderWriter< "
"$Request$, $Response$>>(new ::grpc::ClientReaderWriter< "
"$Request$, $Response$>("
"channel(),"
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context));\n"
"}\n\n");
printer->Print(*vars,
"std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>> "
- "$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
+ "$ns$$Service$::Stub::Async$Method$(::grpc::ClientContext* context, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Print(*vars,
" return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
"$Request$, $Response$>>(new "
"::grpc::ClientAsyncReaderWriter< $Request$, $Response$>("
"channel(), cq, "
- "::grpc::RpcMethod($Service$_method_names[$Idx$], "
+ "::grpc::RpcMethod($prefix$$Service$_method_names[$Idx$], "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context, tag));\n"
"}\n\n");
@@ -492,7 +504,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(*vars,
- "::grpc::Status $Service$::Service::$Method$("
+ "::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, $Response$* response) {\n");
printer->Print(
@@ -501,7 +513,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
- "::grpc::Status $Service$::Service::$Method$("
+ "::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) {\n");
@@ -511,7 +523,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
- "::grpc::Status $Service$::Service::$Method$("
+ "::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, "
"::grpc::ServerWriter< $Response$>* writer) {\n");
@@ -521,7 +533,7 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
printer->Print("}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(*vars,
- "::grpc::Status $Service$::Service::$Method$("
+ "::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter< $Response$, $Request$>* "
"stream) {\n");
@@ -543,7 +555,7 @@ void PrintSourceServerAsyncMethod(
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(*vars,
- "void $Service$::AsyncService::Request$Method$("
+ "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"$Request$* request, "
"::grpc::ServerAsyncResponseWriter< $Response$>* response, "
@@ -554,7 +566,7 @@ void PrintSourceServerAsyncMethod(
printer->Print("}\n\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
- "void $Service$::AsyncService::Request$Method$("
+ "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReader< $Response$, $Request$>* reader, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
@@ -564,7 +576,7 @@ void PrintSourceServerAsyncMethod(
printer->Print("}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
- "void $Service$::AsyncService::Request$Method$("
+ "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"$Request$* request, "
"::grpc::ServerAsyncWriter< $Response$>* writer, "
@@ -576,7 +588,7 @@ void PrintSourceServerAsyncMethod(
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
- "void $Service$::AsyncService::Request$Method$("
+ "void $ns$$Service$::AsyncService::Request$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
"::grpc::CompletionQueue* cq, void *tag) {\n");
@@ -592,7 +604,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Service"] = service->name();
- printer->Print(*vars, "static const char* $Service$_method_names[] = {\n");
+ printer->Print(*vars, "static const char* $prefix$$Service$_method_names[] = {\n");
for (int i = 0; i < service->method_count(); ++i) {
(*vars)["Method"] = service->method(i)->name();
printer->Print(*vars, " \"/$Package$$Service$/$Method$\",\n");
@@ -601,9 +613,9 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
- "std::unique_ptr< $Service$::Stub> $Service$::NewStub("
+ "std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
"const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
- " std::unique_ptr< $Service$::Stub> stub(new $Service$::Stub());\n"
+ " std::unique_ptr< $ns$$Service$::Stub> stub(new $ns$$Service$::Stub());\n"
" stub->set_channel(channel);\n"
" return stub;\n"
"}\n\n");
@@ -615,12 +627,12 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
(*vars)["MethodCount"] = as_string(service->method_count());
printer->Print(
*vars,
- "$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : "
- "::grpc::AsynchronousService(cq, $Service$_method_names, $MethodCount$) "
+ "$ns$$Service$::AsyncService::AsyncService(::grpc::CompletionQueue* cq) : "
+ "::grpc::AsynchronousService(cq, $prefix$$Service$_method_names, $MethodCount$) "
"{}\n\n");
printer->Print(*vars,
- "$Service$::Service::~Service() {\n"
+ "$ns$$Service$::Service::~Service() {\n"
" delete service_;\n"
"}\n\n");
for (int i = 0; i < service->method_count(); ++i) {
@@ -629,7 +641,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
PrintSourceServerAsyncMethod(printer, service->method(i), vars);
}
printer->Print(*vars,
- "::grpc::RpcService* $Service$::Service::service() {\n");
+ "::grpc::RpcService* $ns$$Service$::Service::service() {\n");
printer->Indent();
printer->Print(
"if (service_ != nullptr) {\n"
@@ -648,52 +660,52 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
- " $Service$_method_names[$Idx$],\n"
+ " $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $Service$::Service, $Request$, "
+ " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, $Request$, "
"$Response$>(\n"
- " std::function< ::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($ns$$Service$::Service*, "
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
- "&$Service$::Service::$Method$), this),\n"
+ "&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
- " $Service$_method_names[$Idx$],\n"
+ " $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
- "$Service$::Service, $Request$, $Response$>(\n"
- " std::function< ::grpc::Status($Service$::Service*, "
+ "$ns$$Service$::Service, $Request$, $Response$>(\n"
+ " std::function< ::grpc::Status($ns$$Service$::Service*, "
"::grpc::ServerContext*, "
"::grpc::ServerReader< $Request$>*, $Response$*)>("
- "&$Service$::Service::$Method$), this),\n"
+ "&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
- " $Service$_method_names[$Idx$],\n"
+ " $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
- "$Service$::Service, $Request$, $Response$>(\n"
- " std::function< ::grpc::Status($Service$::Service*, "
+ "$ns$$Service$::Service, $Request$, $Response$>(\n"
+ " std::function< ::grpc::Status($ns$$Service$::Service*, "
"::grpc::ServerContext*, "
"const $Request$*, ::grpc::ServerWriter< $Response$>*)>("
- "&$Service$::Service::$Method$), this),\n"
+ "&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
- " $Service$_method_names[$Idx$],\n"
+ " $prefix$$Service$_method_names[$Idx$],\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
- "$Service$::Service, $Request$, $Response$>(\n"
- " std::function< ::grpc::Status($Service$::Service*, "
+ "$ns$$Service$::Service, $Request$, $Response$>(\n"
+ " std::function< ::grpc::Status($ns$$Service$::Service*, "
"::grpc::ServerContext*, "
"::grpc::ServerReaderWriter< $Response$, $Request$>*)>("
- "&$Service$::Service::$Method$), this),\n"
+ "&$ns$$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
}
}
@@ -702,7 +714,8 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print("}\n\n");
}
-grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file) {
+grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params) {
grpc::string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
@@ -713,6 +726,13 @@ grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file) {
if (!file->package().empty()) {
vars["Package"].append(".");
}
+ if (!params.services_namespace.empty()) {
+ vars["ns"] = params.services_namespace + "::";
+ vars["prefix"] = params.services_namespace;
+ } else {
+ vars["ns"] = "";
+ vars["prefix"] = "";
+ }
for (int i = 0; i < file->service_count(); ++i) {
PrintSourceService(&printer, file->service(i), &vars);
diff --git a/src/compiler/cpp_generator.h b/src/compiler/cpp_generator.h
index 2ecdb5c47e..04ad71c067 100644
--- a/src/compiler/cpp_generator.h
+++ b/src/compiler/cpp_generator.h
@@ -38,17 +38,26 @@
namespace grpc_cpp_generator {
+// Contains all the parameters that are parsed from the command line.
+struct Parameters {
+ // Puts the service into a namespace
+ grpc::string services_namespace;
+};
+
// Return the includes needed for generated header file.
-grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file);
+grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params);
// Return the includes needed for generated source file.
-grpc::string GetSourceIncludes();
+grpc::string GetSourceIncludes(const Parameters &params);
// Return the services for generated header file.
-grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file);
+grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params);
// Return the services for generated source file.
-grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file);
+grpc::string GetSourceServices(const grpc::protobuf::FileDescriptor *file,
+ const Parameters &params);
} // namespace grpc_cpp_generator
diff --git a/src/compiler/cpp_plugin.cc b/src/compiler/cpp_plugin.cc
index 5b83aa85cf..acbe128213 100644
--- a/src/compiler/cpp_plugin.cc
+++ b/src/compiler/cpp_plugin.cc
@@ -58,18 +58,42 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
return false;
}
+ if (file->service_count() == 0) {
+ // No services. Do nothing.
+ return true;
+ }
+
+ grpc_cpp_generator::Parameters generator_parameters;
+
+ if (!parameter.empty()) {
+ std::vector<grpc::string> parameters_list =
+ grpc_generator::tokenize(parameter, ",");
+ for (auto parameter_string = parameters_list.begin();
+ parameter_string != parameters_list.end();
+ parameter_string++) {
+ std::vector<grpc::string> param =
+ grpc_generator::tokenize(*parameter_string, "=");
+ if (param[0] == "services_namespace") {
+ generator_parameters.services_namespace = param[1];
+ } else {
+ *error = grpc::string("Unknown parameter: ") + *parameter_string;
+ return false;
+ }
+ }
+ }
+
grpc::string file_name = grpc_generator::StripProto(file->name());
// Generate .pb.h
Insert(context, file_name + ".pb.h", "includes",
- grpc_cpp_generator::GetHeaderIncludes(file));
+ grpc_cpp_generator::GetHeaderIncludes(file, generator_parameters));
Insert(context, file_name + ".pb.h", "namespace_scope",
- grpc_cpp_generator::GetHeaderServices(file));
+ grpc_cpp_generator::GetHeaderServices(file, generator_parameters));
// Generate .pb.cc
Insert(context, file_name + ".pb.cc", "includes",
- grpc_cpp_generator::GetSourceIncludes());
+ grpc_cpp_generator::GetSourceIncludes(generator_parameters));
Insert(context, file_name + ".pb.cc", "namespace_scope",
- grpc_cpp_generator::GetSourceServices(file));
+ grpc_cpp_generator::GetSourceServices(file, generator_parameters));
return true;
}
diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h
index 1e6727dd4c..30857891c7 100644
--- a/src/compiler/generator_helpers.h
+++ b/src/compiler/generator_helpers.h
@@ -75,6 +75,26 @@ inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
return str;
}
+inline std::vector<grpc::string> tokenize(const grpc::string &input,
+ const grpc::string &delimiters) {
+ std::vector<grpc::string> tokens;
+ size_t pos, last_pos = 0;
+
+ for (;;) {
+ bool done = false;
+ pos = input.find_first_of(delimiters, last_pos);
+ if (pos == grpc::string::npos) {
+ done = true;
+ pos = input.length();
+ }
+
+ tokens.push_back(input.substr(last_pos, pos - last_pos));
+ if (done) return tokens;
+
+ last_pos = pos + 1;
+ }
+}
+
} // namespace grpc_generator
#endif // GRPC_INTERNAL_COMPILER_GENERATOR_HELPERS_H
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index e4f85450f5..748417e477 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -36,25 +36,28 @@
#include <cctype>
#include <cstring>
#include <map>
+#include <memory>
#include <ostream>
#include <sstream>
+#include <tuple>
#include <vector>
+#include "grpc++/config.h"
+#include "src/compiler/config.h"
#include "src/compiler/generator_helpers.h"
#include "src/compiler/python_generator.h"
-#include <google/protobuf/io/printer.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor.h>
using grpc_generator::StringReplace;
using grpc_generator::StripProto;
-using google::protobuf::Descriptor;
-using google::protobuf::FileDescriptor;
-using google::protobuf::MethodDescriptor;
-using google::protobuf::ServiceDescriptor;
-using google::protobuf::io::Printer;
-using google::protobuf::io::StringOutputStream;
+using grpc::protobuf::Descriptor;
+using grpc::protobuf::FileDescriptor;
+using grpc::protobuf::MethodDescriptor;
+using grpc::protobuf::ServiceDescriptor;
+using grpc::protobuf::compiler::GeneratorContext;
+using grpc::protobuf::io::CodedOutputStream;
+using grpc::protobuf::io::Printer;
+using grpc::protobuf::io::StringOutputStream;
+using grpc::protobuf::io::ZeroCopyOutputStream;
using std::initializer_list;
using std::make_pair;
using std::map;
@@ -63,6 +66,41 @@ using std::replace;
using std::vector;
namespace grpc_python_generator {
+
+PythonGrpcGenerator::PythonGrpcGenerator(const GeneratorConfiguration& config)
+ : config_(config) {}
+
+PythonGrpcGenerator::~PythonGrpcGenerator() {}
+
+bool PythonGrpcGenerator::Generate(
+ const FileDescriptor* file, const grpc::string& parameter,
+ GeneratorContext* context, grpc::string* error) const {
+ // Get output file name.
+ grpc::string file_name;
+ static const int proto_suffix_length = strlen(".proto");
+ if (file->name().size() > static_cast<size_t>(proto_suffix_length) &&
+ file->name().find_last_of(".proto") == file->name().size() - 1) {
+ file_name = file->name().substr(
+ 0, file->name().size() - proto_suffix_length) + "_pb2.py";
+ } else {
+ *error = "Invalid proto file name. Proto file must end with .proto";
+ return false;
+ }
+
+ std::unique_ptr<ZeroCopyOutputStream> output(
+ context->OpenForInsert(file_name, "module_scope"));
+ CodedOutputStream coded_out(output.get());
+ bool success = false;
+ grpc::string code = "";
+ tie(success, code) = grpc_python_generator::GetServices(file, config_);
+ if (success) {
+ coded_out.WriteRaw(code.data(), code.size());
+ return true;
+ } else {
+ return false;
+ }
+}
+
namespace {
//////////////////////////////////
// BEGIN FORMATTING BOILERPLATE //
@@ -70,14 +108,15 @@ namespace {
// Converts an initializer list of the form { key0, value0, key1, value1, ... }
// into a map of key* to value*. Is merely a readability helper for later code.
-map<std::string, std::string> ListToDict(const initializer_list<std::string>& values) {
+map<grpc::string, grpc::string> ListToDict(
+ const initializer_list<grpc::string>& values) {
assert(values.size() % 2 == 0);
- map<std::string, std::string> value_map;
+ map<grpc::string, grpc::string> value_map;
auto value_iter = values.begin();
for (unsigned i = 0; i < values.size()/2; ++i) {
- std::string key = *value_iter;
+ grpc::string key = *value_iter;
++value_iter;
- std::string value = *value_iter;
+ grpc::string value = *value_iter;
value_map[key] = value;
++value_iter;
}
@@ -111,8 +150,8 @@ class IndentScope {
bool PrintServicer(const ServiceDescriptor* service,
Printer* out) {
- std::string doc = "<fill me in later!>";
- map<std::string, std::string> dict = ListToDict({
+ grpc::string doc = "<fill me in later!>";
+ map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
@@ -123,7 +162,7 @@ bool PrintServicer(const ServiceDescriptor* service,
out->Print("__metaclass__ = abc.ABCMeta\n");
for (int i = 0; i < service->method_count(); ++i) {
auto meth = service->method(i);
- std::string arg_name = meth->client_streaming() ?
+ grpc::string arg_name = meth->client_streaming() ?
"request_iterator" : "request";
out->Print("@abc.abstractmethod\n");
out->Print("def $Method$(self, $ArgName$, context):\n",
@@ -138,8 +177,8 @@ bool PrintServicer(const ServiceDescriptor* service,
}
bool PrintServer(const ServiceDescriptor* service, Printer* out) {
- std::string doc = "<fill me in later!>";
- map<std::string, std::string> dict = ListToDict({
+ grpc::string doc = "<fill me in later!>";
+ map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
@@ -167,8 +206,8 @@ bool PrintServer(const ServiceDescriptor* service, Printer* out) {
bool PrintStub(const ServiceDescriptor* service,
Printer* out) {
- std::string doc = "<fill me in later!>";
- map<std::string, std::string> dict = ListToDict({
+ grpc::string doc = "<fill me in later!>";
+ map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
@@ -179,7 +218,7 @@ bool PrintStub(const ServiceDescriptor* service,
out->Print("__metaclass__ = abc.ABCMeta\n");
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
- std::string arg_name = meth->client_streaming() ?
+ grpc::string arg_name = meth->client_streaming() ?
"request_iterator" : "request";
auto methdict = ListToDict({"Method", meth->name(), "ArgName", arg_name});
out->Print("@abc.abstractmethod\n");
@@ -196,29 +235,29 @@ bool PrintStub(const ServiceDescriptor* service,
// TODO(protobuf team): Export `ModuleName` from protobuf's
// `src/google/protobuf/compiler/python/python_generator.cc` file.
-std::string ModuleName(const std::string& filename) {
- std::string basename = StripProto(filename);
+grpc::string ModuleName(const grpc::string& filename) {
+ grpc::string basename = StripProto(filename);
basename = StringReplace(basename, "-", "_");
basename = StringReplace(basename, "/", ".");
return basename + "_pb2";
}
bool GetModuleAndMessagePath(const Descriptor* type,
- pair<std::string, std::string>* out) {
+ pair<grpc::string, grpc::string>* out) {
const Descriptor* path_elem_type = type;
vector<const Descriptor*> message_path;
do {
message_path.push_back(path_elem_type);
path_elem_type = path_elem_type->containing_type();
} while (path_elem_type != nullptr);
- std::string file_name = type->file()->name();
+ grpc::string file_name = type->file()->name();
static const int proto_suffix_length = strlen(".proto");
if (!(file_name.size() > static_cast<size_t>(proto_suffix_length) &&
file_name.find_last_of(".proto") == file_name.size() - 1)) {
return false;
}
- std::string module = ModuleName(file_name);
- std::string message_type;
+ grpc::string module = ModuleName(file_name);
+ grpc::string message_type;
for (auto path_iter = message_path.rbegin();
path_iter != message_path.rend(); ++path_iter) {
message_type += (*path_iter)->name() + ".";
@@ -229,28 +268,30 @@ bool GetModuleAndMessagePath(const Descriptor* type,
return true;
}
-bool PrintServerFactory(const std::string& package_qualified_service_name,
+bool PrintServerFactory(const grpc::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
out->Print("def early_adopter_create_$Service$_server(servicer, port, "
"root_certificates, key_chain_pairs):\n",
"Service", service->name());
{
IndentScope raii_create_server_indent(out);
- map<std::string, std::string> method_description_constructors;
- map<std::string, pair<std::string, std::string>> input_message_modules_and_classes;
- map<std::string, pair<std::string, std::string>> output_message_modules_and_classes;
+ map<grpc::string, grpc::string> method_description_constructors;
+ map<grpc::string, pair<grpc::string, grpc::string>>
+ input_message_modules_and_classes;
+ map<grpc::string, pair<grpc::string, grpc::string>>
+ output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
- const std::string method_description_constructor =
- std::string(method->client_streaming() ? "stream_" : "unary_") +
- std::string(method->server_streaming() ? "stream_" : "unary_") +
+ const grpc::string method_description_constructor =
+ grpc::string(method->client_streaming() ? "stream_" : "unary_") +
+ grpc::string(method->server_streaming() ? "stream_" : "unary_") +
"service_description";
- pair<std::string, std::string> input_message_module_and_class;
+ pair<grpc::string, grpc::string> input_message_module_and_class;
if (!GetModuleAndMessagePath(method->input_type(),
&input_message_module_and_class)) {
return false;
}
- pair<std::string, std::string> output_message_module_and_class;
+ pair<grpc::string, grpc::string> output_message_module_and_class;
if (!GetModuleAndMessagePath(method->output_type(),
&output_message_module_and_class)) {
return false;
@@ -268,17 +309,20 @@ bool PrintServerFactory(const std::string& package_qualified_service_name,
make_pair(method->name(), output_message_module_and_class));
}
out->Print("method_service_descriptions = {\n");
- for (auto& name_and_description_constructor :
- method_description_constructors) {
+ for (auto name_and_description_constructor =
+ method_description_constructors.begin();
+ name_and_description_constructor !=
+ method_description_constructors.end();
+ name_and_description_constructor++) {
IndentScope raii_descriptions_indent(out);
- const std::string method_name = name_and_description_constructor.first;
+ const grpc::string method_name = name_and_description_constructor->first;
auto input_message_module_and_class =
input_message_modules_and_classes.find(method_name);
auto output_message_module_and_class =
output_message_modules_and_classes.find(method_name);
out->Print("\"$Method$\": utilities.$Constructor$(\n", "Method",
method_name, "Constructor",
- name_and_description_constructor.second);
+ name_and_description_constructor->second);
{
IndentScope raii_description_arguments_indent(out);
out->Print("servicer.$Method$,\n", "Method", method_name);
@@ -304,29 +348,31 @@ bool PrintServerFactory(const std::string& package_qualified_service_name,
return true;
}
-bool PrintStubFactory(const std::string& package_qualified_service_name,
+bool PrintStubFactory(const grpc::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
- map<std::string, std::string> dict = ListToDict({
+ map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
});
out->Print(dict, "def early_adopter_create_$Service$_stub(host, port):\n");
{
IndentScope raii_create_server_indent(out);
- map<std::string, std::string> method_description_constructors;
- map<std::string, pair<std::string, std::string>> input_message_modules_and_classes;
- map<std::string, pair<std::string, std::string>> output_message_modules_and_classes;
+ map<grpc::string, grpc::string> method_description_constructors;
+ map<grpc::string, pair<grpc::string, grpc::string>>
+ input_message_modules_and_classes;
+ map<grpc::string, pair<grpc::string, grpc::string>>
+ output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
- const std::string method_description_constructor =
- std::string(method->client_streaming() ? "stream_" : "unary_") +
- std::string(method->server_streaming() ? "stream_" : "unary_") +
+ const grpc::string method_description_constructor =
+ grpc::string(method->client_streaming() ? "stream_" : "unary_") +
+ grpc::string(method->server_streaming() ? "stream_" : "unary_") +
"invocation_description";
- pair<std::string, std::string> input_message_module_and_class;
+ pair<grpc::string, grpc::string> input_message_module_and_class;
if (!GetModuleAndMessagePath(method->input_type(),
&input_message_module_and_class)) {
return false;
}
- pair<std::string, std::string> output_message_module_and_class;
+ pair<grpc::string, grpc::string> output_message_module_and_class;
if (!GetModuleAndMessagePath(method->output_type(),
&output_message_module_and_class)) {
return false;
@@ -344,17 +390,20 @@ bool PrintStubFactory(const std::string& package_qualified_service_name,
make_pair(method->name(), output_message_module_and_class));
}
out->Print("method_invocation_descriptions = {\n");
- for (auto& name_and_description_constructor :
- method_description_constructors) {
+ for (auto name_and_description_constructor =
+ method_description_constructors.begin();
+ name_and_description_constructor !=
+ method_description_constructors.end();
+ name_and_description_constructor++) {
IndentScope raii_descriptions_indent(out);
- const std::string method_name = name_and_description_constructor.first;
+ const grpc::string method_name = name_and_description_constructor->first;
auto input_message_module_and_class =
input_message_modules_and_classes.find(method_name);
auto output_message_module_and_class =
output_message_modules_and_classes.find(method_name);
out->Print("\"$Method$\": utilities.$Constructor$(\n", "Method",
method_name, "Constructor",
- name_and_description_constructor.second);
+ name_and_description_constructor->second);
{
IndentScope raii_description_arguments_indent(out);
out->Print(
@@ -378,22 +427,25 @@ bool PrintStubFactory(const std::string& package_qualified_service_name,
return true;
}
-bool PrintPreamble(const FileDescriptor* file, Printer* out) {
+bool PrintPreamble(const FileDescriptor* file,
+ const GeneratorConfiguration& config, Printer* out) {
out->Print("import abc\n");
- out->Print("from grpc.early_adopter import implementations\n");
+ out->Print("from $Package$ import implementations\n",
+ "Package", config.implementations_package_root);
out->Print("from grpc.framework.alpha import utilities\n");
return true;
}
} // namespace
-pair<bool, std::string> GetServices(const FileDescriptor* file) {
- std::string output;
+pair<bool, grpc::string> GetServices(const FileDescriptor* file,
+ const GeneratorConfiguration& config) {
+ grpc::string output;
{
// Scope the output stream so it closes and finalizes output to the string.
StringOutputStream output_stream(&output);
Printer out(&output_stream, '$');
- if (!PrintPreamble(file, &out)) {
+ if (!PrintPreamble(file, config, &out)) {
return make_pair(false, "");
}
auto package = file->package();
diff --git a/src/compiler/python_generator.h b/src/compiler/python_generator.h
index df29ca17e3..b47f3c1243 100644
--- a/src/compiler/python_generator.h
+++ b/src/compiler/python_generator.h
@@ -34,18 +34,34 @@
#ifndef GRPC_INTERNAL_COMPILER_PYTHON_GENERATOR_H
#define GRPC_INTERNAL_COMPILER_PYTHON_GENERATOR_H
-#include <string>
#include <utility>
-namespace google {
-namespace protobuf {
-class FileDescriptor;
-} // namespace protobuf
-} // namespace google
+#include "src/compiler/config.h"
namespace grpc_python_generator {
-std::pair<bool, std::string> GetServices(const google::protobuf::FileDescriptor* file);
+// Data pertaining to configuration of the generator with respect to anything
+// that may be used internally at Google.
+struct GeneratorConfiguration {
+ grpc::string implementations_package_root;
+};
+
+class PythonGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
+ public:
+ PythonGrpcGenerator(const GeneratorConfiguration& config);
+ ~PythonGrpcGenerator();
+
+ bool Generate(const grpc::protobuf::FileDescriptor* file,
+ const grpc::string& parameter,
+ grpc::protobuf::compiler::GeneratorContext* context,
+ grpc::string* error) const;
+ private:
+ GeneratorConfiguration config_;
+};
+
+std::pair<bool, grpc::string> GetServices(
+ const grpc::protobuf::FileDescriptor* file,
+ const GeneratorConfiguration& config);
} // namespace grpc_python_generator
diff --git a/src/compiler/python_plugin.cc b/src/compiler/python_plugin.cc
index e7d172f7bf..d1f49442da 100644
--- a/src/compiler/python_plugin.cc
+++ b/src/compiler/python_plugin.cc
@@ -33,60 +33,12 @@
// Generates a Python gRPC service interface out of Protobuf IDL.
-#include <cstring>
-#include <memory>
-#include <string>
-#include <tuple>
-
+#include "src/compiler/config.h"
#include "src/compiler/python_generator.h"
-#include <google/protobuf/compiler/code_generator.h>
-#include <google/protobuf/compiler/plugin.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#include <google/protobuf/descriptor.h>
-
-using google::protobuf::FileDescriptor;
-using google::protobuf::compiler::CodeGenerator;
-using google::protobuf::compiler::GeneratorContext;
-using google::protobuf::compiler::PluginMain;
-using google::protobuf::io::CodedOutputStream;
-using google::protobuf::io::ZeroCopyOutputStream;
-
-class PythonGrpcGenerator : public CodeGenerator {
- public:
- PythonGrpcGenerator() {}
- ~PythonGrpcGenerator() {}
-
- bool Generate(const FileDescriptor* file, const std::string& parameter,
- GeneratorContext* context, std::string* error) const {
- // Get output file name.
- std::string file_name;
- static const int proto_suffix_length = strlen(".proto");
- if (file->name().size() > static_cast<size_t>(proto_suffix_length) &&
- file->name().find_last_of(".proto") == file->name().size() - 1) {
- file_name = file->name().substr(
- 0, file->name().size() - proto_suffix_length) + "_pb2.py";
- } else {
- *error = "Invalid proto file name. Proto file must end with .proto";
- return false;
- }
-
- std::unique_ptr<ZeroCopyOutputStream> output(
- context->OpenForInsert(file_name, "module_scope"));
- CodedOutputStream coded_out(output.get());
- bool success = false;
- std::string code = "";
- tie(success, code) = grpc_python_generator::GetServices(file);
- if (success) {
- coded_out.WriteRaw(code.data(), code.size());
- return true;
- } else {
- return false;
- }
- }
-};
int main(int argc, char* argv[]) {
- PythonGrpcGenerator generator;
- return PluginMain(argc, argv, &generator);
+ grpc_python_generator::GeneratorConfiguration config;
+ config.implementations_package_root = "grpc.early_adopter";
+ grpc_python_generator::PythonGrpcGenerator generator(config);
+ return grpc::protobuf::compiler::PluginMain(argc, argv, &generator);
}
diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc
index 32b6a8d8e4..a0bb92848b 100644
--- a/src/compiler/ruby_generator.cc
+++ b/src/compiler/ruby_generator.cc
@@ -32,24 +32,20 @@
*/
#include <cctype>
-#include <string>
#include <map>
#include <vector>
+#include "src/compiler/config.h"
#include "src/compiler/ruby_generator.h"
#include "src/compiler/ruby_generator_helpers-inl.h"
#include "src/compiler/ruby_generator_map-inl.h"
#include "src/compiler/ruby_generator_string-inl.h"
-#include <google/protobuf/io/printer.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor.h>
-
-using google::protobuf::FileDescriptor;
-using google::protobuf::ServiceDescriptor;
-using google::protobuf::MethodDescriptor;
-using google::protobuf::io::Printer;
-using google::protobuf::io::StringOutputStream;
+
+using grpc::protobuf::FileDescriptor;
+using grpc::protobuf::ServiceDescriptor;
+using grpc::protobuf::MethodDescriptor;
+using grpc::protobuf::io::Printer;
+using grpc::protobuf::io::StringOutputStream;
using std::map;
using std::vector;
@@ -57,38 +53,38 @@ namespace grpc_ruby_generator {
namespace {
// Prints out the method using the ruby gRPC DSL.
-void PrintMethod(const MethodDescriptor *method, const std::string &package,
+void PrintMethod(const MethodDescriptor *method, const grpc::string &package,
Printer *out) {
- std::string input_type = RubyTypeOf(method->input_type()->name(), package);
+ grpc::string input_type = RubyTypeOf(method->input_type()->name(), package);
if (method->client_streaming()) {
input_type = "stream(" + input_type + ")";
}
- std::string output_type = RubyTypeOf(method->output_type()->name(), package);
+ grpc::string output_type = RubyTypeOf(method->output_type()->name(), package);
if (method->server_streaming()) {
output_type = "stream(" + output_type + ")";
}
- std::map<std::string, std::string> method_vars =
+ std::map<grpc::string, grpc::string> method_vars =
ListToDict({"mth.name", method->name(), "input.type", input_type,
"output.type", output_type, });
out->Print(method_vars, "rpc :$mth.name$, $input.type$, $output.type$\n");
}
// Prints out the service using the ruby gRPC DSL.
-void PrintService(const ServiceDescriptor *service, const std::string &package,
+void PrintService(const ServiceDescriptor *service, const grpc::string &package,
Printer *out) {
if (service->method_count() == 0) {
return;
}
// Begin the service module
- std::map<std::string, std::string> module_vars =
+ std::map<grpc::string, grpc::string> module_vars =
ListToDict({"module.name", CapitalizeFirst(service->name()), });
out->Print(module_vars, "module $module.name$\n");
out->Indent();
// TODO(temiola): add documentation
- std::string doc = "TODO: add proto service documentation here";
- std::map<std::string, std::string> template_vars =
+ grpc::string doc = "TODO: add proto service documentation here";
+ std::map<grpc::string, grpc::string> template_vars =
ListToDict({"Documentation", doc, });
out->Print("\n");
out->Print(template_vars, "# $Documentation$\n");
@@ -101,7 +97,7 @@ void PrintService(const ServiceDescriptor *service, const std::string &package,
out->Print("\n");
out->Print("self.marshal_class_method = :encode\n");
out->Print("self.unmarshal_class_method = :decode\n");
- std::map<std::string, std::string> pkg_vars =
+ std::map<grpc::string, grpc::string> pkg_vars =
ListToDict({"service.name", service->name(), "pkg.name", package, });
out->Print(pkg_vars, "self.service_name = '$pkg.name$.$service.name$'\n");
out->Print("\n");
@@ -121,8 +117,8 @@ void PrintService(const ServiceDescriptor *service, const std::string &package,
} // namespace
-std::string GetServices(const FileDescriptor *file) {
- std::string output;
+grpc::string GetServices(const FileDescriptor *file) {
+ grpc::string output;
StringOutputStream output_stream(&output);
Printer out(&output_stream, '$');
@@ -133,7 +129,7 @@ std::string GetServices(const FileDescriptor *file) {
}
// Write out a file header.
- std::map<std::string, std::string> header_comment_vars = ListToDict(
+ std::map<grpc::string, grpc::string> header_comment_vars = ListToDict(
{"file.name", file->name(), "file.package", file->package(), });
out.Print("# Generated by the protocol buffer compiler. DO NOT EDIT!\n");
out.Print(header_comment_vars,
@@ -144,15 +140,15 @@ std::string GetServices(const FileDescriptor *file) {
// Write out require statemment to import the separately generated file
// that defines the messages used by the service. This is generated by the
// main ruby plugin.
- std::map<std::string, std::string> dep_vars =
+ std::map<grpc::string, grpc::string> dep_vars =
ListToDict({"dep.name", MessagesRequireName(file), });
out.Print(dep_vars, "require '$dep.name$'\n");
// Write out services within the modules
out.Print("\n");
- std::vector<std::string> modules = Split(file->package(), '.');
+ std::vector<grpc::string> modules = Split(file->package(), '.');
for (size_t i = 0; i < modules.size(); ++i) {
- std::map<std::string, std::string> module_vars =
+ std::map<grpc::string, grpc::string> module_vars =
ListToDict({"module.name", CapitalizeFirst(modules[i]), });
out.Print(module_vars, "module $module.name$\n");
out.Indent();
diff --git a/src/compiler/ruby_generator.h b/src/compiler/ruby_generator.h
index 4dd38e0c27..a2ab36d4d9 100644
--- a/src/compiler/ruby_generator.h
+++ b/src/compiler/ruby_generator.h
@@ -34,17 +34,11 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_H
-#include <string>
-
-namespace google {
-namespace protobuf {
-class FileDescriptor;
-} // namespace protobuf
-} // namespace google
+#include "src/compiler/config.h"
namespace grpc_ruby_generator {
-std::string GetServices(const google::protobuf::FileDescriptor *file);
+grpc::string GetServices(const grpc::protobuf::FileDescriptor *file);
} // namespace grpc_ruby_generator
diff --git a/src/compiler/ruby_generator_helpers-inl.h b/src/compiler/ruby_generator_helpers-inl.h
index f3a087b3f8..9da7cab3c7 100644
--- a/src/compiler/ruby_generator_helpers-inl.h
+++ b/src/compiler/ruby_generator_helpers-inl.h
@@ -34,15 +34,13 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_HELPERS_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_HELPERS_INL_H
-#include <string>
-
-#include <google/protobuf/descriptor.h>
+#include "src/compiler/config.h"
#include "src/compiler/ruby_generator_string-inl.h"
namespace grpc_ruby_generator {
-inline bool ServicesFilename(const google::protobuf::FileDescriptor *file,
- std::string *file_name_or_error) {
+inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file,
+ grpc::string *file_name_or_error) {
// Get output file name.
static const unsigned proto_suffix_length = 6; // length of ".proto"
if (file->name().size() > proto_suffix_length &&
@@ -57,8 +55,8 @@ inline bool ServicesFilename(const google::protobuf::FileDescriptor *file,
}
}
-inline std::string MessagesRequireName(
- const google::protobuf::FileDescriptor *file) {
+inline grpc::string MessagesRequireName(
+ const grpc::protobuf::FileDescriptor *file) {
return Replace(file->name(), ".proto", "");
}
diff --git a/src/compiler/ruby_generator_map-inl.h b/src/compiler/ruby_generator_map-inl.h
index f902b6d98f..6b87774f21 100644
--- a/src/compiler/ruby_generator_map-inl.h
+++ b/src/compiler/ruby_generator_map-inl.h
@@ -34,11 +34,12 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_MAP_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_MAP_INL_H
+#include "src/compiler/config.h"
+
#include <iostream>
#include <initializer_list>
#include <map>
#include <ostream> // NOLINT
-#include <string>
#include <vector>
using std::initializer_list;
@@ -49,18 +50,18 @@ namespace grpc_ruby_generator {
// Converts an initializer list of the form { key0, value0, key1, value1, ... }
// into a map of key* to value*. Is merely a readability helper for later code.
-inline std::map<std::string, std::string> ListToDict(
- const initializer_list<std::string> &values) {
+inline std::map<grpc::string, grpc::string> ListToDict(
+ const initializer_list<grpc::string> &values) {
if (values.size() % 2 != 0) {
std::cerr << "Not every 'key' has a value in `values`."
<< std::endl;
}
- std::map<std::string, std::string> value_map;
+ std::map<grpc::string, grpc::string> value_map;
auto value_iter = values.begin();
for (unsigned i = 0; i < values.size() / 2; ++i) {
- std::string key = *value_iter;
+ grpc::string key = *value_iter;
++value_iter;
- std::string value = *value_iter;
+ grpc::string value = *value_iter;
value_map[key] = value;
++value_iter;
}
diff --git a/src/compiler/ruby_generator_string-inl.h b/src/compiler/ruby_generator_string-inl.h
index bdd314c16e..8da3a88da2 100644
--- a/src/compiler/ruby_generator_string-inl.h
+++ b/src/compiler/ruby_generator_string-inl.h
@@ -34,8 +34,9 @@
#ifndef GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_STRING_INL_H
#define GRPC_INTERNAL_COMPILER_RUBY_GENERATOR_STRING_INL_H
+#include "src/compiler/config.h"
+
#include <algorithm>
-#include <string>
#include <sstream>
#include <vector>
@@ -45,10 +46,10 @@ using std::transform;
namespace grpc_ruby_generator {
// Split splits a string using char into elems.
-inline std::vector<std::string> &Split(const std::string &s, char delim,
- std::vector<std::string> *elems) {
+inline std::vector<grpc::string> &Split(const grpc::string &s, char delim,
+ std::vector<grpc::string> *elems) {
std::stringstream ss(s);
- std::string item;
+ grpc::string item;
while (getline(ss, item, delim)) {
elems->push_back(item);
}
@@ -56,17 +57,17 @@ inline std::vector<std::string> &Split(const std::string &s, char delim,
}
// Split splits a string using char, returning the result in a vector.
-inline std::vector<std::string> Split(const std::string &s, char delim) {
- std::vector<std::string> elems;
+inline std::vector<grpc::string> Split(const grpc::string &s, char delim) {
+ std::vector<grpc::string> elems;
Split(s, delim, &elems);
return elems;
}
// Replace replaces from with to in s.
-inline std::string Replace(std::string s, const std::string &from,
- const std::string &to) {
+inline grpc::string Replace(grpc::string s, const grpc::string &from,
+ const grpc::string &to) {
size_t start_pos = s.find(from);
- if (start_pos == std::string::npos) {
+ if (start_pos == grpc::string::npos) {
return s;
}
s.replace(start_pos, from.length(), to);
@@ -74,10 +75,10 @@ inline std::string Replace(std::string s, const std::string &from,
}
// ReplaceAll replaces all instances of search with replace in s.
-inline std::string ReplaceAll(std::string s, const std::string &search,
- const std::string &replace) {
+inline grpc::string ReplaceAll(grpc::string s, const grpc::string &search,
+ const grpc::string &replace) {
size_t pos = 0;
- while ((pos = s.find(search, pos)) != std::string::npos) {
+ while ((pos = s.find(search, pos)) != grpc::string::npos) {
s.replace(pos, search.length(), replace);
pos += replace.length();
}
@@ -85,10 +86,10 @@ inline std::string ReplaceAll(std::string s, const std::string &search,
}
// ReplacePrefix replaces from with to in s if search is a prefix of s.
-inline bool ReplacePrefix(std::string *s, const std::string &from,
- const std::string &to) {
+inline bool ReplacePrefix(grpc::string *s, const grpc::string &from,
+ const grpc::string &to) {
size_t start_pos = s->find(from);
- if (start_pos == std::string::npos || start_pos != 0) {
+ if (start_pos == grpc::string::npos || start_pos != 0) {
return false;
}
s->replace(start_pos, from.length(), to);
@@ -96,7 +97,7 @@ inline bool ReplacePrefix(std::string *s, const std::string &from,
}
// CapitalizeFirst capitalizes the first char in a string.
-inline std::string CapitalizeFirst(std::string s) {
+inline grpc::string CapitalizeFirst(grpc::string s) {
if (s.empty()) {
return s;
}
@@ -105,15 +106,15 @@ inline std::string CapitalizeFirst(std::string s) {
}
// RubyTypeOf updates a proto type to the required ruby equivalent.
-inline std::string RubyTypeOf(const std::string &a_type,
- const std::string &package) {
- std::string res(a_type);
+inline grpc::string RubyTypeOf(const grpc::string &a_type,
+ const grpc::string &package) {
+ grpc::string res(a_type);
ReplacePrefix(&res, package, ""); // remove the leading package if present
ReplacePrefix(&res, ".", ""); // remove the leading . (no package)
- if (res.find('.') == std::string::npos) {
+ if (res.find('.') == grpc::string::npos) {
return res;
} else {
- std::vector<std::string> prefixes_and_type = Split(res, '.');
+ std::vector<grpc::string> prefixes_and_type = Split(res, '.');
for (unsigned int i = 0; i < prefixes_and_type.size(); ++i) {
if (i != 0) {
res += "::"; // switch '.' to the ruby module delim
diff --git a/src/compiler/ruby_plugin.cc b/src/compiler/ruby_plugin.cc
index 4a6e9f7a5d..bd10d46e9c 100644
--- a/src/compiler/ruby_plugin.cc
+++ b/src/compiler/ruby_plugin.cc
@@ -32,43 +32,35 @@
*/
// Generates Ruby gRPC service interface out of Protobuf IDL.
-//
-// This is a Proto2 compiler plugin. See net/proto2/compiler/proto/plugin.proto
-// and net/proto2/compiler/public/plugin.h for more information on plugins.
#include <memory>
-#include <string>
+#include "src/compiler/config.h"
#include "src/compiler/ruby_generator.h"
#include "src/compiler/ruby_generator_helpers-inl.h"
-#include <google/protobuf/compiler/code_generator.h>
-#include <google/protobuf/compiler/plugin.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#include <google/protobuf/descriptor.h>
-class RubyGrpcGenerator : public google::protobuf::compiler::CodeGenerator {
+class RubyGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
public:
RubyGrpcGenerator() {}
~RubyGrpcGenerator() {}
- bool Generate(const google::protobuf::FileDescriptor *file,
- const std::string &parameter,
- google::protobuf::compiler::GeneratorContext *context,
- std::string *error) const {
- std::string code = grpc_ruby_generator::GetServices(file);
+ bool Generate(const grpc::protobuf::FileDescriptor *file,
+ const grpc::string &parameter,
+ grpc::protobuf::compiler::GeneratorContext *context,
+ grpc::string *error) const {
+ grpc::string code = grpc_ruby_generator::GetServices(file);
if (code.size() == 0) {
return true; // don't generate a file if there are no services
}
// Get output file name.
- std::string file_name;
+ grpc::string file_name;
if (!grpc_ruby_generator::ServicesFilename(file, &file_name)) {
return false;
}
- std::unique_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
+ std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->Open(file_name));
- google::protobuf::io::CodedOutputStream coded_out(output.get());
+ grpc::protobuf::io::CodedOutputStream coded_out(output.get());
coded_out.WriteRaw(code.data(), code.size());
return true;
}
@@ -76,5 +68,5 @@ class RubyGrpcGenerator : public google::protobuf::compiler::CodeGenerator {
int main(int argc, char *argv[]) {
RubyGrpcGenerator generator;
- return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+ return grpc::protobuf::compiler::PluginMain(argc, argv, &generator);
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index f565cbf3ae..9da8b333ca 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -189,7 +189,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
/* translate host to :authority since :authority may be
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
- channeld->mdctx, channeld->authority_key, op->data.metadata->value);
+ channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
+ grpc_mdstr_ref(op->data.metadata->value));
grpc_mdelem_unref(op->data.metadata);
op->data.metadata = authority;
/* pass the event up */
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 8b019e8049..aec626509a 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -52,10 +52,11 @@ static OVERLAPPED g_iocp_custom_overlap;
static gpr_event g_shutdown_iocp;
static gpr_event g_iocp_done;
+static gpr_atm g_orphans = 0;
static HANDLE g_iocp;
-static int do_iocp_work() {
+static void do_iocp_work() {
BOOL success;
DWORD bytes = 0;
DWORD flags = 0;
@@ -71,14 +72,14 @@ static int do_iocp_work() {
gpr_time_to_millis(wait_time));
if (!success && !overlapped) {
/* The deadline got attained. */
- return 0;
+ return;
}
GPR_ASSERT(completion_key && overlapped);
if (overlapped == &g_iocp_custom_overlap) {
if (completion_key == (ULONG_PTR) &g_iocp_kick_token) {
/* We were awoken from a kick. */
gpr_log(GPR_DEBUG, "do_iocp_work - got a kick");
- return 1;
+ return;
}
gpr_log(GPR_ERROR, "Unknown custom completion key.");
abort();
@@ -97,8 +98,13 @@ static int do_iocp_work() {
}
success = WSAGetOverlappedResult(socket->socket, &info->overlapped, &bytes,
FALSE, &flags);
- gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s", bytes, flags,
- success ? "succeeded" : "failed");
+ gpr_log(GPR_DEBUG, "bytes: %u, flags: %u - op %s %s", bytes, flags,
+ success ? "succeeded" : "failed", socket->orphan ? "orphan" : "");
+ if (socket->orphan) {
+ grpc_winsocket_destroy(socket);
+ gpr_atm_full_fetch_add(&g_orphans, -1);
+ return;
+ }
info->bytes_transfered = bytes;
info->wsa_error = success ? 0 : WSAGetLastError();
GPR_ASSERT(overlapped == &info->overlapped);
@@ -113,12 +119,10 @@ static int do_iocp_work() {
}
gpr_mu_unlock(&socket->state_mu);
if (f) f(opaque, 1);
-
- return 1;
}
static void iocp_loop(void *p) {
- while (!gpr_event_get(&g_shutdown_iocp)) {
+ while (gpr_atm_acq_load(&g_orphans) || !gpr_event_get(&g_shutdown_iocp)) {
grpc_maybe_call_delayed_callbacks(NULL, 1);
do_iocp_work();
}
@@ -138,13 +142,19 @@ void grpc_iocp_init(void) {
gpr_thd_new(&id, iocp_loop, NULL, NULL);
}
-void grpc_iocp_shutdown(void) {
+void grpc_iocp_kick(void) {
BOOL success;
- gpr_event_set(&g_shutdown_iocp, (void *)1);
+
success = PostQueuedCompletionStatus(g_iocp, 0,
(ULONG_PTR) &g_iocp_kick_token,
&g_iocp_custom_overlap);
GPR_ASSERT(success);
+}
+
+void grpc_iocp_shutdown(void) {
+ BOOL success;
+ gpr_event_set(&g_shutdown_iocp, (void *)1);
+ grpc_iocp_kick();
gpr_event_wait(&g_iocp_done, gpr_inf_future);
success = CloseHandle(g_iocp);
GPR_ASSERT(success);
@@ -166,6 +176,10 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) {
GPR_ASSERT(ret == g_iocp);
}
+void grpc_iocp_socket_orphan(grpc_winsocket *socket) {
+ gpr_atm_full_fetch_add(&g_orphans, 1);
+}
+
static void socket_notify_on_iocp(grpc_winsocket *socket,
void(*cb)(void *, int), void *opaque,
grpc_winsocket_callback_info *info) {
diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h
index 33133193a1..fa3f5eee10 100644
--- a/src/core/iomgr/iocp_windows.h
+++ b/src/core/iomgr/iocp_windows.h
@@ -42,6 +42,7 @@
void grpc_iocp_init(void);
void grpc_iocp_shutdown(void);
void grpc_iocp_add_socket(grpc_winsocket *);
+void grpc_iocp_socket_orphan(grpc_winsocket *);
void grpc_socket_notify_on_write(grpc_winsocket *, void(*cb)(void *, int success),
void *opaque);
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 058685b295..d0e6706fbd 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -117,7 +117,16 @@ void grpc_iomgr_shutdown(void) {
gpr_mu_lock(&g_mu);
}
if (g_refs) {
- if (gpr_cv_wait(&g_rcv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
+ int timeout = 0;
+ gpr_timespec short_deadline = gpr_time_add(gpr_now(),
+ gpr_time_from_millis(100));
+ while (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
+ if (gpr_time_cmp(gpr_now(), shutdown_deadline) > 0) {
+ timeout = 1;
+ break;
+ }
+ }
+ if (timeout) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
diff --git a/src/core/iomgr/sockaddr_win32.h b/src/core/iomgr/sockaddr_win32.h
index 3a5f27bb34..c0385ea614 100644
--- a/src/core/iomgr/sockaddr_win32.h
+++ b/src/core/iomgr/sockaddr_win32.h
@@ -38,4 +38,9 @@
#include <winsock2.h>
#include <mswsock.h>
+#ifdef __MINGW32__
+/* mingw seems to be missing that definition. */
+const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
+#endif
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_WIN32_H */
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index 99f38b0e03..22dad41783 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -55,7 +55,7 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket) {
return r;
}
-void shutdown_op(grpc_winsocket_callback_info *info) {
+static void shutdown_op(grpc_winsocket_callback_info *info) {
if (!info->cb) return;
grpc_iomgr_add_delayed_callback(info->cb, info->opaque, 0);
}
@@ -68,8 +68,13 @@ void grpc_winsocket_shutdown(grpc_winsocket *socket) {
void grpc_winsocket_orphan(grpc_winsocket *socket) {
gpr_log(GPR_DEBUG, "grpc_winsocket_orphan");
+ grpc_iocp_socket_orphan(socket);
+ socket->orphan = 1;
grpc_iomgr_unref();
closesocket(socket->socket);
+}
+
+void grpc_winsocket_destroy(grpc_winsocket *socket) {
gpr_mu_destroy(&socket->state_mu);
gpr_free(socket);
}
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index d4776ab10f..cbae91692c 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -57,12 +57,13 @@ typedef struct grpc_winsocket_callback_info {
typedef struct grpc_winsocket {
SOCKET socket;
- int added_to_iocp;
-
grpc_winsocket_callback_info write_info;
grpc_winsocket_callback_info read_info;
gpr_mu state_mu;
+
+ int added_to_iocp;
+ int orphan;
} grpc_winsocket;
/* Create a wrapped windows handle.
@@ -71,5 +72,6 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket);
void grpc_winsocket_shutdown(grpc_winsocket *socket);
void grpc_winsocket_orphan(grpc_winsocket *socket);
+void grpc_winsocket_destroy(grpc_winsocket *socket);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKET_WINDOWS_H */
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 59319da26d..0c3ab1dc91 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -53,9 +53,6 @@
#define INIT_PORT_CAP 2
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
-static gpr_once s_init_max_accept_queue_size;
-static int s_max_accept_queue_size;
-
/* one listening port */
typedef struct server_port {
gpr_uint8 addresses[sizeof(struct sockaddr_in6) * 2 + 32];
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index 3efd69a71b..ec5496e7ee 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -172,7 +172,7 @@ static void win_notify_on_read(grpc_endpoint *ep,
tcp->read_slice = gpr_slice_malloc(8192);
buffer.len = GPR_SLICE_LENGTH(tcp->read_slice);
- buffer.buf = GPR_SLICE_START_PTR(tcp->read_slice);
+ buffer.buf = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
gpr_log(GPR_DEBUG, "win_notify_on_read: calling WSARecv without overlap");
status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags,
@@ -284,7 +284,7 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep,
for (i = 0; i < tcp->write_slices.count; i++) {
buffers[i].len = GPR_SLICE_LENGTH(tcp->write_slices.slices[i]);
- buffers[i].buf = GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
+ buffers[i].buf = (char *)GPR_SLICE_START_PTR(tcp->write_slices.slices[i]);
}
gpr_log(GPR_DEBUG, "win_write: calling WSASend without overlap");
diff --git a/src/php/ext/grpc/event.h b/src/core/support/cpu_iphone.c
index ef5846aee1..d412a6d7ee 100755..100644
--- a/src/php/ext/grpc/event.h
+++ b/src/core/support/cpu_iphone.c
@@ -31,21 +31,23 @@
*
*/
-#ifndef NET_GRPC_PHP_GRPC_EVENT_H_
-#define NET_GRPC_PHP_GRPC_EVENT_H_
+#include <grpc/support/port_platform.h>
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
+#ifdef GPR_CPU_IPHONE
-#include "php.h"
-#include "php_ini.h"
-#include "ext/standard/info.h"
-#include "php_grpc.h"
+/* Probably 2 instead of 1, but see comment on gpr_cpu_current_cpu. */
+unsigned gpr_cpu_num_cores(void) {
+ return 1;
+}
-#include "grpc/grpc.h"
+/* Most code that's using this is using it to shard across work queues. So
+ unless profiling shows it's a problem or there appears a way to detect the
+ currently running CPU core, let's have it shard the default way.
+ Note that the interface in cpu.h lets gpr_cpu_num_cores return 0, but doing
+ it makes it impossible for gpr_cpu_current_cpu to satisfy its stated range,
+ and some code might be relying on it. */
+unsigned gpr_cpu_current_cpu(void) {
+ return 0;
+}
-/* Create a new Event object that wraps an existing grpc_event struct */
-zval *grpc_php_convert_event(grpc_event *event);
-
-#endif /* NET_GRPC_PHP_GRPC_COMPLETION_CHANNEL_H */
+#endif /* GPR_CPU_IPHONE */
diff --git a/src/core/support/cpu_posix.c b/src/core/support/cpu_posix.c
index 5f45fb0bc3..99484e37fb 100644
--- a/src/core/support/cpu_posix.c
+++ b/src/core/support/cpu_posix.c
@@ -74,4 +74,4 @@ unsigned gpr_cpu_current_cpu(void) {
return shard_ptr(&magic_thread_local);
}
-#endif /* GPR_CPU_LINUX */
+#endif /* GPR_CPU_POSIX */
diff --git a/src/core/support/env_win32.c b/src/core/support/env_win32.c
index 177cc36a30..9b4cd698ad 100644
--- a/src/core/support/env_win32.c
+++ b/src/core/support/env_win32.c
@@ -36,6 +36,7 @@
#ifdef GPR_WIN32
#include "src/core/support/env.h"
+#include "src/core/support/string.h"
#include <stdlib.h>
@@ -43,14 +44,16 @@
#include <grpc/support/log.h>
char *gpr_getenv(const char *name) {
- size_t required_size;
+ size_t size;
char *result = NULL;
+ char *duplicated;
+ errno_t err;
- getenv_s(&required_size, NULL, 0, name);
- if (required_size == 0) return NULL;
- result = gpr_malloc(required_size);
- getenv_s(&required_size, result, required_size, name);
- return result;
+ err = _dupenv_s(&result, &size, name);
+ if (err) return NULL;
+ duplicated = gpr_strdup(result);
+ free(result);
+ return duplicated;
}
void gpr_setenv(const char *name, const char *value) {
diff --git a/src/core/support/file_win32.c b/src/core/support/file_win32.c
index fe209af9b2..f59d3af397 100644
--- a/src/core/support/file_win32.c
+++ b/src/core/support/file_win32.c
@@ -72,7 +72,7 @@ FILE *gpr_tmpfile(const char *prefix, char **tmp_filename_out) {
if (_tfopen_s(&result, tmp_filename, TEXT("wb+")) != 0) goto end;
end:
- if (result && tmp_filename) {
+ if (result && tmp_filename_out) {
*tmp_filename_out = gpr_tchar_to_char(tmp_filename);
}
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index 720dc141f5..159c7e052c 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -43,6 +43,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include "src/core/support/string.h"
#include "src/core/support/string_win32.h"
void gpr_log(const char *file, int line, gpr_log_severity severity,
@@ -55,7 +56,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
va_start(args, format);
ret = _vscprintf(format, args);
va_end(args);
- if (!(0 <= ret && ret < ~(size_t)0)) {
+ if (ret < 0) {
message = NULL;
} else {
/* Allocate a new buffer, with space for the NUL terminator. */
@@ -66,7 +67,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
va_start(args, format);
ret = vsnprintf_s(message, strp_buflen, _TRUNCATE, format, args);
va_end(args);
- if (ret != strp_buflen - 1) {
+ if ((size_t)ret != strp_buflen - 1) {
/* This should never happen. */
gpr_free(message);
message = NULL;
@@ -90,7 +91,7 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
- fprintf(stderr, "%s%s.%09u %5u %s:%d] %s\n",
+ fprintf(stderr, "%s%s.%09u %5lu %s:%d] %s\n",
gpr_log_severity_string(args->severity), time_buffer,
(int)(now.tv_nsec), GetCurrentThreadId(),
args->file, args->line, args->message);
@@ -105,6 +106,7 @@ char *gpr_format_message(DWORD messageid) {
NULL, messageid,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
+ if (status == 0) return gpr_strdup("Unable to retreive error string");
message = gpr_tchar_to_char(tmessage);
LocalFree(tmessage);
return message;
diff --git a/src/core/support/string_win32.c b/src/core/support/string_win32.c
index 583abd27d8..6d1d6337a9 100644
--- a/src/core/support/string_win32.c
+++ b/src/core/support/string_win32.c
@@ -44,6 +44,8 @@
#include <grpc/support/alloc.h>
+#include "src/core/support/string.h"
+
int gpr_asprintf(char **strp, const char *format, ...) {
va_list args;
int ret;
@@ -53,7 +55,7 @@ int gpr_asprintf(char **strp, const char *format, ...) {
va_start(args, format);
ret = _vscprintf(format, args);
va_end(args);
- if (!(0 <= ret && ret < ~(size_t)0)) {
+ if (ret < 0) {
*strp = NULL;
return -1;
}
@@ -69,7 +71,7 @@ int gpr_asprintf(char **strp, const char *format, ...) {
va_start(args, format);
ret = vsnprintf_s(*strp, strp_buflen, _TRUNCATE, format, args);
va_end(args);
- if (ret == strp_buflen - 1) {
+ if ((size_t)ret == strp_buflen - 1) {
return ret;
}
diff --git a/src/core/support/sync_win32.c b/src/core/support/sync_win32.c
index c9a977cc80..cc31d9b052 100644
--- a/src/core/support/sync_win32.c
+++ b/src/core/support/sync_win32.c
@@ -37,6 +37,7 @@
#ifdef GPR_WIN32
+#undef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#include <windows.h>
#include <grpc/support/log.h>
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index 8256849655..f221cb5790 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -42,8 +42,8 @@
gpr_timespec gpr_now(void) {
gpr_timespec now_tv;
- struct __timeb32 now_tb;
- _ftime32_s(&now_tb);
+ struct _timeb now_tb;
+ _ftime_s(&now_tb);
now_tv.tv_sec = now_tb.time;
now_tv.tv_nsec = now_tb.millitm * 1000000;
return now_tv;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index cfce943794..dba63058b8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1006,6 +1006,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
const grpc_op *op;
grpc_ioreq *req;
+ GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
+
if (nops == 0) {
grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index cb81cb52c2..06434f87ac 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -119,4 +119,13 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
+extern int grpc_trace_batch;
+
+void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+ grpc_call *call, const grpc_op *ops, size_t nops,
+ void *tag);
+
+#define GRPC_CALL_LOG_BATCH(sev, call, ops, nops, tag) \
+ if (grpc_trace_batch) grpc_call_log_batch(sev, call, ops, nops, tag)
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */
diff --git a/src/core/surface/call_log_batch.c b/src/core/surface/call_log_batch.c
new file mode 100644
index 0000000000..a33583a12d
--- /dev/null
+++ b/src/core/surface/call_log_batch.c
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/call.h"
+
+#include "src/core/support/string.h"
+#include <grpc/support/alloc.h>
+
+int grpc_trace_batch = 0;
+
+static void add_metadata(gpr_strvec *b, const grpc_metadata *md, size_t count) {
+ size_t i;
+ for(i = 0; i < count; i++) {
+ gpr_strvec_add(b, gpr_strdup("\nkey="));
+ gpr_strvec_add(b, gpr_strdup(md[i].key));
+
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump(md[i].value, md[i].value_length,
+ GPR_HEXDUMP_PLAINTEXT));
+ }
+}
+
+char *grpc_op_string(const grpc_op *op) {
+ char *tmp;
+ char *out;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ switch (op->op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA"));
+ add_metadata(&b, op->data.send_initial_metadata.metadata,
+ op->data.send_initial_metadata.count);
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ gpr_asprintf(&tmp, "SEND_MESSAGE ptr=%p", op->data.send_message);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ gpr_strvec_add(&b, gpr_strdup("SEND_CLOSE_FROM_CLIENT"));
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ gpr_asprintf(&tmp, "SEND_STATUS_FROM_SERVER status=%d details=%s",
+ op->data.send_status_from_server.status,
+ op->data.send_status_from_server.status_details);
+ gpr_strvec_add(&b, tmp);
+ add_metadata(&b, op->data.send_status_from_server.trailing_metadata,
+ op->data.send_status_from_server.trailing_metadata_count);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ gpr_asprintf(&tmp, "RECV_INITIAL_METADATA ptr=%p",
+ op->data.recv_initial_metadata);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ gpr_asprintf(&tmp, "RECV_MESSAGE ptr=%p", op->data.recv_message);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ gpr_asprintf(&tmp,
+ "RECV_STATUS_ON_CLIENT metadata=%p status=%p details=%p",
+ op->data.recv_status_on_client.trailing_metadata,
+ op->data.recv_status_on_client.status,
+ op->data.recv_status_on_client.status_details);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ gpr_asprintf(&tmp, "RECV_CLOSE_ON_SERVER cancelled=%p",
+ op->data.recv_close_on_server.cancelled);
+ gpr_strvec_add(&b, tmp);
+ }
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+ grpc_call *call, const grpc_op *ops, size_t nops,
+ void *tag) {
+ char *tmp;
+ size_t i;
+ gpr_log(file, line, severity,
+ "grpc_call_start_batch(%p, %p, %d, 0x%x)", call, ops, nops, tag);
+ for(i = 0; i < nops; i++) {
+ tmp = grpc_op_string(&ops[i]);
+ gpr_log(file, line, severity, "ops[%d]: %s", i, tmp);
+ gpr_free(tmp);
+ }
+}
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index e48c4202e5..d4f0eb40e8 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -36,6 +36,7 @@
#include "src/core/debug/trace.h"
#include "src/core/statistics/census_interface.h"
#include "src/core/channel/channel_stack.h"
+#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
#include "src/core/surface/surface_trace.h"
#include "src/core/transport/chttp2_transport.h"
@@ -57,6 +58,7 @@ void grpc_init(void) {
grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace);
+ grpc_register_tracer("batch", &grpc_trace_batch);
grpc_security_pre_init();
grpc_tracer_init("GRPC_TRACE");
grpc_iomgr_init();
@@ -82,4 +84,3 @@ int grpc_is_initialized(void) {
gpr_mu_unlock(&g_init_mu);
return r;
}
-
diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c
index 33645ca8b8..018ddc4456 100644
--- a/src/core/tsi/ssl_transport_security.c
+++ b/src/core/tsi/ssl_transport_security.c
@@ -567,7 +567,8 @@ static tsi_result populate_ssl_context(
EC_KEY* ecdh = EC_KEY_new_by_curve_name(NID_X9_62_prime256v1);
if (!SSL_CTX_set_tmp_ecdh(context, ecdh)) {
gpr_log(GPR_ERROR, "Could not set ephemeral ECDH key.");
- result = TSI_INTERNAL_ERROR;
+ EC_KEY_free(ecdh);
+ return TSI_INTERNAL_ERROR;
}
SSL_CTX_set_options(context, SSL_OP_SINGLE_ECDH_USE);
EC_KEY_free(ecdh);
@@ -604,6 +605,7 @@ static tsi_result build_alpn_protocol_name_list(
unsigned char* current;
*protocol_name_list = NULL;
*protocol_name_list_length = 0;
+ if (num_alpn_protocols == 0) return TSI_INVALID_ARGUMENT;
for (i = 0; i < num_alpn_protocols; i++) {
if (alpn_protocols_lengths[i] == 0) {
gpr_log(GPR_ERROR, "Invalid 0-length protocol name.");
diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc
index f3ca430bd4..8945b038de 100644
--- a/src/cpp/client/insecure_credentials.cc
+++ b/src/cpp/client/insecure_credentials.cc
@@ -31,8 +31,6 @@
*
*/
-#include <string>
-
#include <grpc/grpc.h>
#include <grpc/support/log.h>
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index e3c6637623..d6f9acc675 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -31,8 +31,6 @@
*
*/
-#include <string>
-
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 7119011ec4..1599e5117f 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -48,7 +48,6 @@ CallOpBuffer::CallOpBuffer()
initial_metadata_count_(0),
initial_metadata_(nullptr),
recv_initial_metadata_(nullptr),
- recv_initial_metadata_arr_{0, 0, nullptr},
send_message_(nullptr),
send_message_buffer_(nullptr),
send_buf_(nullptr),
@@ -58,7 +57,6 @@ CallOpBuffer::CallOpBuffer()
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
- recv_trailing_metadata_arr_{0, 0, nullptr},
status_code_(GRPC_STATUS_OK),
status_details_(nullptr),
status_details_capacity_(0),
@@ -66,7 +64,12 @@ CallOpBuffer::CallOpBuffer()
trailing_metadata_count_(0),
trailing_metadata_(nullptr),
cancelled_buf_(0),
- recv_closed_(nullptr) {}
+ recv_closed_(nullptr) {
+ memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
+ memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
+ recv_trailing_metadata_arr_.metadata = nullptr;
+ recv_initial_metadata_arr_.metadata = nullptr;
+}
void CallOpBuffer::Reset(void* next_return_tag) {
return_tag_ = next_return_tag;
@@ -145,7 +148,7 @@ void FillMetadataMap(grpc_metadata_array* arr,
// TODO(yangg) handle duplicates?
metadata->insert(std::pair<grpc::string, grpc::string>(
arr->metadata[i].key,
- {arr->metadata[i].value, arr->metadata[i].value_length}));
+ grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
grpc_metadata_array_destroy(arr);
grpc_metadata_array_init(arr);
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index fede2da016..cea2d24831 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -36,7 +36,6 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-#include <grpc/support/time.h>
#include "src/cpp/util/time.h"
namespace grpc {
@@ -57,15 +56,12 @@ class EventDeleter {
}
};
-CompletionQueue::NextStatus
-CompletionQueue::AsyncNext(void** tag, bool* ok,
- std::chrono::system_clock::time_point deadline) {
+CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
+ void** tag, bool* ok, gpr_timespec deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
- gpr_timespec gpr_deadline;
- Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
- ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
+ ev.reset(grpc_completion_queue_next(cq_, deadline));
if (!ev) { /* got a NULL back because deadline passed */
return TIMEOUT;
}
@@ -81,6 +77,13 @@ CompletionQueue::AsyncNext(void** tag, bool* ok,
}
}
+CompletionQueue::NextStatus CompletionQueue::AsyncNext(
+ void** tag, bool* ok, std::chrono::system_clock::time_point deadline) {
+ gpr_timespec gpr_deadline;
+ Timepoint2Timespec(deadline, &gpr_deadline);
+ return AsyncNextInternal(tag, ok, gpr_deadline);
+}
+
bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 88f7a9b1a9..49d69a3fb9 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -59,9 +59,12 @@ class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
std::shared_ptr<ServerCredentials> SslServerCredentials(
const SslServerCredentialsOptions& options) {
std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
- for (const auto& key_cert_pair : options.pem_key_cert_pairs) {
- pem_key_cert_pairs.push_back(
- {key_cert_pair.private_key.c_str(), key_cert_pair.cert_chain.c_str()});
+ for (auto key_cert_pair = options.pem_key_cert_pairs.begin();
+ key_cert_pair != options.pem_key_cert_pairs.end();
+ key_cert_pair++) {
+ grpc_ssl_pem_key_cert_pair p = {key_cert_pair->private_key.c_str(),
+ key_cert_pair->cert_chain.c_str()};
+ pem_key_cert_pairs.push_back(p);
}
grpc_server_credentials* c_creds = grpc_ssl_server_credentials_create(
options.pem_root_certs.empty() ? nullptr : options.pem_root_certs.c_str(),
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 4ed4c45507..8e6a6cf40a 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -234,7 +234,8 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
service->server_ = this;
}
-int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) {
+int Server::AddListeningPort(const grpc::string& addr,
+ ServerCredentials* creds) {
GPR_ASSERT(!started_);
return creds->AddPortToServer(addr, server_);
}
@@ -246,8 +247,8 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_.empty()) {
- for (auto& m : sync_methods_) {
- m.Request(server_);
+ for (auto m = sync_methods_.begin(); m != sync_methods_.end(); m++) {
+ m->Request(server_);
}
ScheduleCallback();
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 2f5a0dc6c8..c5e115f396 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -63,9 +63,9 @@ void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
generic_service_ = service;
}
-void ServerBuilder::AddPort(const grpc::string& addr,
- std::shared_ptr<ServerCredentials> creds,
- int* selected_port) {
+void ServerBuilder::AddListeningPort(const grpc::string& addr,
+ std::shared_ptr<ServerCredentials> creds,
+ int* selected_port) {
ports_.push_back(Port{addr, creds, selected_port});
}
@@ -86,24 +86,26 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_owned = true;
}
std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned));
- for (auto* service : services_) {
- if (!server->RegisterService(service)) {
+ for (auto service = services_.begin(); service != services_.end();
+ service++) {
+ if (!server->RegisterService(*service)) {
return nullptr;
}
}
- for (auto* service : async_services_) {
- if (!server->RegisterAsyncService(service)) {
+ for (auto service = async_services_.begin();
+ service != async_services_.end(); service++) {
+ if (!server->RegisterAsyncService(*service)) {
return nullptr;
}
}
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}
- for (auto& port : ports_) {
- int r = server->AddPort(port.addr, port.creds.get());
+ for (auto port = ports_.begin(); port != ports_.end(); port++) {
+ int r = server->AddListeningPort(port->addr, port->creds.get());
if (!r) return nullptr;
- if (port.selected_port != nullptr) {
- *port.selected_port = r;
+ if (port->selected_port != nullptr) {
+ *port->selected_port = r;
}
}
if (!server->Start()) {
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc
index d3013b806c..80c96111b1 100644
--- a/src/cpp/server/thread_pool.cc
+++ b/src/cpp/server/thread_pool.cc
@@ -35,28 +35,29 @@
namespace grpc {
+void ThreadPool::ThreadFunc() {
+ for (;;) {
+ // Wait until work is available or we are shutting down.
+ std::unique_lock<std::mutex> lock(mu_);
+ if (!shutdown_ && callbacks_.empty()) {
+ cv_.wait(lock);
+ }
+ // Drain callbacks before considering shutdown to ensure all work
+ // gets completed.
+ if (!callbacks_.empty()) {
+ auto cb = callbacks_.front();
+ callbacks_.pop();
+ lock.unlock();
+ cb();
+ } else if (shutdown_) {
+ return;
+ }
+ }
+}
+
ThreadPool::ThreadPool(int num_threads) : shutdown_(false) {
for (int i = 0; i < num_threads; i++) {
- threads_.push_back(std::thread([this]() {
- for (;;) {
- // Wait until work is available or we are shutting down.
- auto have_work = [this]() { return shutdown_ || !callbacks_.empty(); };
- std::unique_lock<std::mutex> lock(mu_);
- if (!have_work()) {
- cv_.wait(lock, have_work);
- }
- // Drain callbacks before considering shutdown to ensure all work
- // gets completed.
- if (!callbacks_.empty()) {
- auto cb = callbacks_.front();
- callbacks_.pop();
- lock.unlock();
- cb();
- } else if (shutdown_) {
- return;
- }
- }
- }));
+ threads_.push_back(std::thread(&ThreadPool::ThreadFunc, this));
}
}
@@ -66,8 +67,8 @@ ThreadPool::~ThreadPool() {
shutdown_ = true;
cv_.notify_all();
}
- for (auto& t : threads_) {
- t.join();
+ for (auto t = threads_.begin(); t != threads_.end(); t++) {
+ t->join();
}
}
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index 6225d82a0b..41e2009ff1 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -58,6 +58,8 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
std::vector<std::thread> threads_;
+
+ void ThreadFunc();
};
} // namespace grpc
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index 44d2283e76..059ea72abf 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -42,11 +42,15 @@ using std::chrono::system_clock;
namespace grpc {
-// TODO(yangg) prevent potential overflow.
void Timepoint2Timespec(const system_clock::time_point& from,
gpr_timespec* to) {
system_clock::duration deadline = from.time_since_epoch();
seconds secs = duration_cast<seconds>(deadline);
+ if (from == system_clock::time_point::max() ||
+ secs.count() >= gpr_inf_future.tv_sec || secs.count() < 0) {
+ *to = gpr_inf_future;
+ return;
+ }
nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs);
to->tv_sec = secs.count();
to->tv_nsec = nsecs.count();
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 807f5a6ded..3da9e33e53 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -74,7 +74,7 @@ namespace Grpc.Core.Tests
ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
- int port = server.AddPort(host + ":0");
+ int port = server.AddListeningPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
@@ -97,7 +97,7 @@ namespace Grpc.Core.Tests
ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
- int port = server.AddPort(host + ":0");
+ int port = server.AddListeningPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
@@ -117,7 +117,7 @@ namespace Grpc.Core.Tests
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder(serviceName).Build());
- int port = server.AddPort(host + ":0");
+ int port = server.AddListeningPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index 12f914bfad..2a1855da67 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -47,7 +47,7 @@ namespace Grpc.Core.Tests
GrpcEnvironment.Initialize();
Server server = new Server();
- server.AddPort("localhost:0");
+ server.AddListeningPort("localhost:0");
server.Start();
server.ShutdownAsync().Wait();
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index dc4781e796..a59da09822 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -80,12 +80,12 @@ namespace Grpc.Core.Internal
return grpcsharp_server_create(cq, args);
}
- public int AddPort(string addr)
+ public int AddListeningPort(string addr)
{
return grpcsharp_server_add_http2_port(this, addr);
}
- public int AddPort(string addr, ServerCredentialsSafeHandle credentials)
+ public int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials)
{
return grpcsharp_server_add_secure_http2_port(this, addr, credentials);
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 2439cdb6dc..f086fa8beb 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -76,17 +76,17 @@ namespace Grpc.Core
}
// only call before Start()
- public int AddPort(string addr)
+ public int AddListeningPort(string addr)
{
- return handle.AddPort(addr);
+ return handle.AddListeningPort(addr);
}
// only call before Start()
- public int AddPort(string addr, ServerCredentials credentials)
+ public int AddListeningPort(string addr, ServerCredentials credentials)
{
using (var nativeCredentials = credentials.ToNativeCredentials())
{
- return handle.AddPort(addr, nativeCredentials);
+ return handle.AddListeningPort(addr, nativeCredentials);
}
}
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 32a523f213..85f213cb39 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -58,7 +58,7 @@ namespace math.Tests
server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
- int port = server.AddPort(host + ":0");
+ int port = server.AddListeningPort(host + ":0");
server.Start();
channel = new Channel(host + ":" + port);
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 814f6311f2..1e76d3df21 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -59,7 +59,7 @@ namespace Grpc.IntegrationTesting
server = new Server();
server.AddServiceDefinition(TestServiceGrpc.BindService(new TestServiceImpl()));
- int port = server.AddPort(host + ":0", TestCredentials.CreateTestServerCredentials());
+ int port = server.AddListeningPort(host + ":0", TestCredentials.CreateTestServerCredentials());
server.Start();
var channelArgs = ChannelArgs.CreateBuilder()
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index 5e580280b6..ad5200774f 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -96,11 +96,11 @@ namespace Grpc.IntegrationTesting
string addr = "0.0.0.0:" + options.port;
if (options.useTls)
{
- server.AddPort(addr, TestCredentials.CreateTestServerCredentials());
+ server.AddListeningPort(addr, TestCredentials.CreateTestServerCredentials());
}
else
{
- server.AddPort(addr);
+ server.AddListeningPort(addr);
}
Console.WriteLine("Running server on " + addr);
server.Start();
diff --git a/src/node/examples/qps_test.js b/src/node/examples/qps_test.js
new file mode 100644
index 0000000000..00293b464a
--- /dev/null
+++ b/src/node/examples/qps_test.js
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * This script runs a QPS test. It sends requests for a specified length of time
+ * with a specified number pending at any one time. It then outputs the measured
+ * QPS. Usage:
+ * node qps_test.js [--concurrent=count] [--time=seconds]
+ * concurrent defaults to 100 and time defaults to 10
+ */
+
+'use strict';
+
+var async = require('async');
+var parseArgs = require('minimist');
+
+var grpc = require('..');
+var testProto = grpc.load(__dirname + '/../interop/test.proto').grpc.testing;
+var interop_server = require('../interop/interop_server.js');
+
+/**
+ * Runs the QPS test. Sends requests constantly for the given number of seconds,
+ * and keeps concurrent_calls requests pending at all times. When the test ends,
+ * the callback is called with the number of calls that completed within the
+ * time limit.
+ * @param {number} concurrent_calls The number of calls to have pending
+ * simultaneously
+ * @param {number} seconds The number of seconds to run the test for
+ * @param {function(Error, number)} callback Callback for test completion
+ */
+function runTest(concurrent_calls, seconds, callback) {
+ var testServer = interop_server.getServer(0, false);
+ testServer.server.listen();
+ var client = new testProto.TestService('localhost:' + testServer.port);
+
+ var warmup_num = 100;
+
+ /**
+ * Warms up the client to avoid counting startup time in the test result
+ * @param {function(Error)} callback Called when warmup is complete
+ */
+ function warmUp(callback) {
+ var pending = warmup_num;
+ function startCall() {
+ client.emptyCall({}, function(err, resp) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ pending--;
+ if (pending === 0) {
+ callback(null);
+ }
+ });
+ }
+ for (var i = 0; i < warmup_num; i++) {
+ startCall();
+ }
+ }
+ /**
+ * Run the QPS test. Starts concurrent_calls requests, then starts a new
+ * request whenever one completes until time runs out.
+ * @param {function(Error, number)} callback Called when the test is complete.
+ * The second argument is the number of calls that finished within the
+ * time limit
+ */
+ function run(callback) {
+ var running = 0;
+ var count = 0;
+ var start = process.hrtime();
+ function responseCallback(err, resp) {
+ if (process.hrtime(start)[0] < seconds) {
+ count += 1;
+ client.emptyCall({}, responseCallback);
+ } else {
+ running -= 1;
+ if (running <= 0) {
+ callback(null, count);
+ }
+ }
+ }
+ for (var i = 0; i < concurrent_calls; i++) {
+ running += 1;
+ client.emptyCall({}, responseCallback);
+ }
+ }
+ async.waterfall([warmUp, run], function(err, count) {
+ testServer.server.shutdown();
+ callback(err, count);
+ });
+}
+
+if (require.main === module) {
+ var argv = parseArgs(process.argv.slice(2), {
+ default: {'concurrent': 100,
+ 'time': 10}
+ });
+ runTest(argv.concurrent, argv.time, function(err, count) {
+ if (err) {
+ throw err;
+ }
+ console.log('Concurrent calls:', argv.concurrent);
+ console.log('Time:', argv.time, 'seconds');
+ console.log('QPS:', (count/argv.time));
+ });
+}
diff --git a/src/node/index.js b/src/node/index.js
index ad3dd96af7..0b768edc6b 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -56,7 +56,7 @@ function loadObject(value) {
});
return result;
} else if (value.className === 'Service') {
- return client.makeClientConstructor(value);
+ return client.makeProtobufClientConstructor(value);
} else if (value.className === 'Message' || value.className === 'Enum') {
return value.build();
} else {
@@ -119,7 +119,7 @@ exports.load = load;
/**
* See docs for server.makeServerConstructor
*/
-exports.buildServer = server.makeServerConstructor;
+exports.buildServer = server.makeProtobufServerConstructor;
/**
* Status name to code number mapping
@@ -141,3 +141,7 @@ exports.Credentials = grpc.Credentials;
exports.ServerCredentials = grpc.ServerCredentials;
exports.getGoogleAuthDelegate = getGoogleAuthDelegate;
+
+exports.makeGenericClientConstructor = client.makeClientConstructor;
+
+exports.makeGenericServerConstructor = server.makeServerConstructor;
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 8060baf827..77804cf595 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -35,6 +35,7 @@
var fs = require('fs');
var path = require('path');
+var _ = require('underscore');
var grpc = require('..');
var testProto = grpc.load(__dirname + '/test.proto').grpc.testing;
var GoogleAuth = require('google-auth-library');
@@ -45,6 +46,8 @@ var AUTH_SCOPE = 'https://www.googleapis.com/auth/xapi.zoo';
var AUTH_SCOPE_RESPONSE = 'xapi.zoo';
var AUTH_USER = ('155450119199-3psnrh1sdr3d8cpj1v46naggf81mhdnk' +
'@developer.gserviceaccount.com');
+var COMPUTE_ENGINE_USER = ('155450119199-r5aaqa2vqoa9g5mv2m6s3m1l293rlmel' +
+ '@developer.gserviceaccount.com');
/**
* Create a buffer filled with size zeroes
@@ -265,11 +268,12 @@ function cancelAfterFirstResponse(client, done) {
/**
* Run one of the authentication tests.
+ * @param {string} expected_user The expected username in the response
* @param {Client} client The client to test against
* @param {function} done Callback to call when the test is completed. Included
* primarily for use with mocha
*/
-function authTest(client, done) {
+function authTest(expected_user, client, done) {
(new GoogleAuth()).getApplicationDefault(function(err, credential) {
assert.ifError(err);
if (credential.createScopedRequired()) {
@@ -290,7 +294,7 @@ function authTest(client, done) {
assert.strictEqual(resp.payload.type, testProto.PayloadType.COMPRESSABLE);
assert.strictEqual(resp.payload.body.limit - resp.payload.body.offset,
314159);
- assert.strictEqual(resp.username, AUTH_USER);
+ assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
});
call.on('status', function(status) {
@@ -314,8 +318,8 @@ var test_cases = {
empty_stream: emptyStream,
cancel_after_begin: cancelAfterBegin,
cancel_after_first_response: cancelAfterFirstResponse,
- compute_engine_creds: authTest,
- service_account_creds: authTest
+ compute_engine_creds: _.partial(authTest, AUTH_USER),
+ service_account_creds: _.partial(authTest, COMPUTE_ENGINE_USER)
};
/**
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 54b8dbdc9c..c46f7d0526 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -35,9 +35,6 @@
var _ = require('underscore');
-var capitalize = require('underscore.string/capitalize');
-var decapitalize = require('underscore.string/decapitalize');
-
var grpc = require('bindings')('grpc.node');
var common = require('./common.js');
@@ -463,13 +460,18 @@ var requester_makers = {
};
/**
- * Creates a constructor for clients for the given service
- * @param {ProtoBuf.Reflect.Service} service The service to generate a client
- * for
+ * Creates a constructor for a client with the given methods. The methods object
+ * maps method name to an object with the following keys:
+ * path: The path on the server for accessing the method. For example, for
+ * protocol buffers, we use "/service_name/method_name"
+ * requestStream: bool indicating whether the client sends a stream
+ * resonseStream: bool indicating whether the server sends a stream
+ * requestSerialize: function to serialize request objects
+ * responseDeserialize: function to deserialize response objects
+ * @param {Object} methods An object mapping method names to method attributes
* @return {function(string, Object)} New client constructor
*/
-function makeClientConstructor(service) {
- var prefix = '/' + common.fullyQualifiedName(service) + '/';
+function makeClientConstructor(methods) {
/**
* Create a client with the given methods
* @constructor
@@ -489,30 +491,41 @@ function makeClientConstructor(service) {
this.channel = new grpc.Channel(address, options);
}
- _.each(service.children, function(method) {
+ _.each(methods, function(attrs, name) {
var method_type;
- if (method.requestStream) {
- if (method.responseStream) {
+ if (attrs.requestStream) {
+ if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
- if (method.responseStream) {
+ if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
- var serialize = common.serializeCls(method.resolvedRequestType.build());
- var deserialize = common.deserializeCls(
- method.resolvedResponseType.build());
- Client.prototype[decapitalize(method.name)] = requester_makers[method_type](
- prefix + capitalize(method.name), serialize, deserialize);
- Client.prototype[decapitalize(method.name)].serialize = serialize;
- Client.prototype[decapitalize(method.name)].deserialize = deserialize;
+ var serialize = attrs.requestSerialize;
+ var deserialize = attrs.responseDeserialize;
+ Client.prototype[name] = requester_makers[method_type](
+ attrs.path, serialize, deserialize);
+ Client.prototype[name].serialize = serialize;
+ Client.prototype[name].deserialize = deserialize;
});
+ return Client;
+}
+
+/**
+ * Creates a constructor for clients for the given service
+ * @param {ProtoBuf.Reflect.Service} service The service to generate a client
+ * for
+ * @return {function(string, Object)} New client constructor
+ */
+function makeProtobufClientConstructor(service) {
+ var method_attrs = common.getProtobufServiceAttrs(service);
+ var Client = makeClientConstructor(method_attrs);
Client.service = service;
return Client;
@@ -520,6 +533,8 @@ function makeClientConstructor(service) {
exports.makeClientConstructor = makeClientConstructor;
+exports.makeProtobufClientConstructor = makeProtobufClientConstructor;
+
/**
* See docs for client.status
*/
diff --git a/src/node/src/common.js b/src/node/src/common.js
index eec8f0f987..55a6b13782 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -36,6 +36,7 @@
var _ = require('underscore');
var capitalize = require('underscore.string/capitalize');
+var decapitalize = require('underscore.string/decapitalize');
/**
* Get a function that deserializes a specific type of protobuf.
@@ -110,6 +111,26 @@ function wrapIgnoreNull(func) {
}
/**
+ * Return a map from method names to method attributes for the service.
+ * @param {ProtoBuf.Reflect.Service} service The service to get attributes for
+ * @return {Object} The attributes map
+ */
+function getProtobufServiceAttrs(service) {
+ var prefix = '/' + fullyQualifiedName(service) + '/';
+ return _.object(_.map(service.children, function(method) {
+ return [decapitalize(method.name), {
+ path: prefix + capitalize(method.name),
+ requestStream: method.requestStream,
+ responseStream: method.responseStream,
+ requestSerialize: serializeCls(method.resolvedRequestType.build()),
+ requestDeserialize: deserializeCls(method.resolvedRequestType.build()),
+ responseSerialize: serializeCls(method.resolvedResponseType.build()),
+ responseDeserialize: deserializeCls(method.resolvedResponseType.build())
+ }];
+ }));
+}
+
+/**
* See docs for deserializeCls
*/
exports.deserializeCls = deserializeCls;
@@ -128,3 +149,5 @@ exports.fullyQualifiedName = fullyQualifiedName;
* See docs for wrapIgnoreNull
*/
exports.wrapIgnoreNull = wrapIgnoreNull;
+
+exports.getProtobufServiceAttrs = getProtobufServiceAttrs;
diff --git a/src/node/src/server.js b/src/node/src/server.js
index b72d110666..8a26a43606 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -35,9 +35,6 @@
var _ = require('underscore');
-var capitalize = require('underscore.string/capitalize');
-var decapitalize = require('underscore.string/decapitalize');
-
var grpc = require('bindings')('grpc.node');
var common = require('./common');
@@ -532,26 +529,20 @@ Server.prototype.bind = function(port, creds) {
};
/**
- * Creates a constructor for servers with a service defined by the methods
- * object. The methods object has string keys and values of this form:
- * {serialize: function, deserialize: function, client_stream: bool,
- * server_stream: bool}
- * @param {Object} methods Method descriptor for each method the server should
- * expose
- * @param {string} prefix The prefex to prepend to each method name
- * @return {function(Object, Object)} New server constructor
+ * Create a constructor for servers with services defined by service_attr_map.
+ * That is an object that maps (namespaced) service names to objects that in
+ * turn map method names to objects with the following keys:
+ * path: The path on the server for accessing the method. For example, for
+ * protocol buffers, we use "/service_name/method_name"
+ * requestStream: bool indicating whether the client sends a stream
+ * resonseStream: bool indicating whether the server sends a stream
+ * requestDeserialize: function to deserialize request objects
+ * responseSerialize: function to serialize response objects
+ * @param {Object} service_attr_map An object mapping service names to method
+ * attribute map objects
+ * @return {function(Object, function, Object=)} New server constructor
*/
-function makeServerConstructor(services) {
- var qual_names = [];
- _.each(services, function(service) {
- _.each(service.children, function(method) {
- var name = common.fullyQualifiedName(method);
- if (_.indexOf(qual_names, name) !== -1) {
- throw new Error('Method ' + name + ' exposed by more than one service');
- }
- qual_names.push(name);
- });
- });
+function makeServerConstructor(service_attr_map) {
/**
* Create a server with the given handlers for all of the methods.
* @constructor
@@ -565,41 +556,34 @@ function makeServerConstructor(services) {
function SurfaceServer(service_handlers, getMetadata, options) {
var server = new Server(getMetadata, options);
this.inner_server = server;
- _.each(services, function(service) {
- var service_name = common.fullyQualifiedName(service);
+ _.each(service_attr_map, function(service_attrs, service_name) {
if (service_handlers[service_name] === undefined) {
throw new Error('Handlers for service ' +
service_name + ' not provided.');
}
- var prefix = '/' + common.fullyQualifiedName(service) + '/';
- _.each(service.children, function(method) {
+ _.each(service_attrs, function(attrs, name) {
var method_type;
- if (method.requestStream) {
- if (method.responseStream) {
+ if (attrs.requestStream) {
+ if (attrs.responseStream) {
method_type = 'bidi';
} else {
method_type = 'client_stream';
}
} else {
- if (method.responseStream) {
+ if (attrs.responseStream) {
method_type = 'server_stream';
} else {
method_type = 'unary';
}
}
- if (service_handlers[service_name][decapitalize(method.name)] ===
- undefined) {
- throw new Error('Method handler for ' +
- common.fullyQualifiedName(method) + ' not provided.');
+ if (service_handlers[service_name][name] === undefined) {
+ throw new Error('Method handler for ' + attrs.path +
+ ' not provided.');
}
- var serialize = common.serializeCls(
- method.resolvedResponseType.build());
- var deserialize = common.deserializeCls(
- method.resolvedRequestType.build());
- server.register(
- prefix + capitalize(method.name),
- service_handlers[service_name][decapitalize(method.name)],
- serialize, deserialize, method_type);
+ var serialize = attrs.responseSerialize;
+ var deserialize = attrs.requestDeserialize;
+ server.register(attrs.path, service_handlers[service_name][name],
+ serialize, deserialize, method_type);
});
}, this);
}
@@ -636,6 +620,39 @@ function makeServerConstructor(services) {
}
/**
+ * Create a constructor for servers that serve the given services.
+ * @param {Array<ProtoBuf.Reflect.Service>} services The services that the
+ * servers will serve
+ * @return {function(Object, function, Object=)} New server constructor
+ */
+function makeProtobufServerConstructor(services) {
+ var qual_names = [];
+ var service_attr_map = {};
+ _.each(services, function(service) {
+ var service_name = common.fullyQualifiedName(service);
+ _.each(service.children, function(method) {
+ var name = common.fullyQualifiedName(method);
+ if (_.indexOf(qual_names, name) !== -1) {
+ throw new Error('Method ' + name + ' exposed by more than one service');
+ }
+ qual_names.push(name);
+ });
+ var method_attrs = common.getProtobufServiceAttrs(service);
+ if (!service_attr_map.hasOwnProperty(service_name)) {
+ service_attr_map[service_name] = {};
+ }
+ service_attr_map[service_name] = _.extend(service_attr_map[service_name],
+ method_attrs);
+ });
+ return makeServerConstructor(service_attr_map);
+}
+
+/**
* See documentation for makeServerConstructor
*/
exports.makeServerConstructor = makeServerConstructor;
+
+/**
+ * See documentation for makeProtobufServerConstructor
+ */
+exports.makeProtobufServerConstructor = makeProtobufServerConstructor;
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 91d8197bee..96b47815e1 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -45,6 +45,8 @@ var math_proto = ProtoBuf.loadProtoFile(__dirname + '/../examples/math.proto');
var mathService = math_proto.lookup('math.Math');
+var capitalize = require('underscore.string/capitalize');
+
describe('Surface server constructor', function() {
it('Should fail with conflicting method names', function() {
assert.throws(function() {
@@ -75,6 +77,55 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
+describe('Generic client and server', function() {
+ function toString(val) {
+ return val.toString();
+ }
+ function toBuffer(str) {
+ return new Buffer(str);
+ }
+ var string_service_attrs = {
+ 'capitalize' : {
+ path: '/string/capitalize',
+ requestStream: false,
+ responseStream: false,
+ requestSerialize: toBuffer,
+ requestDeserialize: toString,
+ responseSerialize: toBuffer,
+ responseDeserialize: toString
+ }
+ };
+ describe('String client and server', function() {
+ var client;
+ var server;
+ before(function() {
+ var Server = grpc.makeGenericServerConstructor({
+ string: string_service_attrs
+ });
+ server = new Server({
+ string: {
+ capitalize: function(call, callback) {
+ callback(null, capitalize(call.request));
+ }
+ }
+ });
+ var port = server.bind('localhost:0');
+ server.listen();
+ var Client = grpc.makeGenericClientConstructor(string_service_attrs);
+ client = new Client('localhost:' + port);
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('Should respond with a capitalized string', function(done) {
+ client.capitalize('abc', function(err, response) {
+ assert.ifError(err);
+ assert.strictEqual(response, 'Abc');
+ done();
+ });
+ });
+ });
+});
describe('Cancelling surface client', function() {
var client;
var server;
@@ -89,7 +140,7 @@ describe('Cancelling surface client', function() {
}
});
var port = server.bind('localhost:0');
- var Client = surface_client.makeClientConstructor(mathService);
+ var Client = surface_client.makeProtobufClientConstructor(mathService);
client = new Client('localhost:' + port);
});
after(function() {
diff --git a/src/php/ext/grpc/byte_buffer.c b/src/php/ext/grpc/byte_buffer.c
index 1ced1bf3f0..9f122d6da6 100644
--- a/src/php/ext/grpc/byte_buffer.c
+++ b/src/php/ext/grpc/byte_buffer.c
@@ -57,6 +57,11 @@ grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) {
void byte_buffer_to_string(grpc_byte_buffer *buffer, char **out_string,
size_t *out_length) {
+ if (buffer == NULL) {
+ *out_string = NULL;
+ *out_length = 0;
+ return;
+ }
size_t length = grpc_byte_buffer_length(buffer);
char *string = ecalloc(length + 1, sizeof(char));
size_t offset = 0;
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 798747109a..ba1b2a407d 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -49,11 +49,11 @@
#include <stdbool.h>
#include "grpc/support/log.h"
+#include "grpc/support/alloc.h"
#include "grpc/grpc.h"
#include "timeval.h"
#include "channel.h"
-#include "completion_queue.h"
#include "byte_buffer.h"
zend_class_entry *grpc_ce_call;
@@ -61,7 +61,19 @@ zend_class_entry *grpc_ce_call;
/* Frees and destroys an instance of wrapped_grpc_call */
void free_wrapped_grpc_call(void *object TSRMLS_DC) {
wrapped_grpc_call *call = (wrapped_grpc_call *)object;
+ grpc_event *event;
if (call->owned && call->wrapped != NULL) {
+ if (call->queue != NULL) {
+ grpc_completion_queue_shutdown(call->queue);
+ event = grpc_completion_queue_next(call->queue, gpr_inf_future);
+ while (event != NULL) {
+ if (event->type == GRPC_QUEUE_SHUTDOWN) {
+ break;
+ }
+ event = grpc_completion_queue_next(call->queue, gpr_inf_future);
+ }
+ grpc_completion_queue_destroy(call->queue);
+ }
grpc_call_destroy(call->wrapped);
}
efree(call);
@@ -88,17 +100,23 @@ zend_object_value create_wrapped_grpc_call(zend_class_entry *class_type
/* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct
should be destroyed at the end of the object's lifecycle */
-zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) {
+zval *grpc_php_wrap_call(grpc_call *wrapped, grpc_completion_queue *queue,
+ bool owned) {
zval *call_object;
MAKE_STD_ZVAL(call_object);
object_init_ex(call_object, grpc_ce_call);
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(call_object TSRMLS_CC);
call->wrapped = wrapped;
+ call->queue = queue;
return call_object;
}
-zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements) {
+/* Creates and returns a PHP array object with the data in a
+ * grpc_metadata_array. Returns NULL on failure */
+zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) {
+ int count = metadata_array->count;
+ grpc_metadata *elements = metadata_array->metadata;
int i;
zval *array;
zval **data = NULL;
@@ -139,6 +157,64 @@ zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements) {
return array;
}
+/* Populates a grpc_metadata_array with the data in a PHP array object.
+ Returns true on success and false on failure */
+bool create_metadata_array(zval *array, grpc_metadata_array *metadata) {
+ zval **inner_array;
+ zval **value;
+ HashTable *array_hash;
+ HashPosition array_pointer;
+ HashTable *inner_array_hash;
+ HashPosition inner_array_pointer;
+ char *key;
+ uint key_len;
+ ulong index;
+ if (Z_TYPE_P(array) != IS_ARRAY) {
+ return false;
+ }
+ grpc_metadata_array_init(metadata);
+ array_hash = Z_ARRVAL_P(array);
+ for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
+ zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
+ &array_pointer) == SUCCESS;
+ zend_hash_move_forward_ex(array_hash, &array_pointer)) {
+ if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
+ &array_pointer) != HASH_KEY_IS_STRING) {
+ return false;
+ }
+ if (Z_TYPE_P(*inner_array) != IS_ARRAY) {
+ return false;
+ }
+ inner_array_hash = Z_ARRVAL_P(*inner_array);
+ metadata->capacity += zend_hash_num_elements(inner_array_hash);
+ }
+ metadata->metadata = gpr_malloc(metadata->capacity * sizeof(grpc_metadata));
+ for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
+ zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
+ &array_pointer) == SUCCESS;
+ zend_hash_move_forward_ex(array_hash, &array_pointer)) {
+ if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
+ &array_pointer) != HASH_KEY_IS_STRING) {
+ return false;
+ }
+ inner_array_hash = Z_ARRVAL_P(*inner_array);
+ for (zend_hash_internal_pointer_reset_ex(inner_array_hash,
+ &inner_array_pointer);
+ zend_hash_get_current_data_ex(inner_array_hash, (void**)&value,
+ &inner_array_pointer) == SUCCESS;
+ zend_hash_move_forward_ex(inner_array_hash, &inner_array_pointer)) {
+ if (Z_TYPE_P(*value) != IS_STRING) {
+ return false;
+ }
+ metadata->metadata[metadata->count].key = key;
+ metadata->metadata[metadata->count].value = Z_STRVAL_P(*value);
+ metadata->metadata[metadata->count].value_length = Z_STRLEN_P(*value);
+ metadata->count += 1;
+ }
+ }
+ return true;
+}
+
/**
* Constructs a new instance of the Call class.
* @param Channel $channel The channel to associate the call with. Must not be
@@ -157,9 +233,10 @@ PHP_METHOD(Call, __construct) {
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO", &channel_obj,
grpc_ce_channel, &method, &method_len,
&deadline_obj, grpc_ce_timeval) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "Call expects a Channel, a String, and a Timeval",
- 1 TSRMLS_CC);
+ zend_throw_exception(
+ spl_ce_InvalidArgumentException,
+ "Call expects a Channel, a String, and a Timeval",
+ 1 TSRMLS_CC);
return;
}
wrapped_grpc_channel *channel =
@@ -175,289 +252,250 @@ PHP_METHOD(Call, __construct) {
wrapped_grpc_timeval *deadline =
(wrapped_grpc_timeval *)zend_object_store_get_object(
deadline_obj TSRMLS_CC);
- call->wrapped = grpc_channel_create_call_old(
- channel->wrapped, method, channel->target, deadline->wrapped);
+ call->queue = grpc_completion_queue_create();
+ call->wrapped = grpc_channel_create_call(
+ channel->wrapped, call->queue, method, channel->target,
+ deadline->wrapped);
}
/**
- * Add metadata to the call. All array keys must be strings. If the value is a
- * string, it is added as a key/value pair. If it is an array, each value is
- * added paired with the same string
- * @param array $metadata The metadata to add
- * @param long $flags A bitwise combination of the Grpc\WRITE_* constants
- * (optional)
- * @return Void
+ * Start a batch of RPC actions.
+ * @param array batch Array of actions to take
+ * @return object Object with results of all actions
*/
-PHP_METHOD(Call, add_metadata) {
+PHP_METHOD(Call, start_batch) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- grpc_metadata metadata;
- grpc_call_error error_code;
+ grpc_op ops[8];
+ size_t op_num = 0;
zval *array;
- zval **inner_array;
zval **value;
+ zval **inner_value;
HashTable *array_hash;
HashPosition array_pointer;
- HashTable *inner_array_hash;
- HashPosition inner_array_pointer;
+ HashTable *status_hash;
char *key;
uint key_len;
ulong index;
- long flags = 0;
- /* "a|l" == 1 array, 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|l", &array, &flags) ==
+ grpc_metadata_array metadata;
+ grpc_metadata_array trailing_metadata;
+ grpc_metadata_array recv_metadata;
+ grpc_metadata_array recv_trailing_metadata;
+ grpc_status_code status;
+ char *status_details = NULL;
+ size_t status_details_capacity = 0;
+ grpc_byte_buffer *message;
+ int cancelled;
+ grpc_call_error error;
+ grpc_event *event;
+ zval *result;
+ char *message_str;
+ size_t message_len;
+ zval *recv_status;
+ grpc_metadata_array_init(&metadata);
+ grpc_metadata_array_init(&trailing_metadata);
+ grpc_metadata_array_init(&recv_metadata);
+ grpc_metadata_array_init(&recv_trailing_metadata);
+ MAKE_STD_ZVAL(result);
+ object_init(result);
+ /* "a" == 1 array */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &array) ==
FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
- "add_metadata expects an array and an optional long",
- 1 TSRMLS_CC);
- return;
+ "start_batch expects an array", 1 TSRMLS_CC);
+ goto cleanup;
}
array_hash = Z_ARRVAL_P(array);
for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer);
- zend_hash_get_current_data_ex(array_hash, (void**)&inner_array,
+ zend_hash_get_current_data_ex(array_hash, (void**)&value,
&array_pointer) == SUCCESS;
zend_hash_move_forward_ex(array_hash, &array_pointer)) {
if (zend_hash_get_current_key_ex(array_hash, &key, &key_len, &index, 0,
- &array_pointer) != HASH_KEY_IS_STRING) {
+ &array_pointer) != HASH_KEY_IS_LONG) {
zend_throw_exception(spl_ce_InvalidArgumentException,
- "metadata keys must be strings", 1 TSRMLS_CC);
- return;
+ "batch keys must be integers", 1 TSRMLS_CC);
+ goto cleanup;
}
- if (Z_TYPE_P(*inner_array) != IS_ARRAY) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "metadata values must be arrays",
- 1 TSRMLS_CC);
- return;
- }
- inner_array_hash = Z_ARRVAL_P(*inner_array);
- for (zend_hash_internal_pointer_reset_ex(inner_array_hash,
- &inner_array_pointer);
- zend_hash_get_current_data_ex(inner_array_hash, (void**)&value,
- &inner_array_pointer) == SUCCESS;
- zend_hash_move_forward_ex(inner_array_hash, &inner_array_pointer)) {
- if (Z_TYPE_P(*value) != IS_STRING) {
+ switch(index) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ if (!create_metadata_array(*value, &metadata)) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Bad metadata value given", 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ ops[op_num].data.send_initial_metadata.count =
+ metadata.count;
+ ops[op_num].data.send_initial_metadata.metadata =
+ metadata.metadata;
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ if (Z_TYPE_PP(value) != IS_STRING) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Expected a string for send message",
+ 1 TSRMLS_CC);
+ }
+ ops[op_num].data.send_message =
+ string_to_byte_buffer(Z_STRVAL_PP(value), Z_STRLEN_PP(value));
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ status_hash = Z_ARRVAL_PP(value);
+ if (zend_hash_find(status_hash, "metadata", sizeof("metadata"),
+ (void **)&inner_value) == SUCCESS) {
+ if (!create_metadata_array(*inner_value, &trailing_metadata)) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Bad trailing metadata value given",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ ops[op_num].data.send_status_from_server.trailing_metadata =
+ trailing_metadata.metadata;
+ ops[op_num].data.send_status_from_server.trailing_metadata_count =
+ trailing_metadata.count;
+ }
+ if (zend_hash_find(status_hash, "code", sizeof("code"),
+ (void**)&inner_value) == SUCCESS) {
+ if (Z_TYPE_PP(inner_value) != IS_LONG) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Status code must be an integer",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ ops[op_num].data.send_status_from_server.status =
+ Z_LVAL_PP(inner_value);
+ } else {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Integer status code is required",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ if (zend_hash_find(status_hash, "details", sizeof("details"),
+ (void**)&inner_value) == SUCCESS) {
+ if (Z_TYPE_PP(inner_value) != IS_STRING) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "Status details must be a string",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ ops[op_num].data.send_status_from_server.status_details =
+ Z_STRVAL_PP(inner_value);
+ } else {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ "String status details is required",
+ 1 TSRMLS_CC);
+ goto cleanup;
+ }
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ ops[op_num].data.recv_initial_metadata = &recv_metadata;
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ ops[op_num].data.recv_message = &message;
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ ops[op_num].data.recv_status_on_client.trailing_metadata =
+ &recv_trailing_metadata;
+ ops[op_num].data.recv_status_on_client.status = &status;
+ ops[op_num].data.recv_status_on_client.status_details =
+ &status_details;
+ ops[op_num].data.recv_status_on_client.status_details_capacity =
+ &status_details_capacity;
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ ops[op_num].data.recv_close_on_server.cancelled = &cancelled;
+ break;
+ default:
zend_throw_exception(spl_ce_InvalidArgumentException,
- "metadata values must be arrays of strings",
- 1 TSRMLS_CC);
- return;
- }
- metadata.key = key;
- metadata.value = Z_STRVAL_P(*value);
- metadata.value_length = Z_STRLEN_P(*value);
- error_code = grpc_call_add_metadata_old(call->wrapped, &metadata, 0u);
- MAYBE_THROW_CALL_ERROR(add_metadata, error_code);
+ "Unrecognized key in batch", 1 TSRMLS_CC);
+ goto cleanup;
}
+ ops[op_num].op = (grpc_op_type)index;
+ op_num++;
}
-}
-
-/**
- * Invoke the RPC. Starts sending metadata and request headers over the wire
- * @param CompletionQueue $queue The completion queue to use with this call
- * @param long $metadata_tag The tag to associate with returned metadata
- * @param long $finished_tag The tag to associate with the finished event
- * @param long $flags A bitwise combination of the Grpc\WRITE_* constants
- * (optional)
- * @return Void
- */
-PHP_METHOD(Call, invoke) {
- grpc_call_error error_code;
- long tag1;
- long tag2;
- zval *queue_obj;
- long flags = 0;
- /* "Oll|l" == 1 Object, 3 mandatory longs, 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Oll|l", &queue_obj,
- grpc_ce_completion_queue, &tag1, &tag2,
- &flags) == FAILURE) {
- zend_throw_exception(
- spl_ce_InvalidArgumentException,
- "invoke needs a CompletionQueue, 2 longs, and an optional long",
- 1 TSRMLS_CC);
- return;
+ error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped);
+ if (error != GRPC_CALL_OK) {
+ zend_throw_exception(spl_ce_LogicException,
+ "start_batch was called incorrectly",
+ (long)error TSRMLS_CC);
+ goto cleanup;
}
- add_property_zval(getThis(), "completion_queue", queue_obj);
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- wrapped_grpc_completion_queue *queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- queue_obj TSRMLS_CC);
- error_code = grpc_call_invoke_old(call->wrapped, queue->wrapped, (void *)tag1,
- (void *)tag2, (gpr_uint32)flags);
- MAYBE_THROW_CALL_ERROR(invoke, error_code);
-}
-
-/**
- * Accept an incoming RPC, binding a completion queue to it. To be called after
- * adding metadata to the call, but before sending messages. Can only be called
- * on the server
- * @param CompletionQueue $queue The completion queue to use with this call
- * @param long $finished_tag The tag to associate with the finished event
- * @param long $flags A bitwise combination of the Grpc\WRITE_* constants
- * (optional)
- * @return Void
- */
-PHP_METHOD(Call, server_accept) {
- long tag;
- zval *queue_obj;
- grpc_call_error error_code;
- /* "Ol|l" == 1 Object, 1 long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "Ol", &queue_obj,
- grpc_ce_completion_queue, &tag) == FAILURE) {
- zend_throw_exception(
- spl_ce_InvalidArgumentException,
- "server_accept expects a CompletionQueue, a long, and an optional long",
- 1 TSRMLS_CC);
- return;
- }
- add_property_zval(getThis(), "completion_queue", queue_obj);
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- wrapped_grpc_completion_queue *queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- queue_obj TSRMLS_CC);
- error_code =
- grpc_call_server_accept_old(call->wrapped, queue->wrapped, (void *)tag);
- MAYBE_THROW_CALL_ERROR(server_accept, error_code);
-}
-
-PHP_METHOD(Call, server_end_initial_metadata) {
- grpc_call_error error_code;
- long flags = 0;
- /* "|l" == 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|l", &flags) ==
- FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "server_end_initial_metadata expects an optional long",
- 1 TSRMLS_CC);
- }
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- error_code = grpc_call_server_end_initial_metadata_old(call->wrapped, flags);
- MAYBE_THROW_CALL_ERROR(server_end_initial_metadata, error_code);
-}
-
-/**
- * Called by clients to cancel an RPC on the server.
- * @return Void
- */
-PHP_METHOD(Call, cancel) {
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- grpc_call_error error_code = grpc_call_cancel(call->wrapped);
- MAYBE_THROW_CALL_ERROR(cancel, error_code);
-}
-
-/**
- * Queue a byte buffer for writing
- * @param string $buffer The buffer to queue for writing
- * @param long $tag The tag to associate with this write
- * @param long $flags A bitwise combination of the Grpc\WRITE_* constants
- * (optional)
- * @return Void
- */
-PHP_METHOD(Call, start_write) {
- grpc_call_error error_code;
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- char *buffer;
- int buffer_len;
- long tag;
- long flags = 0;
- /* "Ol|l" == 1 Object, 1 mandatory long, 1 optional long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|l", &buffer,
- &buffer_len, &tag, &flags) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "start_write expects a string and an optional long",
+ event = grpc_completion_queue_pluck(call->queue, call->wrapped,
+ gpr_inf_future);
+ if (event->data.op_complete != GRPC_OP_OK) {
+ zend_throw_exception(spl_ce_LogicException,
+ "The batch failed for some reason",
1 TSRMLS_CC);
- return;
+ goto cleanup;
}
- error_code = grpc_call_start_write_old(
- call->wrapped, string_to_byte_buffer(buffer, buffer_len), (void *)tag,
- (gpr_uint32)flags);
- MAYBE_THROW_CALL_ERROR(start_write, error_code);
-}
-
-/**
- * Queue a status for writing
- * @param long $status_code The status code to send
- * @param string $status_details The status details to send
- * @param long $tag The tag to associate with this status
- * @return Void
- */
-PHP_METHOD(Call, start_write_status) {
- grpc_call_error error_code;
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- long status_code;
- int status_details_length;
- long tag;
- char *status_details;
- /* "lsl" == 1 long, 1 string, 1 long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lsl", &status_code,
- &status_details, &status_details_length,
- &tag) == FAILURE) {
- zend_throw_exception(
- spl_ce_InvalidArgumentException,
- "start_write_status expects a long, a string, and a long", 1 TSRMLS_CC);
- return;
+ for (int i = 0; i < op_num; i++) {
+ switch(ops[i].op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ add_property_bool(result, "send_metadata", true);
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ add_property_bool(result, "send_message", true);
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ add_property_bool(result, "send_close", true);
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ add_property_bool(result, "send_status", true);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ add_property_zval(result, "metadata",
+ grpc_parse_metadata_array(&recv_metadata));
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ byte_buffer_to_string(message, &message_str, &message_len);
+ if (message_str == NULL) {
+ add_property_null(result, "message");
+ } else {
+ add_property_stringl(result, "message", message_str, message_len,
+ false);
+ }
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ MAKE_STD_ZVAL(recv_status);
+ object_init(recv_status);
+ add_property_zval(recv_status, "metadata",
+ grpc_parse_metadata_array(&recv_trailing_metadata));
+ add_property_long(recv_status, "code", status);
+ add_property_string(recv_status, "details", status_details, true);
+ add_property_zval(result, "status", recv_status);
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ add_property_bool(result, "cancelled", cancelled);
+ break;
+ default:
+ break;
+ }
}
- error_code = grpc_call_start_write_status_old(call->wrapped,
- (grpc_status_code)status_code,
- status_details, (void *)tag);
- MAYBE_THROW_CALL_ERROR(start_write_status, error_code);
-}
-
-/**
- * Indicate that there are no more messages to send
- * @return Void
- */
-PHP_METHOD(Call, writes_done) {
- grpc_call_error error_code;
- wrapped_grpc_call *call =
- (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- long tag;
- /* "l" == 1 long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "writes_done expects a long", 1 TSRMLS_CC);
- return;
+cleanup:
+ grpc_metadata_array_destroy(&metadata);
+ grpc_metadata_array_destroy(&trailing_metadata);
+ grpc_metadata_array_destroy(&recv_metadata);
+ grpc_metadata_array_destroy(&recv_trailing_metadata);
+ if (status_details != NULL) {
+ gpr_free(status_details);
}
- error_code = grpc_call_writes_done_old(call->wrapped, (void *)tag);
- MAYBE_THROW_CALL_ERROR(writes_done, error_code);
+ RETURN_DESTROY_ZVAL(result);
}
/**
- * Initiate a read on a call. Output event contains a byte buffer with the
- * result of the read
- * @param long $tag The tag to associate with this read
- * @return Void
+ * Cancel the call. This will cause the call to end with STATUS_CANCELLED if it
+ * has not already ended with another status.
*/
-PHP_METHOD(Call, start_read) {
- grpc_call_error error_code;
+PHP_METHOD(Call, cancel) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- long tag;
- /* "l" == 1 long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "start_read expects a long", 1 TSRMLS_CC);
- return;
- }
- error_code = grpc_call_start_read_old(call->wrapped, (void *)tag);
- MAYBE_THROW_CALL_ERROR(start_read, error_code);
+ grpc_call_cancel(call->wrapped);
}
static zend_function_entry call_methods[] = {
PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(Call, server_accept, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, server_end_initial_metadata, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, add_metadata, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, invoke, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_read, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_write, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, start_write_status, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(Call, writes_done, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
+ PHP_ME(Call, start_batch, NULL, ZEND_ACC_PUBLIC)
+ PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
void grpc_init_call(TSRMLS_D) {
zend_class_entry ce;
diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h
index bce5d82974..743effe5a1 100644
--- a/src/php/ext/grpc/call.h
+++ b/src/php/ext/grpc/call.h
@@ -45,17 +45,6 @@
#include "grpc/grpc.h"
-// Throw an exception if error_code is not OK
-#define MAYBE_THROW_CALL_ERROR(func_name, error_code) \
- do { \
- if (error_code != GRPC_CALL_OK) { \
- zend_throw_exception(spl_ce_LogicException, \
- #func_name " was called incorrectly", \
- (long)error_code TSRMLS_CC); \
- return; \
- } \
- } while (0)
-
/* Class entry for the Call PHP class */
extern zend_class_entry *grpc_ce_call;
@@ -65,16 +54,18 @@ typedef struct wrapped_grpc_call {
bool owned;
grpc_call *wrapped;
+ grpc_completion_queue *queue;
} wrapped_grpc_call;
/* Initializes the Call PHP class */
void grpc_init_call(TSRMLS_D);
/* Creates a Call object that wraps the given grpc_call struct */
-zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned);
+zval *grpc_php_wrap_call(grpc_call *wrapped, grpc_completion_queue *queue,
+ bool owned);
/* Creates and returns a PHP associative array of metadata from a C array of
* call metadata */
-zval *grpc_call_create_metadata_array(int count, grpc_metadata *elements);
+zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array);
#endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index 5e99332fab..c96fb128a6 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -51,7 +51,6 @@
#include "grpc/support/log.h"
#include "grpc/grpc_security.h"
-#include "completion_queue.h"
#include "server.h"
#include "credentials.h"
@@ -139,6 +138,9 @@ PHP_METHOD(Channel, __construct) {
HashTable *array_hash;
zval **creds_obj = NULL;
wrapped_grpc_credentials *creds = NULL;
+ zval **override_obj;
+ char *override;
+ int override_len;
/* "s|a" == 1 string, 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a", &target,
&target_length, &args_array) == FAILURE) {
@@ -146,6 +148,8 @@ PHP_METHOD(Channel, __construct) {
"Channel expects a string and an array", 1 TSRMLS_CC);
return;
}
+ override = target;
+ override_len = target_length;
if (args_array == NULL) {
channel->wrapped = grpc_channel_create(target, NULL);
} else {
@@ -162,6 +166,19 @@ PHP_METHOD(Channel, __construct) {
*creds_obj TSRMLS_CC);
zend_hash_del(array_hash, "credentials", 12);
}
+ if (zend_hash_find(array_hash, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
+ sizeof(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
+ (void **)&override_obj) == SUCCESS) {
+ if (Z_TYPE_PP(override_obj) != IS_STRING) {
+ zend_throw_exception(spl_ce_InvalidArgumentException,
+ GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
+ " must be a string",
+ 1 TSRMLS_CC);
+ return;
+ }
+ override = Z_STRVAL_PP(override_obj);
+ override_len = Z_STRLEN_PP(override_obj);
+ }
php_grpc_read_args_array(args_array, &args);
if (creds == NULL) {
channel->wrapped = grpc_channel_create(target, &args);
@@ -172,8 +189,8 @@ PHP_METHOD(Channel, __construct) {
}
efree(args.args);
}
- channel->target = ecalloc(target_length + 1, sizeof(char));
- memcpy(channel->target, target, target_length);
+ channel->target = ecalloc(override_len + 1, sizeof(char));
+ memcpy(channel->target, override, override_len);
}
/**
diff --git a/src/php/ext/grpc/completion_queue.c b/src/php/ext/grpc/completion_queue.c
deleted file mode 100644
index 93abf5df36..0000000000
--- a/src/php/ext/grpc/completion_queue.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "completion_queue.h"
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "php.h"
-#include "php_ini.h"
-#include "ext/standard/info.h"
-#include "ext/spl/spl_exceptions.h"
-#include "php_grpc.h"
-
-#include "zend_exceptions.h"
-
-#include <stdbool.h>
-
-#include "grpc/grpc.h"
-
-#include "event.h"
-#include "timeval.h"
-
-zend_class_entry *grpc_ce_completion_queue;
-
-/* Frees and destroys a wrapped instance of grpc_completion_queue */
-void free_wrapped_grpc_completion_queue(void *object TSRMLS_DC) {
- wrapped_grpc_completion_queue *queue = NULL;
- grpc_event *event;
- queue = (wrapped_grpc_completion_queue *)object;
- if (queue->wrapped != NULL) {
- grpc_completion_queue_shutdown(queue->wrapped);
- event = grpc_completion_queue_next(queue->wrapped, gpr_inf_future);
- while (event != NULL) {
- if (event->type == GRPC_QUEUE_SHUTDOWN) {
- break;
- }
- event = grpc_completion_queue_next(queue->wrapped, gpr_inf_future);
- }
- grpc_completion_queue_destroy(queue->wrapped);
- }
- efree(queue);
-}
-
-/* Initializes an instance of wrapped_grpc_channel to be associated with an
- * object of a class specified by class_type */
-zend_object_value create_wrapped_grpc_completion_queue(
- zend_class_entry *class_type TSRMLS_DC) {
- zend_object_value retval;
- wrapped_grpc_completion_queue *intern;
-
- intern = (wrapped_grpc_completion_queue *)emalloc(
- sizeof(wrapped_grpc_completion_queue));
- memset(intern, 0, sizeof(wrapped_grpc_completion_queue));
-
- zend_object_std_init(&intern->std, class_type TSRMLS_CC);
- object_properties_init(&intern->std, class_type);
- retval.handle = zend_objects_store_put(
- intern, (zend_objects_store_dtor_t)zend_objects_destroy_object,
- free_wrapped_grpc_completion_queue, NULL TSRMLS_CC);
- retval.handlers = zend_get_std_object_handlers();
- return retval;
-}
-
-/**
- * Construct an instance of CompletionQueue
- */
-PHP_METHOD(CompletionQueue, __construct) {
- wrapped_grpc_completion_queue *queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
- TSRMLS_CC);
- queue->wrapped = grpc_completion_queue_create();
-}
-
-/**
- * Blocks until an event is available, the completion queue is being shutdown,
- * or timeout is reached. Returns NULL on timeout, otherwise the event that
- * occurred. Callers should call event.finish once they have processed the
- * event.
- * @param Timeval $timeout The timeout for the event
- * @return Event The event that occurred
- */
-PHP_METHOD(CompletionQueue, next) {
- zval *timeout;
- /* "O" == 1 Object */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &timeout,
- grpc_ce_timeval) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "next needs a Timeval", 1 TSRMLS_CC);
- return;
- }
- wrapped_grpc_completion_queue *completion_queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
- TSRMLS_CC);
- wrapped_grpc_timeval *wrapped_timeout =
- (wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
- grpc_event *event = grpc_completion_queue_next(completion_queue->wrapped,
- wrapped_timeout->wrapped);
- if (event == NULL) {
- RETURN_NULL();
- }
- zval *wrapped_event = grpc_php_convert_event(event);
- RETURN_DESTROY_ZVAL(wrapped_event);
-}
-
-PHP_METHOD(CompletionQueue, pluck) {
- long tag;
- zval *timeout;
- /* "lO" == 1 long, 1 Object */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lO", &tag, &timeout,
- grpc_ce_timeval) == FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "pluck needs a long and a Timeval", 1 TSRMLS_CC);
- }
- wrapped_grpc_completion_queue *completion_queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(getThis()
- TSRMLS_CC);
- wrapped_grpc_timeval *wrapped_timeout =
- (wrapped_grpc_timeval *)zend_object_store_get_object(timeout TSRMLS_CC);
- grpc_event *event = grpc_completion_queue_pluck(
- completion_queue->wrapped, (void *)tag, wrapped_timeout->wrapped);
- if (event == NULL) {
- RETURN_NULL();
- }
- zval *wrapped_event = grpc_php_convert_event(event);
- RETURN_DESTROY_ZVAL(wrapped_event);
-}
-
-static zend_function_entry completion_queue_methods[] = {
- PHP_ME(CompletionQueue, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR)
- PHP_ME(CompletionQueue, next, NULL, ZEND_ACC_PUBLIC)
- PHP_ME(CompletionQueue, pluck, NULL, ZEND_ACC_PUBLIC) PHP_FE_END};
-
-void grpc_init_completion_queue(TSRMLS_D) {
- zend_class_entry ce;
- INIT_CLASS_ENTRY(ce, "Grpc\\CompletionQueue", completion_queue_methods);
- ce.create_object = create_wrapped_grpc_completion_queue;
- grpc_ce_completion_queue = zend_register_internal_class(&ce TSRMLS_CC);
-}
diff --git a/src/php/ext/grpc/completion_queue.h b/src/php/ext/grpc/completion_queue.h
deleted file mode 100755
index 1d386cc58f..0000000000
--- a/src/php/ext/grpc/completion_queue.h
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_
-#define NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "php.h"
-#include "php_ini.h"
-#include "ext/standard/info.h"
-#include "php_grpc.h"
-
-#include "grpc/grpc.h"
-
-/* Class entry for the PHP CompletionQueue class */
-extern zend_class_entry *grpc_ce_completion_queue;
-
-/* Wrapper class for grpc_completion_queue that can be associated with a
- PHP object */
-typedef struct wrapped_grpc_completion_queue {
- zend_object std;
-
- grpc_completion_queue *wrapped;
-} wrapped_grpc_completion_queue;
-
-/* Initialize the CompletionQueue class */
-void grpc_init_completion_queue(TSRMLS_D);
-
-#endif /* NET_GRPC_PHP_GRPC_COMPLETION_QUEUE_H_ */
diff --git a/src/php/ext/grpc/config.m4 b/src/php/ext/grpc/config.m4
index 27c67781e7..d1a8decb73 100755
--- a/src/php/ext/grpc/config.m4
+++ b/src/php/ext/grpc/config.m4
@@ -66,5 +66,5 @@ if test "$PHP_GRPC" != "no"; then
PHP_SUBST(GRPC_SHARED_LIBADD)
- PHP_NEW_EXTENSION(grpc, byte_buffer.c call.c channel.c completion_queue.c credentials.c event.c timeval.c server.c server_credentials.c php_grpc.c, $ext_shared, , -Wall -Werror -pedantic -std=c99)
+ PHP_NEW_EXTENSION(grpc, byte_buffer.c call.c channel.c credentials.c timeval.c server.c server_credentials.c php_grpc.c, $ext_shared, , -Wall -Werror -pedantic -std=c99)
fi
diff --git a/src/php/ext/grpc/event.c b/src/php/ext/grpc/event.c
deleted file mode 100644
index 452c4b8bcb..0000000000
--- a/src/php/ext/grpc/event.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "event.h"
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "php.h"
-#include "php_ini.h"
-#include "ext/standard/info.h"
-#include "php_grpc.h"
-
-#include <stdbool.h>
-
-#include "grpc/grpc.h"
-
-#include "byte_buffer.h"
-#include "call.h"
-#include "timeval.h"
-
-/* Create a new PHP object containing the event data in the event struct.
- event must not be used after this function is called */
-zval *grpc_php_convert_event(grpc_event *event) {
- zval *data_object;
- char *detail_string;
- size_t detail_len;
- char *method_string;
- size_t method_len;
- char *host_string;
- size_t host_len;
- char *read_string;
- size_t read_len;
-
- zval *event_object;
-
- if (event == NULL) {
- return NULL;
- }
-
- MAKE_STD_ZVAL(event_object);
- object_init(event_object);
-
- add_property_zval(
- event_object, "call",
- grpc_php_wrap_call(event->call, event->type == GRPC_SERVER_RPC_NEW));
- add_property_long(event_object, "type", event->type);
- add_property_long(event_object, "tag", (long)event->tag);
-
- switch (event->type) {
- case GRPC_QUEUE_SHUTDOWN:
- add_property_null(event_object, "data");
- break;
- case GRPC_READ:
- if (event->data.read == NULL) {
- add_property_null(event_object, "data");
- } else {
- byte_buffer_to_string(event->data.read, &read_string, &read_len);
- add_property_stringl(event_object, "data", read_string, read_len, true);
- }
- break;
- case GRPC_WRITE_ACCEPTED:
- add_property_long(event_object, "data", (long)event->data.write_accepted);
- break;
- case GRPC_FINISH_ACCEPTED:
- add_property_long(event_object, "data",
- (long)event->data.finish_accepted);
- break;
- case GRPC_CLIENT_METADATA_READ:
- data_object = grpc_call_create_metadata_array(
- event->data.client_metadata_read.count,
- event->data.client_metadata_read.elements);
- add_property_zval(event_object, "data", data_object);
- break;
- case GRPC_FINISHED:
- MAKE_STD_ZVAL(data_object);
- object_init(data_object);
- add_property_long(data_object, "code", event->data.finished.status);
- if (event->data.finished.details == NULL) {
- add_property_null(data_object, "details");
- } else {
- detail_len = strlen(event->data.finished.details);
- detail_string = ecalloc(detail_len + 1, sizeof(char));
- memcpy(detail_string, event->data.finished.details, detail_len);
- add_property_string(data_object, "details", detail_string, true);
- }
- add_property_zval(data_object, "metadata",
- grpc_call_create_metadata_array(
- event->data.finished.metadata_count,
- event->data.finished.metadata_elements));
- add_property_zval(event_object, "data", data_object);
- break;
- case GRPC_SERVER_RPC_NEW:
- MAKE_STD_ZVAL(data_object);
- object_init(data_object);
- method_len = strlen(event->data.server_rpc_new.method);
- method_string = ecalloc(method_len + 1, sizeof(char));
- memcpy(method_string, event->data.server_rpc_new.method, method_len);
- add_property_string(data_object, "method", method_string, false);
- host_len = strlen(event->data.server_rpc_new.host);
- host_string = ecalloc(host_len + 1, sizeof(char));
- memcpy(host_string, event->data.server_rpc_new.host, host_len);
- add_property_string(data_object, "host", host_string, false);
- add_property_zval(
- data_object, "absolute_timeout",
- grpc_php_wrap_timeval(event->data.server_rpc_new.deadline));
- add_property_zval(data_object, "metadata",
- grpc_call_create_metadata_array(
- event->data.server_rpc_new.metadata_count,
- event->data.server_rpc_new.metadata_elements));
- add_property_zval(event_object, "data", data_object);
- break;
- default:
- add_property_null(event_object, "data");
- break;
- }
- grpc_event_finish(event);
- return event_object;
-}
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index 67e366c385..1f9edfe881 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -34,8 +34,6 @@
#include "call.h"
#include "channel.h"
#include "server.h"
-#include "completion_queue.h"
-#include "event.h"
#include "timeval.h"
#include "credentials.h"
#include "server_credentials.h"
@@ -127,27 +125,12 @@ PHP_MINIT_FUNCTION(grpc) {
REGISTER_LONG_CONSTANT("Grpc\\CALL_ERROR_INVALID_FLAGS",
GRPC_CALL_ERROR_INVALID_FLAGS, CONST_CS);
- /* Register op error constants */
- REGISTER_LONG_CONSTANT("Grpc\\OP_OK", GRPC_OP_OK, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\OP_ERROR", GRPC_OP_ERROR, CONST_CS);
-
/* Register flag constants */
REGISTER_LONG_CONSTANT("Grpc\\WRITE_BUFFER_HINT", GRPC_WRITE_BUFFER_HINT,
CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\WRITE_NO_COMPRESS", GRPC_WRITE_NO_COMPRESS,
CONST_CS);
- /* Register completion type constants */
- REGISTER_LONG_CONSTANT("Grpc\\QUEUE_SHUTDOWN", GRPC_QUEUE_SHUTDOWN, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\READ", GRPC_READ, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\FINISH_ACCEPTED", GRPC_FINISH_ACCEPTED,
- CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\WRITE_ACCEPTED", GRPC_WRITE_ACCEPTED, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\CLIENT_METADATA_READ",
- GRPC_CLIENT_METADATA_READ, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\FINISHED", GRPC_FINISHED, CONST_CS);
- REGISTER_LONG_CONSTANT("Grpc\\SERVER_RPC_NEW", GRPC_SERVER_RPC_NEW, CONST_CS);
-
/* Register status constants */
REGISTER_LONG_CONSTANT("Grpc\\STATUS_OK", GRPC_STATUS_OK, CONST_CS);
REGISTER_LONG_CONSTANT("Grpc\\STATUS_CANCELLED", GRPC_STATUS_CANCELLED,
@@ -181,10 +164,27 @@ PHP_MINIT_FUNCTION(grpc) {
REGISTER_LONG_CONSTANT("Grpc\\STATUS_DATA_LOSS", GRPC_STATUS_DATA_LOSS,
CONST_CS);
+ /* Register op type constants */
+ REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_INITIAL_METADATA",
+ GRPC_OP_SEND_INITIAL_METADATA, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_MESSAGE",
+ GRPC_OP_SEND_MESSAGE, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_CLOSE_FROM_CLIENT",
+ GRPC_OP_SEND_CLOSE_FROM_CLIENT, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_SEND_STATUS_FROM_SERVER",
+ GRPC_OP_SEND_STATUS_FROM_SERVER, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_INITIAL_METADATA",
+ GRPC_OP_RECV_INITIAL_METADATA, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_MESSAGE",
+ GRPC_OP_RECV_MESSAGE, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_STATUS_ON_CLIENT",
+ GRPC_OP_RECV_STATUS_ON_CLIENT, CONST_CS);
+ REGISTER_LONG_CONSTANT("Grpc\\OP_RECV_CLOSE_ON_SERVER",
+ GRPC_OP_RECV_CLOSE_ON_SERVER, CONST_CS);
+
grpc_init_call(TSRMLS_C);
grpc_init_channel(TSRMLS_C);
grpc_init_server(TSRMLS_C);
- grpc_init_completion_queue(TSRMLS_C);
grpc_init_timeval(TSRMLS_C);
grpc_init_credentials(TSRMLS_C);
grpc_init_server_credentials(TSRMLS_C);
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index a5cfd95287..86b29958fb 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -52,15 +52,27 @@
#include "grpc/grpc_security.h"
#include "server.h"
-#include "completion_queue.h"
#include "channel.h"
#include "server_credentials.h"
+#include "timeval.h"
zend_class_entry *grpc_ce_server;
/* Frees and destroys an instance of wrapped_grpc_server */
void free_wrapped_grpc_server(void *object TSRMLS_DC) {
wrapped_grpc_server *server = (wrapped_grpc_server *)object;
+ grpc_event *event;
+ if (server->queue != NULL) {
+ grpc_completion_queue_shutdown(server->queue);
+ event = grpc_completion_queue_next(server->queue, gpr_inf_future);
+ while (event != NULL) {
+ if (event->type == GRPC_QUEUE_SHUTDOWN) {
+ break;
+ }
+ event = grpc_completion_queue_next(server->queue, gpr_inf_future);
+ }
+ grpc_completion_queue_destroy(server->queue);
+ }
if (server->wrapped != NULL) {
grpc_server_shutdown(server->wrapped);
grpc_server_destroy(server->wrapped);
@@ -95,26 +107,22 @@ zend_object_value create_wrapped_grpc_server(zend_class_entry *class_type
PHP_METHOD(Server, __construct) {
wrapped_grpc_server *server =
(wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC);
- zval *queue_obj;
zval *args_array = NULL;
grpc_channel_args args;
- /* "O|a" == 1 Object, 1 optional array */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O|a", &queue_obj,
- grpc_ce_completion_queue, &args_array) == FAILURE) {
+ /* "|a" == 1 optional array */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|a", &args_array) ==
+ FAILURE) {
zend_throw_exception(spl_ce_InvalidArgumentException,
- "Server expects a CompletionQueue and an array",
+ "Server expects an array",
1 TSRMLS_CC);
return;
}
- add_property_zval(getThis(), "completion_queue", queue_obj);
- wrapped_grpc_completion_queue *queue =
- (wrapped_grpc_completion_queue *)zend_object_store_get_object(
- queue_obj TSRMLS_CC);
+ server->queue = grpc_completion_queue_create();
if (args_array == NULL) {
- server->wrapped = grpc_server_create(queue->wrapped, NULL);
+ server->wrapped = grpc_server_create(server->queue, NULL);
} else {
php_grpc_read_args_array(args_array, &args);
- server->wrapped = grpc_server_create(queue->wrapped, &args);
+ server->wrapped = grpc_server_create(server->queue, &args);
efree(args.args);
}
}
@@ -129,16 +137,40 @@ PHP_METHOD(Server, request_call) {
grpc_call_error error_code;
wrapped_grpc_server *server =
(wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC);
- long tag_new;
- /* "l" == 1 long */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &tag_new) ==
- FAILURE) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- "request_call expects a long", 1 TSRMLS_CC);
- return;
+ grpc_call *call;
+ grpc_call_details details;
+ grpc_metadata_array metadata;
+ zval *result;
+ grpc_event *event;
+ MAKE_STD_ZVAL(result);
+ object_init(result);
+ grpc_call_details_init(&details);
+ grpc_metadata_array_init(&metadata);
+ error_code = grpc_server_request_call(server->wrapped, &call, &details,
+ &metadata, server->queue, NULL);
+ if (error_code != GRPC_CALL_OK) {
+ zend_throw_exception(spl_ce_LogicException, "request_call failed",
+ (long)error_code TSRMLS_CC);
+ goto cleanup;
+ }
+ event = grpc_completion_queue_pluck(server->queue, NULL, gpr_inf_future);
+ if (event->data.op_complete != GRPC_OP_OK) {
+ zend_throw_exception(spl_ce_LogicException,
+ "Failed to request a call for some reason",
+ 1 TSRMLS_CC);
+ goto cleanup;
}
- error_code = grpc_server_request_call_old(server->wrapped, (void *)tag_new);
- MAYBE_THROW_CALL_ERROR(request_call, error_code);
+ add_property_zval(result, "call", grpc_php_wrap_call(call, server->queue,
+ true));
+ add_property_string(result, "method", details.method, true);
+ add_property_string(result, "host", details.host, true);
+ add_property_zval(result, "absolute_deadline",
+ grpc_php_wrap_timeval(details.deadline));
+ add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata));
+cleanup:
+ grpc_call_details_destroy(&details);
+ grpc_metadata_array_destroy(&metadata);
+ RETURN_DESTROY_ZVAL(result);
}
/**
@@ -168,7 +200,7 @@ PHP_METHOD(Server, add_secure_http2_port) {
int addr_len;
zval *creds_obj;
/* "sO" == 1 string, 1 object */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &addr, &addr_len,
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sO", &addr, &addr_len,
&creds_obj, grpc_ce_server_credentials) ==
FAILURE) {
zend_throw_exception(
diff --git a/src/php/ext/grpc/server.h b/src/php/ext/grpc/server.h
index b55689c581..ebb8d25ae1 100755
--- a/src/php/ext/grpc/server.h
+++ b/src/php/ext/grpc/server.h
@@ -53,6 +53,7 @@ typedef struct wrapped_grpc_server {
zend_object std;
grpc_server *wrapped;
+ grpc_completion_queue *queue;
} wrapped_grpc_server;
/* Initializes the Server class */
diff --git a/src/php/lib/Grpc/AbstractCall.php b/src/php/lib/Grpc/AbstractCall.php
new file mode 100644
index 0000000000..b813d16470
--- /dev/null
+++ b/src/php/lib/Grpc/AbstractCall.php
@@ -0,0 +1,79 @@
+<?php
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+namespace Grpc;
+
+require_once realpath(dirname(__FILE__) . '/../autoload.php');
+
+abstract class AbstractCall {
+
+ protected $call;
+ protected $deserialize;
+ protected $metadata;
+
+ /**
+ * Create a new Call wrapper object.
+ * @param Channel $channel The channel to communicate on
+ * @param string $method The method to call on the remote server
+ */
+ public function __construct(Channel $channel, $method, $deserialize) {
+ $this->call = new Call($channel, $method, Timeval::inf_future());
+ $this->deserialize = $deserialize;
+ }
+
+ /**
+ * @return The metadata sent by the server.
+ */
+ public function getMetadata() {
+ return $this->metadata;
+ }
+
+ /**
+ * Cancels the call
+ */
+ public function cancel() {
+ $this->call->cancel();
+ }
+
+ /**
+ * Deserialize a response value to an object.
+ * @param string $value The binary value to deserialize
+ * @return The deserialized value
+ */
+ protected function deserializeResponse($value) {
+ if ($value === null) {
+ return null;
+ }
+ return call_user_func($this->deserialize, $value);
+ }
+} \ No newline at end of file
diff --git a/src/php/lib/Grpc/AbstractSurfaceActiveCall.php b/src/php/lib/Grpc/AbstractSurfaceActiveCall.php
deleted file mode 100755
index 9d0af090ce..0000000000
--- a/src/php/lib/Grpc/AbstractSurfaceActiveCall.php
+++ /dev/null
@@ -1,98 +0,0 @@
-<?php
-
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-namespace Grpc;
-
-require_once realpath(dirname(__FILE__) . '/../autoload.php');
-
-/**
- * Represents an active call that allows sending and recieving messages.
- * Subclasses restrict how data can be sent and recieved.
- */
-abstract class AbstractSurfaceActiveCall {
- private $active_call;
- private $deserialize;
-
- /**
- * Create a new surface active call.
- * @param Channel $channel The channel to communicate on
- * @param string $method The method to call on the remote server
- * @param callable $deserialize The function to deserialize a value
- * @param array $metadata Metadata to send with the call, if applicable
- * @param long $flags Write flags to use with this call
- */
- public function __construct(Channel $channel,
- $method,
- callable $deserialize,
- $metadata = array(),
- $flags = 0) {
- $this->active_call = new ActiveCall($channel, $method, $metadata, $flags);
- $this->deserialize = $deserialize;
- }
-
- /**
- * @return The metadata sent by the server
- */
- public function getMetadata() {
- return $this->metadata();
- }
-
- /**
- * Cancels the call
- */
- public function cancel() {
- $this->active_call->cancel();
- }
-
- protected function _read() {
- $response = $this->active_call->read();
- if ($response === null) {
- return null;
- }
- return call_user_func($this->deserialize, $response);
- }
-
- protected function _write($value) {
- return $this->active_call->write($value->serialize());
- }
-
- protected function _writesDone() {
- $this->active_call->writesDone();
- }
-
- protected function _getStatus() {
- return $this->active_call->getStatus();
- }
-}
diff --git a/src/php/lib/Grpc/ActiveCall.php b/src/php/lib/Grpc/ActiveCall.php
deleted file mode 100755
index f0d0d55582..0000000000
--- a/src/php/lib/Grpc/ActiveCall.php
+++ /dev/null
@@ -1,123 +0,0 @@
-<?php
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-namespace Grpc;
-require_once realpath(dirname(__FILE__) . '/../autoload.php');
-
-/**
- * Represents an active call that allows sending and recieving binary data
- */
-class ActiveCall {
- private $completion_queue;
- private $call;
- private $flags;
- private $metadata;
-
- /**
- * Create a new active call.
- * @param Channel $channel The channel to communicate on
- * @param string $method The method to call on the remote server
- * @param array $metadata Metadata to send with the call, if applicable
- * @param long $flags Write flags to use with this call
- */
- public function __construct(Channel $channel,
- $method,
- $metadata = array(),
- $flags = 0) {
- $this->completion_queue = new CompletionQueue();
- $this->call = new Call($channel, $method, Timeval::inf_future());
- $this->call->add_metadata($metadata, 0);
- $this->flags = $flags;
-
- // Invoke the call.
- $this->call->invoke($this->completion_queue,
- CLIENT_METADATA_READ,
- FINISHED, 0);
- $metadata_event = $this->completion_queue->pluck(CLIENT_METADATA_READ,
- Timeval::inf_future());
- $this->metadata = $metadata_event->data;
- }
-
- /**
- * @return The metadata sent by the server.
- */
- public function getMetadata() {
- return $this->metadata;
- }
-
- /**
- * Cancels the call
- */
- public function cancel() {
- $this->call->cancel();
- }
-
- /**
- * Read a single message from the server.
- * @return The next message from the server, or null if there is none.
- */
- public function read() {
- $this->call->start_read(READ);
- $read_event = $this->completion_queue->pluck(READ, Timeval::inf_future());
- return $read_event->data;
- }
-
- /**
- * Write a single message to the server. This cannot be called after
- * writesDone is called.
- * @param ByteBuffer $data The data to write
- */
- public function write($data) {
- $this->call->start_write($data, WRITE_ACCEPTED, $this->flags);
- $this->completion_queue->pluck(WRITE_ACCEPTED, Timeval::inf_future());
- }
-
- /**
- * Indicate that no more writes will be sent.
- */
- public function writesDone() {
- $this->call->writes_done(FINISH_ACCEPTED);
- $this->completion_queue->pluck(FINISH_ACCEPTED, Timeval::inf_future());
- }
-
- /**
- * Wait for the server to send the status, and return it.
- * @return object The status object, with integer $code, string $details,
- * and array $metadata members
- */
- public function getStatus() {
- $status_event = $this->completion_queue->pluck(FINISHED,
- Timeval::inf_future());
- return $status_event->data;
- }
-}
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index fde055a3b3..9bc1711110 100755
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -69,11 +69,9 @@ class BaseStub {
$argument,
callable $deserialize,
$metadata = array()) {
- return new SimpleSurfaceActiveCall($this->channel,
- $method,
- $deserialize,
- $argument,
- $metadata);
+ $call = new UnaryCall($this->channel, $method, $deserialize);
+ $call->start($argument, $metadata);
+ return $call;
}
/**
@@ -91,11 +89,9 @@ class BaseStub {
$arguments,
callable $deserialize,
$metadata = array()) {
- return new ClientStreamingSurfaceActiveCall($this->channel,
- $method,
- $deserialize,
- $arguments,
- $metadata);
+ $call = new ClientStreamingCall($this->channel, $method, $deserialize);
+ $call->start($arguments, $metadata);
+ return $call;
}
/**
@@ -112,11 +108,9 @@ class BaseStub {
$argument,
callable $deserialize,
$metadata = array()) {
- return new ServerStreamingSurfaceActiveCall($this->channel,
- $method,
- $deserialize,
- $argument,
- $metadata);
+ $call = new ServerStreamingCall($this->channel, $method, $deserialize);
+ $call->start($argument, $metadata);
+ return $call;
}
/**
@@ -130,9 +124,8 @@ class BaseStub {
public function _bidiRequest($method,
callable $deserialize,
$metadata = array()) {
- return new BidiStreamingSurfaceActiveCall($this->channel,
- $method,
- $deserialize,
- $metadata);
+ $call = new BidiStreamingCall($this->channel, $method, $deserialize);
+ $call->start($metadata);
+ return $call;
}
}
diff --git a/src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/BidiStreamingCall.php
index 0459f21e27..0d3dd629f2 100755..100644
--- a/src/php/lib/Grpc/BidiStreamingSurfaceActiveCall.php
+++ b/src/php/lib/Grpc/BidiStreamingCall.php
@@ -38,38 +38,52 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php');
* Represents an active call that allows for sending and recieving messages in
* streams in any order.
*/
-class BidiStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall {
+class BidiStreamingCall extends AbstractCall {
+ /**
+ * Start the call
+ * @param array $metadata Metadata to send with the call, if applicable
+ */
+ public function start($metadata) {
+ $event = $this->call->start_batch([
+ OP_SEND_INITIAL_METADATA => $metadata,
+ OP_RECV_INITIAL_METADATA => true]);
+ $this->metadata = $event->metadata;
+ }
/**
* Reads the next value from the server.
* @return The next value from the server, or null if there is none
*/
public function read() {
- return $this->_read();
+ $read_event = $this->call->start_batch([OP_RECV_MESSAGE => true]);
+ return $this->deserializeResponse($read_event->message);
}
/**
- * Writes a single message to the server. This cannot be called after
+ * Write a single message to the server. This cannot be called after
* writesDone is called.
- * @param $value The message to send
+ * @param ByteBuffer $data The data to write
*/
- public function write($value) {
- $this->_write($value);
+ public function write($data) {
+ $this->call->start_batch([OP_SEND_MESSAGE => $data->serialize()]);
}
/**
- * Indicate that no more writes will be sent
+ * Indicate that no more writes will be sent.
*/
public function writesDone() {
- $this->_writesDone();
+ $this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]);
}
/**
* Wait for the server to send the status, and return it.
- * @return object The status object, with integer $code and string $details
- * members
+ * @return object The status object, with integer $code, string $details,
+ * and array $metadata members
*/
public function getStatus() {
- return $this->_getStatus();
+ $status_event = $this->call->start_batch([
+ OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+ return $status_event->status;
}
-}
+} \ No newline at end of file
diff --git a/src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/ClientStreamingCall.php
index d33f09fbe4..4b3abcbdec 100755..100644
--- a/src/php/lib/Grpc/ClientStreamingSurfaceActiveCall.php
+++ b/src/php/lib/Grpc/ClientStreamingCall.php
@@ -38,25 +38,21 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php');
* Represents an active call that sends a stream of messages and then gets a
* single response.
*/
-class ClientStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall {
+class ClientStreamingCall extends AbstractCall {
/**
- * Create a new simple (single request/single response) active call.
- * @param Channel $channel The channel to communicate on
- * @param string $method The method to call on the remote server
- * @param callable $deserialize The function to deserialize a value
+ * Start the call.
* @param Traversable $arg_iter The iterator of arguments to send
* @param array $metadata Metadata to send with the call, if applicable
*/
- public function __construct(Channel $channel,
- $method,
- callable $deserialize,
- $arg_iter,
- $metadata = array()) {
- parent::__construct($channel, $method, $deserialize, $metadata, 0);
+ public function start($arg_iter, $metadata = array()) {
+ $event = $this->call->start_batch([
+ OP_SEND_INITIAL_METADATA => $metadata,
+ OP_RECV_INITIAL_METADATA => true]);
+ $this->metadata = $event->metadata;
foreach($arg_iter as $arg) {
- $this->_write($arg);
+ $this->call->start_batch([OP_SEND_MESSAGE => $arg->serialize()]);
}
- $this->_writesDone();
+ $this->call->start_batch([OP_SEND_CLOSE_FROM_CLIENT => true]);
}
/**
@@ -64,8 +60,9 @@ class ClientStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall {
* @return [response data, status]
*/
public function wait() {
- $response = $this->_read();
- $status = $this->_getStatus();
- return array($response, $status);
+ $event = $this->call->start_batch([
+ OP_RECV_MESSAGE => true,
+ OP_RECV_STATUS_ON_CLIENT => true]);
+ return array($this->deserializeResponse($event->message), $event->status);
}
-}
+} \ No newline at end of file
diff --git a/src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php b/src/php/lib/Grpc/ServerStreamingCall.php
index fd08e86e51..7458f28bcb 100755..100644
--- a/src/php/lib/Grpc/ServerStreamingSurfaceActiveCall.php
+++ b/src/php/lib/Grpc/ServerStreamingCall.php
@@ -39,36 +39,41 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php');
* Represents an active call that sends a single message and then gets a stream
* of reponses
*/
-class ServerStreamingSurfaceActiveCall extends AbstractSurfaceActiveCall {
+class ServerStreamingCall extends AbstractCall {
/**
- * Create a new simple (single request/single response) active call.
- * @param Channel $channel The channel to communicate on
- * @param string $method The method to call on the remote server
- * @param callable $deserialize The function to deserialize a value
+ * Start the call
* @param $arg The argument to send
* @param array $metadata Metadata to send with the call, if applicable
*/
- public function __construct(Channel $channel,
- $method,
- callable $deserialize,
- $arg,
- $metadata = array()) {
- parent::__construct($channel, $method, $deserialize, $metadata,
- \Grpc\WRITE_BUFFER_HINT);
- $this->_write($arg);
- $this->_writesDone();
+ public function start($arg, $metadata = array()) {
+ $event = $this->call->start_batch([
+ OP_SEND_INITIAL_METADATA => $metadata,
+ OP_RECV_INITIAL_METADATA => true,
+ OP_SEND_MESSAGE => $arg->serialize(),
+ OP_SEND_CLOSE_FROM_CLIENT => true]);
+ $this->metadata = $event->metadata;
}
/**
* @return An iterator of response values
*/
public function responses() {
- while(($response = $this->_read()) !== null) {
- yield $response;
+ $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message;
+ while($response !== null) {
+ yield $this->deserializeResponse($response);
+ $response = $this->call->start_batch([OP_RECV_MESSAGE => true])->message;
}
}
+ /**
+ * Wait for the server to send the status, and return it.
+ * @return object The status object, with integer $code, string $details,
+ * and array $metadata members
+ */
public function getStatus() {
- return $this->_getStatus();
+ $status_event = $this->call->start_batch([
+ OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+ return $status_event->status;
}
-}
+} \ No newline at end of file
diff --git a/src/php/lib/Grpc/SimpleSurfaceActiveCall.php b/src/php/lib/Grpc/UnaryCall.php
index ba82f5704f..bbf9cfb588 100755..100644
--- a/src/php/lib/Grpc/SimpleSurfaceActiveCall.php
+++ b/src/php/lib/Grpc/UnaryCall.php
@@ -39,24 +39,19 @@ require_once realpath(dirname(__FILE__) . '/../autoload.php');
* Represents an active call that sends a single message and then gets a single
* response.
*/
-class SimpleSurfaceActiveCall extends AbstractSurfaceActiveCall {
+class UnaryCall extends AbstractCall {
/**
- * Create a new simple (single request/single response) active call.
- * @param Channel $channel The channel to communicate on
- * @param string $method The method to call on the remote server
- * @param callable $deserialize The function to deserialize a value
+ * Start the call
* @param $arg The argument to send
* @param array $metadata Metadata to send with the call, if applicable
*/
- public function __construct(Channel $channel,
- $method,
- callable $deserialize,
- $arg,
- $metadata = array()) {
- parent::__construct($channel, $method, $deserialize, $metadata,
- \Grpc\WRITE_BUFFER_HINT);
- $this->_write($arg);
- $this->_writesDone();
+ public function start($arg, $metadata = array()) {
+ $event = $this->call->start_batch([
+ OP_SEND_INITIAL_METADATA => $metadata,
+ OP_RECV_INITIAL_METADATA => true,
+ OP_SEND_MESSAGE => $arg->serialize(),
+ OP_SEND_CLOSE_FROM_CLIENT => true]);
+ $this->metadata = $event->metadata;
}
/**
@@ -64,8 +59,9 @@ class SimpleSurfaceActiveCall extends AbstractSurfaceActiveCall {
* @return [response data, status]
*/
public function wait() {
- $response = $this->_read();
- $status = $this->_getStatus();
- return array($response, $status);
+ $event = $this->call->start_batch([
+ OP_RECV_MESSAGE => true,
+ OP_RECV_STATUS_ON_CLIENT => true]);
+ return array($this->deserializeResponse($event->message), $event->status);
}
-}
+} \ No newline at end of file
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index 82ca438169..7ee089e241 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -132,8 +132,6 @@ function serverStreaming($stub) {
}
$call = $stub->StreamingOutputCall($request);
- hardAssert($call->getStatus()->code === Grpc\STATUS_OK,
- 'Call did not complete successfully');
$i = 0;
foreach($call->responses() as $value) {
hardAssert($i < 4, 'Too many responses');
@@ -142,7 +140,10 @@ function serverStreaming($stub) {
'Payload ' . $i . ' had the wrong type');
hardAssert(strlen($payload->getBody()) === $sizes[$i],
'Response ' . $i . ' had the wrong length');
+ $i += 1;
}
+ hardAssert($call->getStatus()->code === Grpc\STATUS_OK,
+ 'Call did not complete successfully');
}
/**
@@ -240,4 +241,6 @@ switch($args['test_case']) {
break;
case 'cancel_after_first_response':
cancelAfterFirstResponse($stub);
+ default:
+ exit(1);
}
diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php
index 8bb0927f21..d361ce0030 100755
--- a/src/php/tests/unit_tests/CallTest.php
+++ b/src/php/tests/unit_tests/CallTest.php
@@ -36,65 +36,47 @@ class CallTest extends PHPUnit_Framework_TestCase{
static $port;
public static function setUpBeforeClass() {
- $cq = new Grpc\CompletionQueue();
- self::$server = new Grpc\Server($cq, []);
+ self::$server = new Grpc\Server([]);
self::$port = self::$server->add_http2_port('0.0.0.0:0');
}
public function setUp() {
- $this->cq = new Grpc\CompletionQueue();
$this->channel = new Grpc\Channel('localhost:' . self::$port, []);
$this->call = new Grpc\Call($this->channel,
'/foo',
Grpc\Timeval::inf_future());
}
- /**
- * @expectedException LogicException
- * @expectedExceptionCode Grpc\CALL_ERROR_INVALID_FLAGS
- * @expectedExceptionMessage invoke
- */
- public function testInvokeRejectsBadFlags() {
- $this->call->invoke($this->cq, 0, 0, 0xDEADBEEF);
- }
-
- /**
- * @expectedException LogicException
- * @expectedExceptionCode Grpc\CALL_ERROR_NOT_ON_CLIENT
- * @expectedExceptionMessage server_accept
- */
- public function testServerAcceptFailsCorrectly() {
- $this->call->server_accept($this->cq, 0);
- }
-
- /* These test methods with assertTrue(true) at the end just check that the
- method calls completed without errors. PHPUnit warns for tests with no
- asserts, and this avoids that warning without changing the meaning of the
- tests */
-
public function testAddEmptyMetadata() {
- $this->call->add_metadata([], 0);
- /* Dummy assert: Checks that the previous call completed without error */
- $this->assertTrue(true);
+ $batch = [
+ Grpc\OP_SEND_INITIAL_METADATA => []
+ ];
+ $result = $this->call->start_batch($batch);
+ $this->assertTrue($result->send_metadata);
}
public function testAddSingleMetadata() {
- $this->call->add_metadata(['key' => ['value']], 0);
- /* Dummy assert: Checks that the previous call completed without error */
- $this->assertTrue(true);
+ $batch = [
+ Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value']]
+ ];
+ $result = $this->call->start_batch($batch);
+ $this->assertTrue($result->send_metadata);
}
public function testAddMultiValueMetadata() {
- $this->call->add_metadata(['key' => ['value1', 'value2']], 0);
- /* Dummy assert: Checks that the previous call completed without error */
- $this->assertTrue(true);
+ $batch = [
+ Grpc\OP_SEND_INITIAL_METADATA => ['key' => ['value1', 'value2']]
+ ];
+ $result = $this->call->start_batch($batch);
+ $this->assertTrue($result->send_metadata);
}
public function testAddSingleAndMultiValueMetadata() {
- $this->call->add_metadata(
- ['key1' => ['value1'],
- 'key2' => ['value2', 'value3']], 0);
- /* Dummy assert: Checks that the previous call completed without error */
- $this->assertTrue(true);
+ $batch = [
+ Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1'],
+ 'key2' => ['value2', 'value3']]
+ ];
+ $result = $this->call->start_batch($batch);
+ $this->assertTrue($result->send_metadata);
}
}
diff --git a/src/php/tests/unit_tests/CompletionQueueTest.php b/src/php/tests/unit_tests/CompletionQueueTest.php
deleted file mode 100755
index 76ee61dfe8..0000000000
--- a/src/php/tests/unit_tests/CompletionQueueTest.php
+++ /dev/null
@@ -1,46 +0,0 @@
-<?php
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-class CompletionQueueTest extends PHPUnit_Framework_TestCase{
- public function testNextReturnsNullWithNoCall() {
- $cq = new Grpc\CompletionQueue();
- $event = $cq->next(Grpc\Timeval::zero());
- $this->assertNull($event);
- }
-
- public function testPluckReturnsNullWithNoCall() {
- $cq = new Grpc\CompletionQueue();
- $event = $cq->pluck(0, Grpc\Timeval::zero());
- $this->assertNull($event);
- }
-}
diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php
index 0cbc506c8e..3e165b7213 100755
--- a/src/php/tests/unit_tests/EndToEndTest.php
+++ b/src/php/tests/unit_tests/EndToEndTest.php
@@ -33,18 +33,15 @@
*/
class EndToEndTest extends PHPUnit_Framework_TestCase{
public function setUp() {
- $this->client_queue = new Grpc\CompletionQueue();
- $this->server_queue = new Grpc\CompletionQueue();
- $this->server = new Grpc\Server($this->server_queue, []);
+ $this->server = new Grpc\Server([]);
$port = $this->server->add_http2_port('0.0.0.0:0');
$this->channel = new Grpc\Channel('localhost:' . $port, []);
+ $this->server->start();
}
public function tearDown() {
unset($this->channel);
unset($this->server);
- unset($this->client_queue);
- unset($this->server_queue);
}
public function testSimpleRequestBody() {
@@ -53,55 +50,45 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
- $tag = 1;
- $call->invoke($this->client_queue, $tag, $tag);
- $server_tag = 2;
-
- $call->writes_done($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // check that a server rpc new was received
- $this->server->start();
- $this->server->request_call($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
- $server_call = $event->call;
- $this->assertNotNull($server_call);
- $server_call->server_accept($this->server_queue, $server_tag);
-
- $server_call->server_end_initial_metadata();
+ $event = $call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
+ ]);
- // the server sends the status
- $server_call->start_write_status(Grpc\STATUS_OK, $status_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
- // the client gets CLIENT_METADATA_READ
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
+ $event = $this->server->request_call();
+ $this->assertSame('dummy_method', $event->method);
+ $this->assertSame([], $event->metadata);
+ $server_call = $event->call;
- // the client gets FINISHED
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
+ $event = $server_call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_status);
+ $this->assertFalse($event->cancelled);
+
+ $event = $call->start_batch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+
+ $this->assertSame([], $event->metadata);
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
- // and the server gets FINISHED
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
-
unset($call);
unset($server_call);
}
@@ -115,79 +102,52 @@ class EndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
- $tag = 1;
- $call->invoke($this->client_queue, $tag, $tag);
- $server_tag = 2;
+ $event = $call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
+ Grpc\OP_SEND_MESSAGE => $req_text
+ ]);
- // the client writes
- $call->start_write($req_text, $tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
+ $this->assertTrue($event->send_message);
- // check that a server rpc new was received
- $this->server->start();
- $this->server->request_call($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
+ $event = $this->server->request_call();
+ $this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
- $this->assertNotNull($server_call);
- $server_call->server_accept($this->server_queue, $server_tag);
-
- $server_call->server_end_initial_metadata();
-
- // start the server read
- $server_call->start_read($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\READ, $event->type);
- $this->assertSame($req_text, $event->data);
-
- // the server replies
- $server_call->start_write($reply_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
-
- // the client reads the metadata
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
-
- // the client reads the reply
- $call->start_read($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\READ, $event->type);
- $this->assertSame($reply_text, $event->data);
-
- // the client sends writes done
- $call->writes_done($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // the server sends the status
- $server_call->start_write_status(GRPC\STATUS_OK, $status_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // the client gets FINISHED
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
+
+ $event = $server_call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_MESSAGE => $reply_text,
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true,
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_status);
+ $this->assertTrue($event->send_message);
+ $this->assertFalse($event->cancelled);
+ $this->assertSame($req_text, $event->message);
+
+ $event = $call->start_batch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true,
+ ]);
+
+ $this->assertSame([], $event->metadata);
+ $this->assertSame($reply_text, $event->message);
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
- // and the server gets FINISHED
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
-
unset($call);
unset($server_call);
}
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index 896afeac49..2d62fe9d5e 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -33,17 +33,16 @@
*/
class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
public function setUp() {
- $this->client_queue = new Grpc\CompletionQueue();
- $this->server_queue = new Grpc\CompletionQueue();
$credentials = Grpc\Credentials::createSsl(
file_get_contents(dirname(__FILE__) . '/../data/ca.pem'));
$server_credentials = Grpc\ServerCredentials::createSsl(
null,
file_get_contents(dirname(__FILE__) . '/../data/server1.key'),
file_get_contents(dirname(__FILE__) . '/../data/server1.pem'));
- $this->server = new Grpc\Server($this->server_queue);
+ $this->server = new Grpc\Server();
$port = $this->server->add_secure_http2_port('0.0.0.0:0',
$server_credentials);
+ $this->server->start();
$this->channel = new Grpc\Channel(
'localhost:' . $port,
[
@@ -55,70 +54,58 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
public function tearDown() {
unset($this->channel);
unset($this->server);
- unset($this->client_queue);
- unset($this->server_queue);
}
public function testSimpleRequestBody() {
- $this->server->start();
$deadline = Grpc\Timeval::inf_future();
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
- $tag = 1;
- $call->invoke($this->client_queue, $tag, $tag);
- $server_tag = 2;
-
- $call->writes_done($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // check that a server rpc new was received
- $this->server->request_call($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
+
+ $event = $call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
+
+ $event = $this->server->request_call();
+ $this->assertSame('dummy_method', $event->method);
+ $this->assertSame([], $event->metadata);
$server_call = $event->call;
- $this->assertNotNull($server_call);
- $server_call->server_accept($this->server_queue, $server_tag);
-
- $server_call->server_end_initial_metadata();
-
- // the server sends the status
- $server_call->start_write_status(Grpc\STATUS_OK, $status_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // the client gets CLIENT_METADATA_READ
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
-
- // the client gets FINISHED
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
+
+ $event = $server_call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_status);
+ $this->assertFalse($event->cancelled);
+
+ $event = $call->start_batch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true
+ ]);
+
+ $this->assertSame([], $event->metadata);
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
- // and the server gets FINISHED
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
-
unset($call);
unset($server_call);
}
public function testClientServerFullRequestResponse() {
- $this->server->start();
$deadline = Grpc\Timeval::inf_future();
$req_text = 'client_server_full_request_response';
$reply_text = 'reply:client_server_full_request_response';
@@ -127,78 +114,52 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
$deadline);
- $tag = 1;
- $call->invoke($this->client_queue, $tag, $tag);
-
- $server_tag = 2;
-
- // the client writes
- $call->start_write($req_text, $tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
-
- // check that a server rpc new was received
- $this->server->request_call($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\SERVER_RPC_NEW, $event->type);
+
+ $event = $call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_CLOSE_FROM_CLIENT => true,
+ Grpc\OP_SEND_MESSAGE => $req_text
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_close);
+ $this->assertTrue($event->send_message);
+
+ $event = $this->server->request_call();
+ $this->assertSame('dummy_method', $event->method);
$server_call = $event->call;
- $this->assertNotNull($server_call);
- $server_call->server_accept($this->server_queue, $server_tag);
-
- $server_call->server_end_initial_metadata();
-
- // start the server read
- $server_call->start_read($server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\READ, $event->type);
- $this->assertSame($req_text, $event->data);
-
- // the server replies
- $server_call->start_write($reply_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\WRITE_ACCEPTED, $event->type);
-
- // the client reads the metadata
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\CLIENT_METADATA_READ, $event->type);
-
- // the client reads the reply
- $call->start_read($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\READ, $event->type);
- $this->assertSame($reply_text, $event->data);
-
- // the client sends writes done
- $call->writes_done($tag);
- $event = $this->client_queue->next($deadline);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // the server sends the status
- $server_call->start_write_status(GRPC\STATUS_OK, $status_text, $server_tag);
- $event = $this->server_queue->next($deadline);
- $this->assertSame(Grpc\FINISH_ACCEPTED, $event->type);
- $this->assertSame(Grpc\OP_OK, $event->data);
-
- // the client gets FINISHED
- $event = $this->client_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
- $status = $event->data;
+
+ $event = $server_call->start_batch([
+ Grpc\OP_SEND_INITIAL_METADATA => [],
+ Grpc\OP_SEND_MESSAGE => $reply_text,
+ Grpc\OP_SEND_STATUS_FROM_SERVER => [
+ 'metadata' => [],
+ 'code' => Grpc\STATUS_OK,
+ 'details' => $status_text
+ ],
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_CLOSE_ON_SERVER => true,
+ ]);
+
+ $this->assertTrue($event->send_metadata);
+ $this->assertTrue($event->send_status);
+ $this->assertTrue($event->send_message);
+ $this->assertFalse($event->cancelled);
+ $this->assertSame($req_text, $event->message);
+
+ $event = $call->start_batch([
+ Grpc\OP_RECV_INITIAL_METADATA => true,
+ Grpc\OP_RECV_MESSAGE => true,
+ Grpc\OP_RECV_STATUS_ON_CLIENT => true,
+ ]);
+
+ $this->assertSame([], $event->metadata);
+ $this->assertSame($reply_text, $event->message);
+ $status = $event->status;
+ $this->assertSame([], $status->metadata);
$this->assertSame(Grpc\STATUS_OK, $status->code);
$this->assertSame($status_text, $status->details);
- // and the server gets FINISHED
- $event = $this->server_queue->next($deadline);
- $this->assertNotNull($event);
- $this->assertSame(Grpc\FINISHED, $event->type);
-
unset($call);
unset($server_call);
}
diff --git a/src/ruby/Rakefile b/src/ruby/Rakefile
index b27305d16c..afb354e922 100755
--- a/src/ruby/Rakefile
+++ b/src/ruby/Rakefile
@@ -2,14 +2,17 @@
require 'rake/extensiontask'
require 'rspec/core/rake_task'
require 'rubocop/rake_task'
+require 'bundler/gem_tasks'
-desc 'Run Rubocop to check for style violations'
+# Add rubocop style checking tasks
RuboCop::RakeTask.new
+# Add the extension compiler task
Rake::ExtensionTask.new 'grpc' do |ext|
ext.lib_dir = File.join('lib', 'grpc')
end
+# Define the test suites
SPEC_SUITES = [
{ id: :wrapper, title: 'wrapper layer', files: %w(spec/*.rb) },
{ id: :idiomatic, title: 'idiomatic layer', dir: %w(spec/generic),
@@ -19,36 +22,34 @@ SPEC_SUITES = [
{ id: :server, title: 'rpc server thread tests', dir: %w(spec/generic),
tag: 'server' }
]
+namespace :suite do
+ SPEC_SUITES.each do |suite|
+ desc "Run all specs in the #{suite[:title]} spec suite"
+ RSpec::Core::RakeTask.new(suite[:id]) do |t|
+ spec_files = []
+ suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
+
+ if suite[:dir]
+ suite[:dir].each { |f| spec_files += Dir["#{f}/**/*_spec.rb"] }
+ end
+ helper = 'spec/spec_helper.rb'
+ spec_files << helper unless spec_files.include?(helper)
-desc 'Run all RSpec tests'
-namespace :spec do
- namespace :suite do
- SPEC_SUITES.each do |suite|
- desc "Run all specs in #{suite[:title]} spec suite"
- RSpec::Core::RakeTask.new(suite[:id]) do |t|
- spec_files = []
- suite[:files].each { |f| spec_files += Dir[f] } if suite[:files]
-
- if suite[:dirs]
- suite[:dirs].each { |f| spec_files += Dir["#{f}/**/*_spec.rb"] }
- end
-
- t.pattern = spec_files
- t.rspec_opts = "--tag #{suite[:tag]}" if suite[:tag]
- if suite[:tags]
- t.rspec_opts = suite[:tags].map { |x| "--tag #{x}" }.join(' ')
- end
+ t.pattern = spec_files
+ t.rspec_opts = "--tag #{suite[:tag]}" if suite[:tag]
+ if suite[:tags]
+ t.rspec_opts = suite[:tags].map { |x| "--tag #{x}" }.join(' ')
end
end
end
end
-desc 'Compiles the extension then runs all the tests'
-task :all
+# Define dependencies between the suites.
+task 'suite:wrapper' => [:compile, :rubocop]
+task 'suite:idiomatic' => 'suite:wrapper'
+task 'suite:bidi' => 'suite:wrapper'
+task 'suite:server' => 'suite:wrapper'
+desc 'Compiles the gRPC extension then runs all the tests'
+task all: ['suite:idiomatic', 'suite:bidi', 'suite:server']
task default: :all
-task 'spec:suite:wrapper' => [:compile, :rubocop]
-task 'spec:suite:idiomatic' => 'spec:suite:wrapper'
-task 'spec:suite:bidi' => 'spec:suite:wrapper'
-task 'spec:suite:server' => 'spec:suite:wrapper'
-task all: ['spec:suite:idiomatic', 'spec:suite:bidi', 'spec:suite:server']
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index 6ea0831a2e..69076b4c6e 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -176,25 +176,26 @@ module GRPC
unmarshal = desc.unmarshal_proc(:output)
route = "/#{route_prefix}/#{name}"
if desc.request_response?
- define_method(mth_name) do |req, deadline = nil|
+ define_method(mth_name) do |req, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
- request_response(route, req, marshal, unmarshal, deadline)
+ request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
- define_method(mth_name) do |reqs, deadline = nil|
+ define_method(mth_name) do |reqs, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
- client_streamer(route, reqs, marshal, unmarshal, deadline)
+ client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
- define_method(mth_name) do |req, deadline = nil, &blk|
+ define_method(mth_name) do |req, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
- server_streamer(route, req, marshal, unmarshal, deadline,
+ server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
- define_method(mth_name) do |reqs, deadline = nil, &blk|
+ define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
- bidi_streamer(route, reqs, marshal, unmarshal, deadline, &blk)
+ bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
+ &blk)
end
end
end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 8914225b55..96e07cacb4 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -67,7 +67,7 @@ describe GRPC::ActiveCall do
end
describe '#multi_req_view' do
- xit 'exposes a fixed subset of the ActiveCall methods' do
+ it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled, deadline, each_remote_read, metadata, shutdown)
v = @client_call.multi_req_view
want.each do |w|
@@ -77,7 +77,7 @@ describe GRPC::ActiveCall do
end
describe '#single_req_view' do
- xit 'exposes a fixed subset of the ActiveCall methods' do
+ it 'exposes a fixed subset of the ActiveCall methods' do
want = %w(cancelled, deadline, metadata, shutdown)
v = @client_call.single_req_view
want.each do |w|
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 73f2d37e30..0c98fc40d9 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -384,13 +384,7 @@ describe 'ClientStub' do
th.join
end
- # disabled because an unresolved wire-protocol implementation feature
- #
- # - servers should be able initiate messaging, however, as it stand
- # servers don't know if all the client metadata has been sent until
- # they receive a message from the client. Without receiving all the
- # metadata, the server does not accept the call, so this test hangs.
- xit 'supports a server-initiated ping pong', bidi: true do
+ it 'supports a server-initiated ping pong', bidi: true do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index f3b89b5895..34e5cdcd04 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -81,14 +81,17 @@ EchoStub = EchoService.rpc_stub_class
class SlowService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
+ attr_reader :received_md, :delay
def initialize(_default_var = 'ignored')
+ @delay = 0.25
+ @received_md = []
end
- def an_rpc(req, _call)
- delay = 0.25
- logger.info("starting a slow #{delay} rpc")
- sleep delay
+ def an_rpc(req, call)
+ logger.info("starting a slow #{@delay} rpc")
+ sleep @delay
+ @received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response
end
end
@@ -354,6 +357,37 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should receive metadata when a deadline is specified', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **@client_opts)
+ deadline = service.delay + 0.5 # wait for long enough
+ expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
+ wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
+ expect(service.received_md).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+
+ it 'should not receive metadata if the client times out', server: true do
+ service = SlowService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = SlowStub.new(@host, **@client_opts)
+ deadline = 0.1 # too short for SlowService to respond
+ blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') }
+ expect(&blk).to raise_error GRPC::BadStatus
+ wanted_md = []
+ expect(service.received_md).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+
it 'should receive updated metadata', server: true do
service = EchoService.new
@srv.handle(service)