diff options
author | 2015-01-14 17:09:28 -0800 | |
---|---|---|
committer | 2015-01-14 17:09:28 -0800 | |
commit | 82fdc983b2659b4752263e86c3f34ce26e6b97a2 (patch) | |
tree | b870399ecb5037bf9de9587b68ac3701cfc65ab7 /src | |
parent | 6061f2b6058cfd1135488f55e832622226cd558c (diff) | |
parent | 2739a49eeebb4a10aea5bdce742355e2e42a864d (diff) |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/cpp_generator.cc | 83 | ||||
-rw-r--r-- | src/compiler/go_generator.cc | 530 | ||||
-rw-r--r-- | src/compiler/go_generator.h | 51 | ||||
-rw-r--r-- | src/compiler/go_plugin.cc | 83 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 17 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.h | 2 | ||||
-rw-r--r-- | src/core/security/security_context.c | 11 | ||||
-rw-r--r-- | src/core/support/log_posix.c | 15 | ||||
-rw-r--r-- | src/core/support/log_win32.c | 9 | ||||
-rw-r--r-- | src/core/support/time.c | 2 | ||||
-rw-r--r-- | src/core/surface/call.c | 37 | ||||
-rw-r--r-- | src/cpp/stream/stream_context.cc | 35 | ||||
-rw-r--r-- | src/cpp/stream/stream_context.h | 4 |
13 files changed, 132 insertions, 747 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1116049806..94e56d73a6 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -143,16 +143,16 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer* printer, } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, - "::grpc::ClientWriter<$Request$>* $Method$(" + "::grpc::ClientWriter< $Request$>* $Method$(" "::grpc::ClientContext* context, $Response$* response);\n\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "::grpc::ClientReader<$Response$>* $Method$(" + "::grpc::ClientReader< $Response$>* $Method$(" "::grpc::ClientContext* context, const $Request$* request);\n\n"); } else if (BidiStreaming(method)) { printer->Print(*vars, - "::grpc::ClientReaderWriter<$Request$, $Response$>* " + "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$Method$(::grpc::ClientContext* context);\n\n"); } } @@ -174,19 +174,20 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer* printer, printer->Print(*vars, "virtual ::grpc::Status $Method$(" "::grpc::ServerContext* context, " - "::grpc::ServerReader<$Request$>* reader, " + "::grpc::ServerReader< $Request$>* reader, " "$Response$* response);\n"); } else if (ServerOnlyStreaming(method)) { printer->Print(*vars, "virtual ::grpc::Status $Method$(" "::grpc::ServerContext* context, const $Request$* request, " - "::grpc::ServerWriter<$Response$>* writer);\n"); + "::grpc::ServerWriter< $Response$>* writer);\n"); } else if (BidiStreaming(method)) { - printer->Print(*vars, - "virtual ::grpc::Status $Method$(" - "::grpc::ServerContext* context, " - "::grpc::ServerReaderWriter<$Response$, $Request$>* stream);" - "\n"); + printer->Print( + *vars, + "virtual ::grpc::Status $Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerReaderWriter< $Response$, $Request$>* stream);" + "\n"); } } @@ -211,7 +212,7 @@ void PrintHeaderService(google::protobuf::io::Printer* printer, printer->Outdent(); printer->Print("};\n"); printer->Print( - "static Stub* NewStub(const std::shared_ptr<::grpc::ChannelInterface>& " + "static Stub* NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& " "channel);\n"); printer->Print("\n"); @@ -269,11 +270,12 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer, "context, request, response);\n" "}\n\n"); } else if (ClientOnlyStreaming(method)) { + printer->Print( + *vars, + "::grpc::ClientWriter< $Request$>* $Service$::Stub::$Method$(" + "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Print(*vars, - "::grpc::ClientWriter<$Request$>* $Service$::Stub::$Method$(" - "::grpc::ClientContext* context, $Response$* response) {\n"); - printer->Print(*vars, - " return new ::grpc::ClientWriter<$Request$>(" + " return new ::grpc::ClientWriter< $Request$>(" "channel()->CreateStream(" "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::CLIENT_STREAMING), " @@ -282,10 +284,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer, } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, - "::grpc::ClientReader<$Response$>* $Service$::Stub::$Method$(" + "::grpc::ClientReader< $Response$>* $Service$::Stub::$Method$(" "::grpc::ClientContext* context, const $Request$* request) {\n"); printer->Print(*vars, - " return new ::grpc::ClientReader<$Response$>(" + " return new ::grpc::ClientReader< $Response$>(" "channel()->CreateStream(" "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::SERVER_STREAMING), " @@ -294,11 +296,11 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer, } else if (BidiStreaming(method)) { printer->Print( *vars, - "::grpc::ClientReaderWriter<$Request$, $Response$>* " + "::grpc::ClientReaderWriter< $Request$, $Response$>* " "$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n"); printer->Print( *vars, - " return new ::grpc::ClientReaderWriter<$Request$, $Response$>(" + " return new ::grpc::ClientReaderWriter< $Request$, $Response$>(" "channel()->CreateStream(" "::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", " "::grpc::RpcMethod::RpcType::BIDI_STREAMING), " @@ -328,7 +330,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer, printer->Print(*vars, "::grpc::Status $Service$::Service::$Method$(" "::grpc::ServerContext* context, " - "::grpc::ServerReader<$Request$>* reader, " + "::grpc::ServerReader< $Request$>* reader, " "$Response$* response) {\n"); printer->Print( " return ::grpc::Status(" @@ -339,7 +341,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer, "::grpc::Status $Service$::Service::$Method$(" "::grpc::ServerContext* context, " "const $Request$* request, " - "::grpc::ServerWriter<$Response$>* writer) {\n"); + "::grpc::ServerWriter< $Response$>* writer) {\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -348,7 +350,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer, printer->Print(*vars, "::grpc::Status $Service$::Service::$Method$(" "::grpc::ServerContext* context, " - "::grpc::ServerReaderWriter<$Response$, $Request$>* " + "::grpc::ServerReaderWriter< $Response$, $Request$>* " "stream) {\n"); printer->Print( " return ::grpc::Status(" @@ -361,13 +363,14 @@ void PrintSourceService(google::protobuf::io::Printer* printer, const google::protobuf::ServiceDescriptor* service, map<string, string>* vars) { (*vars)["Service"] = service->name(); - printer->Print(*vars, - "$Service$::Stub* $Service$::NewStub(" - "const std::shared_ptr<::grpc::ChannelInterface>& channel) {\n" - " $Service$::Stub* stub = new $Service$::Stub();\n" - " stub->set_channel(channel);\n" - " return stub;\n" - "};\n\n"); + printer->Print( + *vars, + "$Service$::Stub* $Service$::NewStub(" + "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n" + " $Service$::Stub* stub = new $Service$::Stub();\n" + " stub->set_channel(channel);\n" + " return stub;\n" + "};\n\n"); for (int i = 0; i < service->method_count(); ++i) { PrintSourceClientMethod(printer, service->method(i), vars); } @@ -400,9 +403,9 @@ void PrintSourceService(google::protobuf::io::Printer* printer, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" " \"/$Package$$Service$/$Method$\",\n" " ::grpc::RpcMethod::NORMAL_RPC,\n" - " new ::grpc::RpcMethodHandler<$Service$::Service, $Request$, " + " new ::grpc::RpcMethodHandler< $Service$::Service, $Request$, " "$Response$>(\n" - " std::function<::grpc::Status($Service$::Service*, " + " std::function< ::grpc::Status($Service$::Service*, " "::grpc::ServerContext*, const $Request$*, $Response$*)>(" "&$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); @@ -412,11 +415,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" " \"/$Package$$Service$/$Method$\",\n" " ::grpc::RpcMethod::CLIENT_STREAMING,\n" - " new ::grpc::ClientStreamingHandler<" + " new ::grpc::ClientStreamingHandler< " "$Service$::Service, $Request$, $Response$>(\n" - " std::function<::grpc::Status($Service$::Service*, " + " std::function< ::grpc::Status($Service$::Service*, " "::grpc::ServerContext*, " - "::grpc::ServerReader<$Request$>*, $Response$*)>(" + "::grpc::ServerReader< $Request$>*, $Response$*)>(" "&$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (ServerOnlyStreaming(method)) { @@ -425,11 +428,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" " \"/$Package$$Service$/$Method$\",\n" " ::grpc::RpcMethod::SERVER_STREAMING,\n" - " new ::grpc::ServerStreamingHandler<" + " new ::grpc::ServerStreamingHandler< " "$Service$::Service, $Request$, $Response$>(\n" - " std::function<::grpc::Status($Service$::Service*, " + " std::function< ::grpc::Status($Service$::Service*, " "::grpc::ServerContext*, " - "const $Request$*, ::grpc::ServerWriter<$Response$>*)>(" + "const $Request$*, ::grpc::ServerWriter< $Response$>*)>(" "&$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } else if (BidiStreaming(method)) { @@ -438,11 +441,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer, "service_->AddMethod(new ::grpc::RpcServiceMethod(\n" " \"/$Package$$Service$/$Method$\",\n" " ::grpc::RpcMethod::BIDI_STREAMING,\n" - " new ::grpc::BidiStreamingHandler<" + " new ::grpc::BidiStreamingHandler< " "$Service$::Service, $Request$, $Response$>(\n" - " std::function<::grpc::Status($Service$::Service*, " + " std::function< ::grpc::Status($Service$::Service*, " "::grpc::ServerContext*, " - "::grpc::ServerReaderWriter<$Response$, $Request$>*)>(" + "::grpc::ServerReaderWriter< $Response$, $Request$>*)>(" "&$Service$::Service::$Method$), this),\n" " new $Request$, new $Response$));\n"); } diff --git a/src/compiler/go_generator.cc b/src/compiler/go_generator.cc deleted file mode 100644 index 84aa27668e..0000000000 --- a/src/compiler/go_generator.cc +++ /dev/null @@ -1,530 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/compiler/go_generator.h" - -#include <cctype> - -#include <google/protobuf/io/printer.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/descriptor.pb.h> -#include <google/protobuf/descriptor.h> - -using namespace std; - -namespace grpc_go_generator { - -bool NoStreaming(const google::protobuf::MethodDescriptor* method) { - return !method->client_streaming() && !method->server_streaming(); -} - -bool ClientOnlyStreaming(const google::protobuf::MethodDescriptor* method) { - return method->client_streaming() && !method->server_streaming(); -} - -bool ServerOnlyStreaming(const google::protobuf::MethodDescriptor* method) { - return !method->client_streaming() && method->server_streaming(); -} - -bool BidiStreaming(const google::protobuf::MethodDescriptor* method) { - return method->client_streaming() && method->server_streaming(); -} - -bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor* file) { - for (int i = 0; i < file->service_count(); i++) { - for (int j = 0; j < file->service(i)->method_count(); j++) { - if (ClientOnlyStreaming(file->service(i)->method(j))) { - return true; - } - } - } - return false; -} - -string LowerCaseService(const string& service) { - string ret = service; - if (!ret.empty() && ret[0] >= 'A' && ret[0] <= 'Z') { - ret[0] = ret[0] - 'A' + 'a'; - } - return ret; -} - -void PrintClientMethodDef(google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - (*vars)["Request"] = method->input_type()->name(); - (*vars)["Response"] = method->output_type()->name(); - if (NoStreaming(method)) { - printer->Print(*vars, - "\t$Method$(ctx context.Context, in *$Request$, opts " - "...rpc.CallOption) " - "(*$Response$, error)\n"); - } else if (BidiStreaming(method)) { - printer->Print(*vars, - "\t$Method$(ctx context.Context, opts ...rpc.CallOption) " - "($Service$_$Method$Client, error)\n"); - } else if (ServerOnlyStreaming(method)) { - printer->Print( - *vars, - "\t$Method$(ctx context.Context, m *$Request$, opts ...rpc.CallOption) " - "($Service$_$Method$Client, error)\n"); - } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, - "\t$Method$(ctx context.Context, opts ...rpc.CallOption) " - "($Service$_$Method$Client, error)\n"); - } -} - -void PrintClientMethodImpl(google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - (*vars)["Request"] = method->input_type()->name(); - (*vars)["Response"] = method->output_type()->name(); - - if (NoStreaming(method)) { - printer->Print( - *vars, - "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, " - "in *$Request$, opts ...rpc.CallOption) (*$Response$, error) {\n"); - printer->Print(*vars, "\tout := new($Response$)\n"); - printer->Print(*vars, - "\terr := rpc.Invoke(ctx, \"/$Package$$Service$/$Method$\", " - "in, out, c.cc, opts...)\n"); - printer->Print("\tif err != nil {\n"); - printer->Print("\t\treturn nil, err\n"); - printer->Print("\t}\n"); - printer->Print("\treturn out, nil\n"); - printer->Print("}\n\n"); - } else if (BidiStreaming(method)) { - printer->Print( - *vars, - "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, opts " - "...rpc.CallOption) ($Service$_$Method$Client, error) {\n" - "\tstream, err := rpc.NewClientStream(ctx, c.cc, " - "\"/$Package$$Service$/$Method$\", opts...)\n" - "\tif err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn &$ServiceStruct$$Method$Client{stream}, nil\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Client interface {\n" - "\tSend(*$Request$) error\n" - "\tRecv() (*$Response$, error)\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Client struct {\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Client) Send(m *$Request$) error {\n" - "\treturn x.ClientStream.SendProto(m)\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Client) Recv() (*$Response$, error) " - "{\n" - "\tm := new($Response$)\n" - "\tif err := x.ClientStream.RecvProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn m, nil\n" - "}\n\n"); - } else if (ServerOnlyStreaming(method)) { - printer->Print( - *vars, - "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, m " - "*$Request$, " - "opts ...rpc.CallOption) ($Service$_$Method$Client, error) {\n" - "\tstream, err := rpc.NewClientStream(ctx, c.cc, " - "\"/$Package$$Service$/$Method$\", opts...)\n" - "\tif err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\tx := &$ServiceStruct$$Method$Client{stream}\n" - "\tif err := x.ClientStream.SendProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\tif err := x.ClientStream.CloseSend(); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn x, nil\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Client interface {\n" - "\tRecv() (*$Response$, error)\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Client struct {\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Client) Recv() (*$Response$, error) " - "{\n" - "\tm := new($Response$)\n" - "\tif err := x.ClientStream.RecvProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn m, nil\n" - "}\n\n"); - } else if (ClientOnlyStreaming(method)) { - printer->Print( - *vars, - "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, opts " - "...rpc.CallOption) ($Service$_$Method$Client, error) {\n" - "\tstream, err := rpc.NewClientStream(ctx, c.cc, " - "\"/$Package$$Service$/$Method$\", opts...)\n" - "\tif err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn &$ServiceStruct$$Method$Client{stream}, nil\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Client interface {\n" - "\tSend(*$Request$) error\n" - "\tCloseAndRecv() (*$Response$, error)\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Client struct {\n" - "\trpc.ClientStream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Client) Send(m *$Request$) error {\n" - "\treturn x.ClientStream.SendProto(m)\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Client) CloseAndRecv() (*$Response$, " - "error) {\n" - "\tif err := x.ClientStream.CloseSend(); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\tm := new($Response$)\n" - "\tif err := x.ClientStream.RecvProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\t// Read EOF.\n" - "\tif err := x.ClientStream.RecvProto(m); err == io.EOF {\n" - "\t\treturn m, io.EOF\n" - "\t}\n" - "\t// gRPC protocol violation.\n" - "\treturn m, fmt.Errorf(\"Violate gRPC client streaming protocol: no " - "EOF after the response.\")\n" - "}\n\n"); - } -} - -void PrintClient(google::protobuf::io::Printer* printer, - const google::protobuf::ServiceDescriptor* service, - map<string, string>* vars) { - (*vars)["Service"] = service->name(); - (*vars)["ServiceStruct"] = LowerCaseService(service->name()); - printer->Print(*vars, "type $Service$Client interface {\n"); - for (int i = 0; i < service->method_count(); ++i) { - PrintClientMethodDef(printer, service->method(i), vars); - } - printer->Print("}\n\n"); - - printer->Print(*vars, - "type $ServiceStruct$Client struct {\n" - "\tcc *rpc.ClientConn\n" - "}\n\n"); - printer->Print( - *vars, - "func New$Service$Client(cc *rpc.ClientConn) $Service$Client {\n" - "\treturn &$ServiceStruct$Client{cc}\n" - "}\n\n"); - for (int i = 0; i < service->method_count(); ++i) { - PrintClientMethodImpl(printer, service->method(i), vars); - } -} - -void PrintServerMethodDef(google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - (*vars)["Request"] = method->input_type()->name(); - (*vars)["Response"] = method->output_type()->name(); - if (NoStreaming(method)) { - printer->Print( - *vars, - "\t$Method$(context.Context, *$Request$) (*$Response$, error)\n"); - } else if (BidiStreaming(method)) { - printer->Print(*vars, "\t$Method$($Service$_$Method$Server) error\n"); - } else if (ServerOnlyStreaming(method)) { - printer->Print(*vars, - "\t$Method$(*$Request$, $Service$_$Method$Server) error\n"); - } else if (ClientOnlyStreaming(method)) { - printer->Print(*vars, "\t$Method$($Service$_$Method$Server) error\n"); - } -} - -void PrintServerHandler(google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - (*vars)["Request"] = method->input_type()->name(); - (*vars)["Response"] = method->output_type()->name(); - if (NoStreaming(method)) { - printer->Print( - *vars, - "func _$Service$_$Method$_Handler(srv interface{}, ctx context.Context," - " buf []byte) (proto.Message, error) {\n"); - printer->Print(*vars, "\tin := new($Request$)\n"); - printer->Print("\tif err := proto.Unmarshal(buf, in); err != nil {\n"); - printer->Print("\t\treturn nil, err\n"); - printer->Print("\t}\n"); - printer->Print(*vars, - "\tout, err := srv.($Service$Server).$Method$(ctx, in)\n"); - printer->Print("\tif err != nil {\n"); - printer->Print("\t\treturn nil, err\n"); - printer->Print("\t}\n"); - printer->Print("\treturn out, nil\n"); - printer->Print("}\n\n"); - } else if (BidiStreaming(method)) { - printer->Print( - *vars, - "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) " - "error {\n" - "\treturn srv.($Service$Server).$Method$(&$ServiceStruct$$Method$Server" - "{stream})\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Server interface {\n" - "\tSend(*$Response$) error\n" - "\tRecv() (*$Request$, error)\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Server struct {\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Server) Send(m *$Response$) error {\n" - "\treturn x.Stream.SendProto(m)\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Server) Recv() (*$Request$, error) " - "{\n" - "\tm := new($Request$)\n" - "\tif err := x.Stream.RecvProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn m, nil\n" - "}\n\n"); - } else if (ServerOnlyStreaming(method)) { - printer->Print( - *vars, - "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) " - "error {\n" - "\tm := new($Request$)\n" - "\tif err := stream.RecvProto(m); err != nil {\n" - "\t\treturn err\n" - "\t}\n" - "\treturn srv.($Service$Server).$Method$(m, " - "&$ServiceStruct$$Method$Server{stream})\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Server interface {\n" - "\tSend(*$Response$) error\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Server struct {\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Server) Send(m *$Response$) error {\n" - "\treturn x.Stream.SendProto(m)\n" - "}\n\n"); - } else if (ClientOnlyStreaming(method)) { - printer->Print( - *vars, - "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) " - "error {\n" - "\treturn srv.($Service$Server).$Method$(&$ServiceStruct$$Method$Server" - "{stream})\n" - "}\n\n"); - printer->Print(*vars, - "type $Service$_$Method$Server interface {\n" - "\tSendAndClose(*$Response$) error\n" - "\tRecv() (*$Request$, error)\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print(*vars, - "type $ServiceStruct$$Method$Server struct {\n" - "\trpc.Stream\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Server) SendAndClose(m *$Response$) " - "error {\n" - "\tif err := x.Stream.SendProto(m); err != nil {\n" - "\t\treturn err\n" - "\t}\n" - "\treturn nil\n" - "}\n\n"); - printer->Print( - *vars, - "func (x *$ServiceStruct$$Method$Server) Recv() (*$Request$, error) {\n" - "\tm := new($Request$)\n" - "\tif err := x.Stream.RecvProto(m); err != nil {\n" - "\t\treturn nil, err\n" - "\t}\n" - "\treturn m, nil\n" - "}\n\n"); - } -} - -void PrintServerMethodDesc(google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - printer->Print("\t\t{\n"); - printer->Print(*vars, "\t\t\tMethodName:\t\"$Method$\",\n"); - printer->Print(*vars, "\t\t\tHandler:\t_$Service$_$Method$_Handler,\n"); - printer->Print("\t\t},\n"); -} - -void PrintServerStreamingMethodDesc( - google::protobuf::io::Printer* printer, - const google::protobuf::MethodDescriptor* method, - map<string, string>* vars) { - (*vars)["Method"] = method->name(); - printer->Print("\t\t{\n"); - printer->Print(*vars, "\t\t\tStreamName:\t\"$Method$\",\n"); - printer->Print(*vars, "\t\t\tHandler:\t_$Service$_$Method$_Handler,\n"); - printer->Print("\t\t},\n"); -} - -void PrintServer(google::protobuf::io::Printer* printer, - const google::protobuf::ServiceDescriptor* service, - map<string, string>* vars) { - (*vars)["Service"] = service->name(); - printer->Print(*vars, "type $Service$Server interface {\n"); - for (int i = 0; i < service->method_count(); ++i) { - PrintServerMethodDef(printer, service->method(i), vars); - } - printer->Print("}\n\n"); - - printer->Print(*vars, - "func RegisterService(s *rpc.Server, srv $Service$Server) {\n" - "\ts.RegisterService(&_$Service$_serviceDesc, srv)\n" - "}\n\n"); - - for (int i = 0; i < service->method_count(); ++i) { - PrintServerHandler(printer, service->method(i), vars); - } - - printer->Print(*vars, - "var _$Service$_serviceDesc = rpc.ServiceDesc{\n" - "\tServiceName: \"$Package$$Service$\",\n" - "\tHandlerType: (*$Service$Server)(nil),\n" - "\tMethods: []rpc.MethodDesc{\n"); - for (int i = 0; i < service->method_count(); ++i) { - if (NoStreaming(service->method(i))) { - PrintServerMethodDesc(printer, service->method(i), vars); - } - } - printer->Print("\t},\n"); - - printer->Print("\tStreams: []rpc.StreamDesc{\n"); - for (int i = 0; i < service->method_count(); ++i) { - if (!NoStreaming(service->method(i))) { - PrintServerStreamingMethodDesc(printer, service->method(i), vars); - } - } - printer->Print( - "\t},\n" - "}\n\n"); -} - -std::string BadToUnderscore(std::string str) { - for (unsigned i = 0; i < str.size(); ++i) { - if (!std::isalnum(str[i])) { - str[i] = '_'; - } - } - return str; -} - -string GetServices(const google::protobuf::FileDescriptor* file) { - string output; - google::protobuf::io::StringOutputStream output_stream(&output); - google::protobuf::io::Printer printer(&output_stream, '$'); - map<string, string> vars; - - string package_name = !file->options().go_package().empty() - ? file->options().go_package() - : file->package(); - vars["PackageName"] = BadToUnderscore(package_name); - printer.Print(vars, "package $PackageName$\n\n"); - printer.Print("import (\n"); - if (HasClientOnlyStreaming(file)) { - printer.Print( - "\t\"fmt\"\n" - "\t\"io\"\n"); - } - printer.Print( - "\t\"google/net/grpc/go/rpc\"\n" - "\tcontext \"google/third_party/golang/go_net/context/context\"\n" - "\tproto \"google/net/proto2/go/proto\"\n" - ")\n\n"); - - // $Package$ is used to fully qualify method names. - vars["Package"] = file->package(); - if (!file->package().empty()) { - vars["Package"].append("."); - } - - for (int i = 0; i < file->service_count(); ++i) { - PrintClient(&printer, file->service(0), &vars); - printer.Print("\n"); - PrintServer(&printer, file->service(0), &vars); - printer.Print("\n"); - } - return output; -} - -} // namespace grpc_go_generator diff --git a/src/compiler/go_generator.h b/src/compiler/go_generator.h deleted file mode 100644 index 5744345b56..0000000000 --- a/src/compiler/go_generator.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef NET_GRPC_COMPILER_GO_GENERATOR_H_ -#define NET_GRPC_COMPILER_GO_GENERATOR_H_ - -#include <string> - -namespace google { -namespace protobuf { -class FileDescriptor; -} // namespace protobuf -} // namespace google - -namespace grpc_go_generator { - -std::string GetServices(const google::protobuf::FileDescriptor* file); - -} // namespace grpc_go_generator - -#endif // NET_GRPC_COMPILER_GO_GENERATOR_H_ diff --git a/src/compiler/go_plugin.cc b/src/compiler/go_plugin.cc deleted file mode 100644 index c81612c0ab..0000000000 --- a/src/compiler/go_plugin.cc +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -// Generates go gRPC service interface out of Protobuf IDL. -// -// This is a Proto2 compiler plugin. See net/proto2/compiler/proto/plugin.proto -// and net/proto2/compiler/public/plugin.h for more information on plugins. - -#include <fstream> -#include <memory> - -using namespace std; - -#include "src/compiler/go_generator.h" -#include <google/protobuf/compiler/code_generator.h> -#include <google/protobuf/compiler/plugin.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream.h> -#include <google/protobuf/descriptor.h> - -class GoGrpcGenerator : public google::protobuf::compiler::CodeGenerator { - public: - GoGrpcGenerator() {} - virtual ~GoGrpcGenerator() {} - - virtual bool Generate(const google::protobuf::FileDescriptor* file, - const string& parameter, - google::protobuf::compiler::GeneratorContext* context, - string* error) const { - // Get output file name. - string file_name; - if (file->name().size() > 6 && - file->name().find_last_of(".proto") == file->name().size() - 1) { - file_name = - file->name().substr(0, file->name().size() - 6) + "_grpc.pb.go"; - } else { - *error = "Invalid proto file name. Proto file must end with .proto"; - return false; - } - - std::unique_ptr<google::protobuf::io::ZeroCopyOutputStream> output( - context->Open(file_name)); - google::protobuf::io::CodedOutputStream coded_out(output.get()); - string code = grpc_go_generator::GetServices(file); - coded_out.WriteRaw(code.data(), code.size()); - return true; - } -}; - -int main(int argc, char* argv[]) { - GoGrpcGenerator generator; - return google::protobuf::compiler::PluginMain(argc, argv, &generator); -} diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 98e26aa563..ab9d3aff16 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -32,6 +32,7 @@ */ #include "src/core/channel/http_client_filter.h" +#include <string.h> #include <grpc/support/log.h> typedef struct call_data { int sent_headers; } call_data; @@ -130,6 +131,19 @@ static void destroy_call_elem(grpc_call_element *elem) { ignore_unused(channeld); } +static const char *scheme_from_args(const grpc_channel_args *args) { + int i; + if (args != NULL) { + for (i = 0; i < args->num_args; ++i) { + if (args->args[i].type == GRPC_ARG_STRING && + strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { + return args->args[i].value.string; + } + } + } + return "http"; +} + /* Constructor for channel_data */ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, @@ -146,7 +160,8 @@ static void init_channel_elem(grpc_channel_element *elem, /* initialize members */ channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers"); channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST"); - channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc"); + channeld->scheme = + grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args)); channeld->content_type = grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); } diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h index f939cbd351..21cde4877b 100644 --- a/src/core/channel/http_client_filter.h +++ b/src/core/channel/http_client_filter.h @@ -39,4 +39,6 @@ /* Processes metadata on the client side for HTTP2 transports */ extern const grpc_channel_filter grpc_http_client_filter; +#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme" + #endif /* __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ */ diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index d519ecab87..fc722f2d82 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -35,6 +35,8 @@ #include <string.h> +#include "src/core/channel/channel_args.h" +#include "src/core/channel/http_client_filter.h" #include "src/core/security/credentials.h" #include "src/core/security/secure_endpoint.h" #include "src/core/surface/lame_client.h" @@ -444,6 +446,8 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds, grpc_security_status status = GRPC_SECURITY_OK; size_t i = 0; const char *secure_peer_name = target; + grpc_arg arg; + grpc_channel_args *new_args; for (i = 0; args && i < args->num_args; i++) { grpc_arg *arg = &args->args[i]; @@ -459,8 +463,13 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds, if (status != GRPC_SECURITY_OK) { return grpc_lame_client_channel_create(); } - channel = grpc_secure_channel_create_internal(target, args, ctx); + arg.type = GRPC_ARG_STRING; + arg.key = GRPC_ARG_HTTP2_SCHEME; + arg.value.string = "https"; + new_args = grpc_channel_args_copy_and_add(args, &arg); + channel = grpc_secure_channel_create_internal(target, new_args, ctx); grpc_security_context_unref(&ctx->base); + grpc_channel_args_destroy(new_args); return channel; } diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c index 0420570a3e..ee2705a2c2 100644 --- a/src/core/support/log_posix.c +++ b/src/core/support/log_posix.c @@ -31,21 +31,26 @@ * */ -#define _POSIX_SOURCE +#ifndef _POSIX_C_SOURCE +#define _POSIX_C_SOURCE 200112L +#endif + #define _GNU_SOURCE #include <grpc/support/port_platform.h> #if defined(GPR_POSIX_LOG) +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include <stdio.h> #include <stdarg.h> #include <string.h> +#include <stdio.h> #include <time.h> #include <pthread.h> -static long gettid() { return pthread_self(); } +static gpr_intptr gettid() { return (gpr_intptr)pthread_self(); } void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format, ...) { @@ -55,7 +60,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, int ret; va_list args; va_start(args, format); - ret = vsnprintf(buf, format, args); + ret = vsnprintf(buf, sizeof(buf), format, args); va_end(args); if (ret < 0) { message = NULL; @@ -64,7 +69,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, } else { message = allocated = gpr_malloc(ret + 1); va_start(args, format); - vsnprintf(message, format, args); + vsnprintf(message, ret, format, args); va_end(args); } gpr_log_message(file, line, severity, message); @@ -91,7 +96,7 @@ void gpr_default_log(gpr_log_func_args *args) { strcpy(time_buffer, "error:strftime"); } - fprintf(stderr, "%s%s.%09d %7ld %s:%d] %s\n", + fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n", gpr_log_severity_string(args->severity), time_buffer, (int)(now.tv_nsec), gettid(), display_file, args->line, args->message); diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index ae5f23a90d..dc8c1d0785 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -36,12 +36,13 @@ #ifdef GPR_WIN32 #include <grpc/support/log.h> +#include <grpc/support/alloc.h> #include <stdio.h> #include <stdarg.h> void gpr_log(const char *file, int line, gpr_log_severity severity, - const char *message) { - const char *message = NULL; + const char *format, ...) { + char *message = NULL; va_list args; int ret; @@ -53,7 +54,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, message = NULL; } else { /* Allocate a new buffer, with space for the NUL terminator. */ - strp_buflen = (size_t)ret + 1; + size_t strp_buflen = (size_t)ret + 1; message = gpr_malloc(strp_buflen); /* Print to the buffer. */ @@ -73,7 +74,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, /* Simple starter implementation */ void gpr_default_log(gpr_log_func_args *args) { - fprintf(stderr, "%s %s:%d: %s\n", gpr_log_severity_string(severity), + fprintf(stderr, "%s %s:%d: %s\n", gpr_log_severity_string(args->severity), args->file, args->line, args->message); } diff --git a/src/core/support/time.c b/src/core/support/time.c index 0e88c65be0..97243318fd 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -259,7 +259,7 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) { } else if (t.tv_sec <= -2147483) { /* TODO(ctiller): correct handling here (it's so far in the past do we care?) */ - return -2147483648; + return -2147483647; } else { return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 4ea8378d96..0d72bf42fb 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -178,6 +178,7 @@ struct grpc_call { gpr_uint8 received_metadata; gpr_uint8 have_read; gpr_uint8 have_alarm; + gpr_uint8 got_status_code; /* The current outstanding read message tag (only valid if have_read == 1) */ void *read_tag; void *metadata_tag; @@ -225,6 +226,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, call->have_write = 0; call->have_alarm = 0; call->received_metadata = 0; + call->got_status_code = 0; call->status_code = server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN; call->status_details = NULL; @@ -268,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) { grpc_call_internal_unref(c); } +static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) { + if (!call->got_status_code) { + call->status_code = status; + call->got_status_code = 1; + } +} + +static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) { + if (!call->status_details) { + call->status_details = grpc_mdstr_ref(status); + } +} + grpc_call_error grpc_call_cancel(grpc_call *c) { grpc_call_element *elem; grpc_call_op op; @@ -284,6 +299,21 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { return GRPC_CALL_OK; } +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, + grpc_status_code status, + const char *description) { + grpc_mdstr *details = + description ? grpc_mdstr_from_string(c->metadata_context, description) + : NULL; + gpr_mu_lock(&c->read_mu); + maybe_set_status_code(c, status); + if (details) { + maybe_set_status_details(c, details); + } + gpr_mu_unlock(&c->read_mu); + return grpc_call_cancel(c); +} + void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { grpc_call_element *elem; GPR_ASSERT(op->dir == GRPC_CALL_DOWN); @@ -800,14 +830,11 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) { grpc_mdelem *md = op->data.metadata; grpc_mdstr *key = md->key; if (key == grpc_channel_get_status_string(call->channel)) { - call->status_code = decode_status(md); + maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else if (key == grpc_channel_get_message_string(call->channel)) { - if (call->status_details) { - grpc_mdstr_unref(call->status_details); - } - call->status_details = grpc_mdstr_ref(md->value); + maybe_set_status_details(call, md->value); grpc_mdelem_unref(md); op->done_cb(op->user_data, GRPC_OP_OK); } else { diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc index 7936a30dfd..5ccf8c9682 100644 --- a/src/cpp/stream/stream_context.cc +++ b/src/cpp/stream/stream_context.cc @@ -112,9 +112,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) { if (read_ev->data.read) { if (!DeserializeProto(read_ev->data.read, msg)) { ret = false; - FinishStream( - Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS, + "Failed to parse incoming proto"); } } else { ret = false; @@ -132,9 +131,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) { if (msg) { grpc_byte_buffer* out_buf = nullptr; if (!SerializeProto(*msg, &out_buf)) { - FinishStream(Status(StatusCode::INVALID_ARGUMENT, - "Failed to serialize outgoing proto"), - true); + grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT, + "Failed to serialize outgoing proto"); return false; } int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0; @@ -172,29 +170,18 @@ const Status& StreamContext::Wait() { grpc_event_finish(metadata_ev); // TODO(yangg) protect states by a mutex, including other places. if (!self_halfclosed_ || !peer_halfclosed_) { - FinishStream(Status::Cancelled, true); - } else { - grpc_event* finish_ev = - grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); - GPR_ASSERT(finish_ev->type == GRPC_FINISHED); - final_status_ = Status( - static_cast<StatusCode>(finish_ev->data.finished.status), - finish_ev->data.finished.details ? finish_ev->data.finished.details - : ""); - grpc_event_finish(finish_ev); - } - return final_status_; -} - -void StreamContext::FinishStream(const Status& status, bool send) { - if (send) { - grpc_call_cancel(call()); + Cancel(); } grpc_event* finish_ev = grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future); GPR_ASSERT(finish_ev->type == GRPC_FINISHED); + final_status_ = Status( + static_cast<StatusCode>(finish_ev->data.finished.status), + finish_ev->data.finished.details ? finish_ev->data.finished.details : ""); grpc_event_finish(finish_ev); - final_status_ = status; + return final_status_; } +void StreamContext::Cancel() { grpc_call_cancel(call()); } + } // namespace grpc diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h index f70fe6daa3..4781f27a77 100644 --- a/src/cpp/stream/stream_context.h +++ b/src/cpp/stream/stream_context.h @@ -48,7 +48,7 @@ namespace grpc { class ClientContext; class RpcMethod; -class StreamContext : public StreamContextInterface { +class StreamContext final : public StreamContextInterface { public: StreamContext(const RpcMethod& method, ClientContext* context, const google::protobuf::Message* request, @@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface { bool Read(google::protobuf::Message* msg) override; bool Write(const google::protobuf::Message* msg, bool is_last) override; const Status& Wait() override; - void FinishStream(const Status& status, bool send) override; + void Cancel() override; google::protobuf::Message* request() override { return request_; } google::protobuf::Message* response() override { return result_; } |