aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-14 17:09:28 -0800
committerGravatar murgatroid99 <michael.lumish@gmail.com>2015-01-14 17:09:28 -0800
commit82fdc983b2659b4752263e86c3f34ce26e6b97a2 (patch)
treeb870399ecb5037bf9de9587b68ac3701cfc65ab7 /src
parent6061f2b6058cfd1135488f55e832622226cd558c (diff)
parent2739a49eeebb4a10aea5bdce742355e2e42a864d (diff)
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc83
-rw-r--r--src/compiler/go_generator.cc530
-rw-r--r--src/compiler/go_generator.h51
-rw-r--r--src/compiler/go_plugin.cc83
-rw-r--r--src/core/channel/http_client_filter.c17
-rw-r--r--src/core/channel/http_client_filter.h2
-rw-r--r--src/core/security/security_context.c11
-rw-r--r--src/core/support/log_posix.c15
-rw-r--r--src/core/support/log_win32.c9
-rw-r--r--src/core/support/time.c2
-rw-r--r--src/core/surface/call.c37
-rw-r--r--src/cpp/stream/stream_context.cc35
-rw-r--r--src/cpp/stream/stream_context.h4
13 files changed, 132 insertions, 747 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 1116049806..94e56d73a6 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -143,16 +143,16 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer* printer,
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientWriter<$Request$>* $Method$("
+ "::grpc::ClientWriter< $Request$>* $Method$("
"::grpc::ClientContext* context, $Response$* response);\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReader<$Response$>* $Method$("
+ "::grpc::ClientReader< $Response$>* $Method$("
"::grpc::ClientContext* context, const $Request$* request);\n\n");
} else if (BidiStreaming(method)) {
printer->Print(*vars,
- "::grpc::ClientReaderWriter<$Request$, $Response$>* "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Method$(::grpc::ClientContext* context);\n\n");
}
}
@@ -174,19 +174,20 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer* printer,
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReader<$Request$>* reader, "
+ "::grpc::ServerReader< $Request$>* reader, "
"$Response$* response);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
- "::grpc::ServerWriter<$Response$>* writer);\n");
+ "::grpc::ServerWriter< $Response$>* writer);\n");
} else if (BidiStreaming(method)) {
- printer->Print(*vars,
- "virtual ::grpc::Status $Method$("
- "::grpc::ServerContext* context, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>* stream);"
- "\n");
+ printer->Print(
+ *vars,
+ "virtual ::grpc::Status $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* stream);"
+ "\n");
}
}
@@ -211,7 +212,7 @@ void PrintHeaderService(google::protobuf::io::Printer* printer,
printer->Outdent();
printer->Print("};\n");
printer->Print(
- "static Stub* NewStub(const std::shared_ptr<::grpc::ChannelInterface>& "
+ "static Stub* NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& "
"channel);\n");
printer->Print("\n");
@@ -269,11 +270,12 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer,
"context, request, response);\n"
"}\n\n");
} else if (ClientOnlyStreaming(method)) {
+ printer->Print(
+ *vars,
+ "::grpc::ClientWriter< $Request$>* $Service$::Stub::$Method$("
+ "::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
- "::grpc::ClientWriter<$Request$>* $Service$::Stub::$Method$("
- "::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter<$Request$>("
+ " return new ::grpc::ClientWriter< $Request$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
@@ -282,10 +284,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer,
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReader<$Response$>* $Service$::Stub::$Method$("
+ "::grpc::ClientReader< $Response$>* $Service$::Stub::$Method$("
"::grpc::ClientContext* context, const $Request$* request) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReader<$Response$>("
+ " return new ::grpc::ClientReader< $Response$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
@@ -294,11 +296,11 @@ void PrintSourceClientMethod(google::protobuf::io::Printer* printer,
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
- "::grpc::ClientReaderWriter<$Request$, $Response$>* "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$Service$::Stub::$Method$(::grpc::ClientContext* context) {\n");
printer->Print(
*vars,
- " return new ::grpc::ClientReaderWriter<$Request$, $Response$>("
+ " return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
"channel()->CreateStream("
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
@@ -328,7 +330,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReader<$Request$>* reader, "
+ "::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) {\n");
printer->Print(
" return ::grpc::Status("
@@ -339,7 +341,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, "
- "::grpc::ServerWriter<$Response$>* writer) {\n");
+ "::grpc::ServerWriter< $Response$>* writer) {\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@@ -348,7 +350,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>* "
+ "::grpc::ServerReaderWriter< $Response$, $Request$>* "
"stream) {\n");
printer->Print(
" return ::grpc::Status("
@@ -361,13 +363,14 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
const google::protobuf::ServiceDescriptor* service,
map<string, string>* vars) {
(*vars)["Service"] = service->name();
- printer->Print(*vars,
- "$Service$::Stub* $Service$::NewStub("
- "const std::shared_ptr<::grpc::ChannelInterface>& channel) {\n"
- " $Service$::Stub* stub = new $Service$::Stub();\n"
- " stub->set_channel(channel);\n"
- " return stub;\n"
- "};\n\n");
+ printer->Print(
+ *vars,
+ "$Service$::Stub* $Service$::NewStub("
+ "const std::shared_ptr< ::grpc::ChannelInterface>& channel) {\n"
+ " $Service$::Stub* stub = new $Service$::Stub();\n"
+ " stub->set_channel(channel);\n"
+ " return stub;\n"
+ "};\n\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintSourceClientMethod(printer, service->method(i), vars);
}
@@ -400,9 +403,9 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler<$Service$::Service, $Request$, "
+ " new ::grpc::RpcMethodHandler< $Service$::Service, $Request$, "
"$Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
@@ -412,11 +415,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler<"
+ " new ::grpc::ClientStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "::grpc::ServerReader<$Request$>*, $Response$*)>("
+ "::grpc::ServerReader< $Request$>*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (ServerOnlyStreaming(method)) {
@@ -425,11 +428,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler<"
+ " new ::grpc::ServerStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "const $Request$*, ::grpc::ServerWriter<$Response$>*)>("
+ "const $Request$*, ::grpc::ServerWriter< $Response$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (BidiStreaming(method)) {
@@ -438,11 +441,11 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
"service_->AddMethod(new ::grpc::RpcServiceMethod(\n"
" \"/$Package$$Service$/$Method$\",\n"
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler<"
+ " new ::grpc::BidiStreamingHandler< "
"$Service$::Service, $Request$, $Response$>(\n"
- " std::function<::grpc::Status($Service$::Service*, "
+ " std::function< ::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
- "::grpc::ServerReaderWriter<$Response$, $Request$>*)>("
+ "::grpc::ServerReaderWriter< $Response$, $Request$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
}
diff --git a/src/compiler/go_generator.cc b/src/compiler/go_generator.cc
deleted file mode 100644
index 84aa27668e..0000000000
--- a/src/compiler/go_generator.cc
+++ /dev/null
@@ -1,530 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/compiler/go_generator.h"
-
-#include <cctype>
-
-#include <google/protobuf/io/printer.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor.h>
-
-using namespace std;
-
-namespace grpc_go_generator {
-
-bool NoStreaming(const google::protobuf::MethodDescriptor* method) {
- return !method->client_streaming() && !method->server_streaming();
-}
-
-bool ClientOnlyStreaming(const google::protobuf::MethodDescriptor* method) {
- return method->client_streaming() && !method->server_streaming();
-}
-
-bool ServerOnlyStreaming(const google::protobuf::MethodDescriptor* method) {
- return !method->client_streaming() && method->server_streaming();
-}
-
-bool BidiStreaming(const google::protobuf::MethodDescriptor* method) {
- return method->client_streaming() && method->server_streaming();
-}
-
-bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor* file) {
- for (int i = 0; i < file->service_count(); i++) {
- for (int j = 0; j < file->service(i)->method_count(); j++) {
- if (ClientOnlyStreaming(file->service(i)->method(j))) {
- return true;
- }
- }
- }
- return false;
-}
-
-string LowerCaseService(const string& service) {
- string ret = service;
- if (!ret.empty() && ret[0] >= 'A' && ret[0] <= 'Z') {
- ret[0] = ret[0] - 'A' + 'a';
- }
- return ret;
-}
-
-void PrintClientMethodDef(google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] = method->input_type()->name();
- (*vars)["Response"] = method->output_type()->name();
- if (NoStreaming(method)) {
- printer->Print(*vars,
- "\t$Method$(ctx context.Context, in *$Request$, opts "
- "...rpc.CallOption) "
- "(*$Response$, error)\n");
- } else if (BidiStreaming(method)) {
- printer->Print(*vars,
- "\t$Method$(ctx context.Context, opts ...rpc.CallOption) "
- "($Service$_$Method$Client, error)\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "\t$Method$(ctx context.Context, m *$Request$, opts ...rpc.CallOption) "
- "($Service$_$Method$Client, error)\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(*vars,
- "\t$Method$(ctx context.Context, opts ...rpc.CallOption) "
- "($Service$_$Method$Client, error)\n");
- }
-}
-
-void PrintClientMethodImpl(google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] = method->input_type()->name();
- (*vars)["Response"] = method->output_type()->name();
-
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, "
- "in *$Request$, opts ...rpc.CallOption) (*$Response$, error) {\n");
- printer->Print(*vars, "\tout := new($Response$)\n");
- printer->Print(*vars,
- "\terr := rpc.Invoke(ctx, \"/$Package$$Service$/$Method$\", "
- "in, out, c.cc, opts...)\n");
- printer->Print("\tif err != nil {\n");
- printer->Print("\t\treturn nil, err\n");
- printer->Print("\t}\n");
- printer->Print("\treturn out, nil\n");
- printer->Print("}\n\n");
- } else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, opts "
- "...rpc.CallOption) ($Service$_$Method$Client, error) {\n"
- "\tstream, err := rpc.NewClientStream(ctx, c.cc, "
- "\"/$Package$$Service$/$Method$\", opts...)\n"
- "\tif err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn &$ServiceStruct$$Method$Client{stream}, nil\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Client interface {\n"
- "\tSend(*$Request$) error\n"
- "\tRecv() (*$Response$, error)\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Client struct {\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Client) Send(m *$Request$) error {\n"
- "\treturn x.ClientStream.SendProto(m)\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Client) Recv() (*$Response$, error) "
- "{\n"
- "\tm := new($Response$)\n"
- "\tif err := x.ClientStream.RecvProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn m, nil\n"
- "}\n\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, m "
- "*$Request$, "
- "opts ...rpc.CallOption) ($Service$_$Method$Client, error) {\n"
- "\tstream, err := rpc.NewClientStream(ctx, c.cc, "
- "\"/$Package$$Service$/$Method$\", opts...)\n"
- "\tif err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\tx := &$ServiceStruct$$Method$Client{stream}\n"
- "\tif err := x.ClientStream.SendProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\tif err := x.ClientStream.CloseSend(); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn x, nil\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Client interface {\n"
- "\tRecv() (*$Response$, error)\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Client struct {\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Client) Recv() (*$Response$, error) "
- "{\n"
- "\tm := new($Response$)\n"
- "\tif err := x.ClientStream.RecvProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn m, nil\n"
- "}\n\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "func (c *$ServiceStruct$Client) $Method$(ctx context.Context, opts "
- "...rpc.CallOption) ($Service$_$Method$Client, error) {\n"
- "\tstream, err := rpc.NewClientStream(ctx, c.cc, "
- "\"/$Package$$Service$/$Method$\", opts...)\n"
- "\tif err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn &$ServiceStruct$$Method$Client{stream}, nil\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Client interface {\n"
- "\tSend(*$Request$) error\n"
- "\tCloseAndRecv() (*$Response$, error)\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Client struct {\n"
- "\trpc.ClientStream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Client) Send(m *$Request$) error {\n"
- "\treturn x.ClientStream.SendProto(m)\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Client) CloseAndRecv() (*$Response$, "
- "error) {\n"
- "\tif err := x.ClientStream.CloseSend(); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\tm := new($Response$)\n"
- "\tif err := x.ClientStream.RecvProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\t// Read EOF.\n"
- "\tif err := x.ClientStream.RecvProto(m); err == io.EOF {\n"
- "\t\treturn m, io.EOF\n"
- "\t}\n"
- "\t// gRPC protocol violation.\n"
- "\treturn m, fmt.Errorf(\"Violate gRPC client streaming protocol: no "
- "EOF after the response.\")\n"
- "}\n\n");
- }
-}
-
-void PrintClient(google::protobuf::io::Printer* printer,
- const google::protobuf::ServiceDescriptor* service,
- map<string, string>* vars) {
- (*vars)["Service"] = service->name();
- (*vars)["ServiceStruct"] = LowerCaseService(service->name());
- printer->Print(*vars, "type $Service$Client interface {\n");
- for (int i = 0; i < service->method_count(); ++i) {
- PrintClientMethodDef(printer, service->method(i), vars);
- }
- printer->Print("}\n\n");
-
- printer->Print(*vars,
- "type $ServiceStruct$Client struct {\n"
- "\tcc *rpc.ClientConn\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func New$Service$Client(cc *rpc.ClientConn) $Service$Client {\n"
- "\treturn &$ServiceStruct$Client{cc}\n"
- "}\n\n");
- for (int i = 0; i < service->method_count(); ++i) {
- PrintClientMethodImpl(printer, service->method(i), vars);
- }
-}
-
-void PrintServerMethodDef(google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] = method->input_type()->name();
- (*vars)["Response"] = method->output_type()->name();
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "\t$Method$(context.Context, *$Request$) (*$Response$, error)\n");
- } else if (BidiStreaming(method)) {
- printer->Print(*vars, "\t$Method$($Service$_$Method$Server) error\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(*vars,
- "\t$Method$(*$Request$, $Service$_$Method$Server) error\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(*vars, "\t$Method$($Service$_$Method$Server) error\n");
- }
-}
-
-void PrintServerHandler(google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- (*vars)["Request"] = method->input_type()->name();
- (*vars)["Response"] = method->output_type()->name();
- if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "func _$Service$_$Method$_Handler(srv interface{}, ctx context.Context,"
- " buf []byte) (proto.Message, error) {\n");
- printer->Print(*vars, "\tin := new($Request$)\n");
- printer->Print("\tif err := proto.Unmarshal(buf, in); err != nil {\n");
- printer->Print("\t\treturn nil, err\n");
- printer->Print("\t}\n");
- printer->Print(*vars,
- "\tout, err := srv.($Service$Server).$Method$(ctx, in)\n");
- printer->Print("\tif err != nil {\n");
- printer->Print("\t\treturn nil, err\n");
- printer->Print("\t}\n");
- printer->Print("\treturn out, nil\n");
- printer->Print("}\n\n");
- } else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) "
- "error {\n"
- "\treturn srv.($Service$Server).$Method$(&$ServiceStruct$$Method$Server"
- "{stream})\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Server interface {\n"
- "\tSend(*$Response$) error\n"
- "\tRecv() (*$Request$, error)\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Server struct {\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Server) Send(m *$Response$) error {\n"
- "\treturn x.Stream.SendProto(m)\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Server) Recv() (*$Request$, error) "
- "{\n"
- "\tm := new($Request$)\n"
- "\tif err := x.Stream.RecvProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn m, nil\n"
- "}\n\n");
- } else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) "
- "error {\n"
- "\tm := new($Request$)\n"
- "\tif err := stream.RecvProto(m); err != nil {\n"
- "\t\treturn err\n"
- "\t}\n"
- "\treturn srv.($Service$Server).$Method$(m, "
- "&$ServiceStruct$$Method$Server{stream})\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Server interface {\n"
- "\tSend(*$Response$) error\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Server struct {\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Server) Send(m *$Response$) error {\n"
- "\treturn x.Stream.SendProto(m)\n"
- "}\n\n");
- } else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "func _$Service$_$Method$_Handler(srv interface{}, stream rpc.Stream) "
- "error {\n"
- "\treturn srv.($Service$Server).$Method$(&$ServiceStruct$$Method$Server"
- "{stream})\n"
- "}\n\n");
- printer->Print(*vars,
- "type $Service$_$Method$Server interface {\n"
- "\tSendAndClose(*$Response$) error\n"
- "\tRecv() (*$Request$, error)\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(*vars,
- "type $ServiceStruct$$Method$Server struct {\n"
- "\trpc.Stream\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Server) SendAndClose(m *$Response$) "
- "error {\n"
- "\tif err := x.Stream.SendProto(m); err != nil {\n"
- "\t\treturn err\n"
- "\t}\n"
- "\treturn nil\n"
- "}\n\n");
- printer->Print(
- *vars,
- "func (x *$ServiceStruct$$Method$Server) Recv() (*$Request$, error) {\n"
- "\tm := new($Request$)\n"
- "\tif err := x.Stream.RecvProto(m); err != nil {\n"
- "\t\treturn nil, err\n"
- "\t}\n"
- "\treturn m, nil\n"
- "}\n\n");
- }
-}
-
-void PrintServerMethodDesc(google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- printer->Print("\t\t{\n");
- printer->Print(*vars, "\t\t\tMethodName:\t\"$Method$\",\n");
- printer->Print(*vars, "\t\t\tHandler:\t_$Service$_$Method$_Handler,\n");
- printer->Print("\t\t},\n");
-}
-
-void PrintServerStreamingMethodDesc(
- google::protobuf::io::Printer* printer,
- const google::protobuf::MethodDescriptor* method,
- map<string, string>* vars) {
- (*vars)["Method"] = method->name();
- printer->Print("\t\t{\n");
- printer->Print(*vars, "\t\t\tStreamName:\t\"$Method$\",\n");
- printer->Print(*vars, "\t\t\tHandler:\t_$Service$_$Method$_Handler,\n");
- printer->Print("\t\t},\n");
-}
-
-void PrintServer(google::protobuf::io::Printer* printer,
- const google::protobuf::ServiceDescriptor* service,
- map<string, string>* vars) {
- (*vars)["Service"] = service->name();
- printer->Print(*vars, "type $Service$Server interface {\n");
- for (int i = 0; i < service->method_count(); ++i) {
- PrintServerMethodDef(printer, service->method(i), vars);
- }
- printer->Print("}\n\n");
-
- printer->Print(*vars,
- "func RegisterService(s *rpc.Server, srv $Service$Server) {\n"
- "\ts.RegisterService(&_$Service$_serviceDesc, srv)\n"
- "}\n\n");
-
- for (int i = 0; i < service->method_count(); ++i) {
- PrintServerHandler(printer, service->method(i), vars);
- }
-
- printer->Print(*vars,
- "var _$Service$_serviceDesc = rpc.ServiceDesc{\n"
- "\tServiceName: \"$Package$$Service$\",\n"
- "\tHandlerType: (*$Service$Server)(nil),\n"
- "\tMethods: []rpc.MethodDesc{\n");
- for (int i = 0; i < service->method_count(); ++i) {
- if (NoStreaming(service->method(i))) {
- PrintServerMethodDesc(printer, service->method(i), vars);
- }
- }
- printer->Print("\t},\n");
-
- printer->Print("\tStreams: []rpc.StreamDesc{\n");
- for (int i = 0; i < service->method_count(); ++i) {
- if (!NoStreaming(service->method(i))) {
- PrintServerStreamingMethodDesc(printer, service->method(i), vars);
- }
- }
- printer->Print(
- "\t},\n"
- "}\n\n");
-}
-
-std::string BadToUnderscore(std::string str) {
- for (unsigned i = 0; i < str.size(); ++i) {
- if (!std::isalnum(str[i])) {
- str[i] = '_';
- }
- }
- return str;
-}
-
-string GetServices(const google::protobuf::FileDescriptor* file) {
- string output;
- google::protobuf::io::StringOutputStream output_stream(&output);
- google::protobuf::io::Printer printer(&output_stream, '$');
- map<string, string> vars;
-
- string package_name = !file->options().go_package().empty()
- ? file->options().go_package()
- : file->package();
- vars["PackageName"] = BadToUnderscore(package_name);
- printer.Print(vars, "package $PackageName$\n\n");
- printer.Print("import (\n");
- if (HasClientOnlyStreaming(file)) {
- printer.Print(
- "\t\"fmt\"\n"
- "\t\"io\"\n");
- }
- printer.Print(
- "\t\"google/net/grpc/go/rpc\"\n"
- "\tcontext \"google/third_party/golang/go_net/context/context\"\n"
- "\tproto \"google/net/proto2/go/proto\"\n"
- ")\n\n");
-
- // $Package$ is used to fully qualify method names.
- vars["Package"] = file->package();
- if (!file->package().empty()) {
- vars["Package"].append(".");
- }
-
- for (int i = 0; i < file->service_count(); ++i) {
- PrintClient(&printer, file->service(0), &vars);
- printer.Print("\n");
- PrintServer(&printer, file->service(0), &vars);
- printer.Print("\n");
- }
- return output;
-}
-
-} // namespace grpc_go_generator
diff --git a/src/compiler/go_generator.h b/src/compiler/go_generator.h
deleted file mode 100644
index 5744345b56..0000000000
--- a/src/compiler/go_generator.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef NET_GRPC_COMPILER_GO_GENERATOR_H_
-#define NET_GRPC_COMPILER_GO_GENERATOR_H_
-
-#include <string>
-
-namespace google {
-namespace protobuf {
-class FileDescriptor;
-} // namespace protobuf
-} // namespace google
-
-namespace grpc_go_generator {
-
-std::string GetServices(const google::protobuf::FileDescriptor* file);
-
-} // namespace grpc_go_generator
-
-#endif // NET_GRPC_COMPILER_GO_GENERATOR_H_
diff --git a/src/compiler/go_plugin.cc b/src/compiler/go_plugin.cc
deleted file mode 100644
index c81612c0ab..0000000000
--- a/src/compiler/go_plugin.cc
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-// Generates go gRPC service interface out of Protobuf IDL.
-//
-// This is a Proto2 compiler plugin. See net/proto2/compiler/proto/plugin.proto
-// and net/proto2/compiler/public/plugin.h for more information on plugins.
-
-#include <fstream>
-#include <memory>
-
-using namespace std;
-
-#include "src/compiler/go_generator.h"
-#include <google/protobuf/compiler/code_generator.h>
-#include <google/protobuf/compiler/plugin.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#include <google/protobuf/descriptor.h>
-
-class GoGrpcGenerator : public google::protobuf::compiler::CodeGenerator {
- public:
- GoGrpcGenerator() {}
- virtual ~GoGrpcGenerator() {}
-
- virtual bool Generate(const google::protobuf::FileDescriptor* file,
- const string& parameter,
- google::protobuf::compiler::GeneratorContext* context,
- string* error) const {
- // Get output file name.
- string file_name;
- if (file->name().size() > 6 &&
- file->name().find_last_of(".proto") == file->name().size() - 1) {
- file_name =
- file->name().substr(0, file->name().size() - 6) + "_grpc.pb.go";
- } else {
- *error = "Invalid proto file name. Proto file must end with .proto";
- return false;
- }
-
- std::unique_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
- context->Open(file_name));
- google::protobuf::io::CodedOutputStream coded_out(output.get());
- string code = grpc_go_generator::GetServices(file);
- coded_out.WriteRaw(code.data(), code.size());
- return true;
- }
-};
-
-int main(int argc, char* argv[]) {
- GoGrpcGenerator generator;
- return google::protobuf::compiler::PluginMain(argc, argv, &generator);
-}
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 98e26aa563..ab9d3aff16 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -32,6 +32,7 @@
*/
#include "src/core/channel/http_client_filter.h"
+#include <string.h>
#include <grpc/support/log.h>
typedef struct call_data { int sent_headers; } call_data;
@@ -130,6 +131,19 @@ static void destroy_call_elem(grpc_call_element *elem) {
ignore_unused(channeld);
}
+static const char *scheme_from_args(const grpc_channel_args *args) {
+ int i;
+ if (args != NULL) {
+ for (i = 0; i < args->num_args; ++i) {
+ if (args->args[i].type == GRPC_ARG_STRING &&
+ strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
+ return args->args[i].value.string;
+ }
+ }
+ }
+ return "http";
+}
+
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
@@ -146,7 +160,8 @@ static void init_channel_elem(grpc_channel_element *elem,
/* initialize members */
channeld->te_trailers = grpc_mdelem_from_strings(mdctx, "te", "trailers");
channeld->method = grpc_mdelem_from_strings(mdctx, ":method", "POST");
- channeld->scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
+ channeld->scheme =
+ grpc_mdelem_from_strings(mdctx, ":scheme", scheme_from_args(args));
channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
}
diff --git a/src/core/channel/http_client_filter.h b/src/core/channel/http_client_filter.h
index f939cbd351..21cde4877b 100644
--- a/src/core/channel/http_client_filter.h
+++ b/src/core/channel/http_client_filter.h
@@ -39,4 +39,6 @@
/* Processes metadata on the client side for HTTP2 transports */
extern const grpc_channel_filter grpc_http_client_filter;
+#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
+
#endif /* __GRPC_INTERNAL_CHANNEL_HTTP_CLIENT_FILTER_H__ */
diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c
index d519ecab87..fc722f2d82 100644
--- a/src/core/security/security_context.c
+++ b/src/core/security/security_context.c
@@ -35,6 +35,8 @@
#include <string.h>
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/http_client_filter.h"
#include "src/core/security/credentials.h"
#include "src/core/security/secure_endpoint.h"
#include "src/core/surface/lame_client.h"
@@ -444,6 +446,8 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds,
grpc_security_status status = GRPC_SECURITY_OK;
size_t i = 0;
const char *secure_peer_name = target;
+ grpc_arg arg;
+ grpc_channel_args *new_args;
for (i = 0; args && i < args->num_args; i++) {
grpc_arg *arg = &args->args[i];
@@ -459,8 +463,13 @@ grpc_channel *grpc_ssl_channel_create(grpc_credentials *ssl_creds,
if (status != GRPC_SECURITY_OK) {
return grpc_lame_client_channel_create();
}
- channel = grpc_secure_channel_create_internal(target, args, ctx);
+ arg.type = GRPC_ARG_STRING;
+ arg.key = GRPC_ARG_HTTP2_SCHEME;
+ arg.value.string = "https";
+ new_args = grpc_channel_args_copy_and_add(args, &arg);
+ channel = grpc_secure_channel_create_internal(target, new_args, ctx);
grpc_security_context_unref(&ctx->base);
+ grpc_channel_args_destroy(new_args);
return channel;
}
diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c
index 0420570a3e..ee2705a2c2 100644
--- a/src/core/support/log_posix.c
+++ b/src/core/support/log_posix.c
@@ -31,21 +31,26 @@
*
*/
-#define _POSIX_SOURCE
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200112L
+#endif
+
#define _GNU_SOURCE
#include <grpc/support/port_platform.h>
#if defined(GPR_POSIX_LOG)
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
+#include <stdio.h>
#include <time.h>
#include <pthread.h>
-static long gettid() { return pthread_self(); }
+static gpr_intptr gettid() { return (gpr_intptr)pthread_self(); }
void gpr_log(const char *file, int line, gpr_log_severity severity,
const char *format, ...) {
@@ -55,7 +60,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
int ret;
va_list args;
va_start(args, format);
- ret = vsnprintf(buf, format, args);
+ ret = vsnprintf(buf, sizeof(buf), format, args);
va_end(args);
if (ret < 0) {
message = NULL;
@@ -64,7 +69,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
} else {
message = allocated = gpr_malloc(ret + 1);
va_start(args, format);
- vsnprintf(message, format, args);
+ vsnprintf(message, ret, format, args);
va_end(args);
}
gpr_log_message(file, line, severity, message);
@@ -91,7 +96,7 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
- fprintf(stderr, "%s%s.%09d %7ld %s:%d] %s\n",
+ fprintf(stderr, "%s%s.%09d %7tu %s:%d] %s\n",
gpr_log_severity_string(args->severity), time_buffer,
(int)(now.tv_nsec), gettid(), display_file, args->line,
args->message);
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index ae5f23a90d..dc8c1d0785 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -36,12 +36,13 @@
#ifdef GPR_WIN32
#include <grpc/support/log.h>
+#include <grpc/support/alloc.h>
#include <stdio.h>
#include <stdarg.h>
void gpr_log(const char *file, int line, gpr_log_severity severity,
- const char *message) {
- const char *message = NULL;
+ const char *format, ...) {
+ char *message = NULL;
va_list args;
int ret;
@@ -53,7 +54,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
message = NULL;
} else {
/* Allocate a new buffer, with space for the NUL terminator. */
- strp_buflen = (size_t)ret + 1;
+ size_t strp_buflen = (size_t)ret + 1;
message = gpr_malloc(strp_buflen);
/* Print to the buffer. */
@@ -73,7 +74,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
/* Simple starter implementation */
void gpr_default_log(gpr_log_func_args *args) {
- fprintf(stderr, "%s %s:%d: %s\n", gpr_log_severity_string(severity),
+ fprintf(stderr, "%s %s:%d: %s\n", gpr_log_severity_string(args->severity),
args->file, args->line, args->message);
}
diff --git a/src/core/support/time.c b/src/core/support/time.c
index 0e88c65be0..97243318fd 100644
--- a/src/core/support/time.c
+++ b/src/core/support/time.c
@@ -259,7 +259,7 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) {
} else if (t.tv_sec <= -2147483) {
/* TODO(ctiller): correct handling here (it's so far in the past do we
care?) */
- return -2147483648;
+ return -2147483647;
} else {
return t.tv_sec * GPR_MS_PER_SEC + t.tv_nsec / GPR_NS_PER_MS;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 4ea8378d96..0d72bf42fb 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -178,6 +178,7 @@ struct grpc_call {
gpr_uint8 received_metadata;
gpr_uint8 have_read;
gpr_uint8 have_alarm;
+ gpr_uint8 got_status_code;
/* The current outstanding read message tag (only valid if have_read == 1) */
void *read_tag;
void *metadata_tag;
@@ -225,6 +226,7 @@ grpc_call *grpc_call_create(grpc_channel *channel,
call->have_write = 0;
call->have_alarm = 0;
call->received_metadata = 0;
+ call->got_status_code = 0;
call->status_code =
server_transport_data != NULL ? GRPC_STATUS_OK : GRPC_STATUS_UNKNOWN;
call->status_details = NULL;
@@ -268,6 +270,19 @@ void grpc_call_destroy(grpc_call *c) {
grpc_call_internal_unref(c);
}
+static void maybe_set_status_code(grpc_call *call, gpr_uint32 status) {
+ if (!call->got_status_code) {
+ call->status_code = status;
+ call->got_status_code = 1;
+ }
+}
+
+static void maybe_set_status_details(grpc_call *call, grpc_mdstr *status) {
+ if (!call->status_details) {
+ call->status_details = grpc_mdstr_ref(status);
+ }
+}
+
grpc_call_error grpc_call_cancel(grpc_call *c) {
grpc_call_element *elem;
grpc_call_op op;
@@ -284,6 +299,21 @@ grpc_call_error grpc_call_cancel(grpc_call *c) {
return GRPC_CALL_OK;
}
+grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
+ grpc_status_code status,
+ const char *description) {
+ grpc_mdstr *details =
+ description ? grpc_mdstr_from_string(c->metadata_context, description)
+ : NULL;
+ gpr_mu_lock(&c->read_mu);
+ maybe_set_status_code(c, status);
+ if (details) {
+ maybe_set_status_details(c, details);
+ }
+ gpr_mu_unlock(&c->read_mu);
+ return grpc_call_cancel(c);
+}
+
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
grpc_call_element *elem;
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
@@ -800,14 +830,11 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op *op) {
grpc_mdelem *md = op->data.metadata;
grpc_mdstr *key = md->key;
if (key == grpc_channel_get_status_string(call->channel)) {
- call->status_code = decode_status(md);
+ maybe_set_status_code(call, decode_status(md));
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (key == grpc_channel_get_message_string(call->channel)) {
- if (call->status_details) {
- grpc_mdstr_unref(call->status_details);
- }
- call->status_details = grpc_mdstr_ref(md->value);
+ maybe_set_status_details(call, md->value);
grpc_mdelem_unref(md);
op->done_cb(op->user_data, GRPC_OP_OK);
} else {
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
index 7936a30dfd..5ccf8c9682 100644
--- a/src/cpp/stream/stream_context.cc
+++ b/src/cpp/stream/stream_context.cc
@@ -112,9 +112,8 @@ bool StreamContext::Read(google::protobuf::Message* msg) {
if (read_ev->data.read) {
if (!DeserializeProto(read_ev->data.read, msg)) {
ret = false;
- FinishStream(
- Status(StatusCode::DATA_LOSS, "Failed to parse incoming proto"),
- true);
+ grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS,
+ "Failed to parse incoming proto");
}
} else {
ret = false;
@@ -132,9 +131,8 @@ bool StreamContext::Write(const google::protobuf::Message* msg, bool is_last) {
if (msg) {
grpc_byte_buffer* out_buf = nullptr;
if (!SerializeProto(*msg, &out_buf)) {
- FinishStream(Status(StatusCode::INVALID_ARGUMENT,
- "Failed to serialize outgoing proto"),
- true);
+ grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT,
+ "Failed to serialize outgoing proto");
return false;
}
int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
@@ -172,29 +170,18 @@ const Status& StreamContext::Wait() {
grpc_event_finish(metadata_ev);
// TODO(yangg) protect states by a mutex, including other places.
if (!self_halfclosed_ || !peer_halfclosed_) {
- FinishStream(Status::Cancelled, true);
- } else {
- grpc_event* finish_ev =
- grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
- GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
- final_status_ = Status(
- static_cast<StatusCode>(finish_ev->data.finished.status),
- finish_ev->data.finished.details ? finish_ev->data.finished.details
- : "");
- grpc_event_finish(finish_ev);
- }
- return final_status_;
-}
-
-void StreamContext::FinishStream(const Status& status, bool send) {
- if (send) {
- grpc_call_cancel(call());
+ Cancel();
}
grpc_event* finish_ev =
grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
+ final_status_ = Status(
+ static_cast<StatusCode>(finish_ev->data.finished.status),
+ finish_ev->data.finished.details ? finish_ev->data.finished.details : "");
grpc_event_finish(finish_ev);
- final_status_ = status;
+ return final_status_;
}
+void StreamContext::Cancel() { grpc_call_cancel(call()); }
+
} // namespace grpc
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
index f70fe6daa3..4781f27a77 100644
--- a/src/cpp/stream/stream_context.h
+++ b/src/cpp/stream/stream_context.h
@@ -48,7 +48,7 @@ namespace grpc {
class ClientContext;
class RpcMethod;
-class StreamContext : public StreamContextInterface {
+class StreamContext final : public StreamContextInterface {
public:
StreamContext(const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
@@ -63,7 +63,7 @@ class StreamContext : public StreamContextInterface {
bool Read(google::protobuf::Message* msg) override;
bool Write(const google::protobuf::Message* msg, bool is_last) override;
const Status& Wait() override;
- void FinishStream(const Status& status, bool send) override;
+ void Cancel() override;
google::protobuf::Message* request() override { return request_; }
google::protobuf::Message* response() override { return result_; }