aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:36:45 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-17 14:36:45 -0700
commitaf26143b6fc76559f0907aadfc9f23c4e2e60844 (patch)
tree2805cc3317954094eb8baf8a30898b1a93bbc00f /src
parent232e04b6199090b5805b3e7148fcfce3b8ef9956 (diff)
parentfea28b79fb0a211b2ea8515dab25db19099ea595 (diff)
Merge github.com:grpc/grpc into i-want-to-wait-free
Diffstat (limited to 'src')
-rw-r--r--src/compiler/csharp_generator.cc50
-rw-r--r--src/compiler/objective_c_generator.cc2
-rw-r--r--src/core/client_config/lb_policies/pick_first.h2
-rw-r--r--src/core/client_config/uri_parser.c2
-rw-r--r--src/core/iomgr/pollset_set.h2
-rw-r--r--src/core/surface/byte_buffer_queue.c8
-rw-r--r--src/core/surface/byte_buffer_queue.h2
-rw-r--r--src/core/surface/call.c13
-rw-r--r--src/core/surface/completion_queue.c6
-rw-r--r--src/core/transport/chttp2/frame_window_update.c4
-rw-r--r--src/core/transport/chttp2/internal.h20
-rw-r--r--src/core/transport/chttp2/parsing.c7
-rw-r--r--src/core/transport/chttp2/stream_lists.c7
-rw-r--r--src/core/transport/chttp2/writing.c49
-rw-r--r--src/core/transport/chttp2_transport.c30
-rw-r--r--src/core/transport/transport.h4
-rw-r--r--src/core/transport/transport_op_string.c3
-rw-r--r--src/cpp/common/auth_property_iterator.cc87
-rw-r--r--src/cpp/common/secure_auth_context.cc16
-rw-r--r--src/cpp/common/secure_auth_context.h4
-rw-r--r--src/cpp/server/create_default_thread_pool.cc4
-rw-r--r--src/cpp/server/fixed_size_thread_pool.cc (renamed from src/cpp/server/thread_pool.cc)15
-rw-r--r--src/cpp/server/server_builder.cc2
-rw-r--r--src/cpp/server/server_context.cc2
-rw-r--r--src/csharp/Grpc.Auth/OAuth2InterceptorFactory.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs10
-rw-r--r--src/csharp/Grpc.Core/Calls.cs2
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs (renamed from src/csharp/Grpc.Core/Stub/AbstractStub.cs)41
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj3
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs8
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs177
-rw-r--r--src/csharp/Grpc.Core/Stub/StubConfiguration.cs64
-rw-r--r--src/csharp/Grpc.Core/Version.cs1
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs14
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs10
-rw-r--r--src/csharp/Grpc.Examples/MathExamples.cs26
-rw-r--r--src/csharp/Grpc.Examples/MathGrpc.cs58
-rw-r--r--src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.HealthCheck/HealthGrpc.cs34
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs9
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestGrpc.cs82
-rwxr-xr-xsrc/csharp/generate_proto_csharp.sh9
-rw-r--r--src/node/test/surface_test.js42
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.h4
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m4
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h5
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m4
-rw-r--r--src/objective-c/ProtoRPC/ProtoRPC.h2
-rw-r--r--src/objective-c/ProtoRPC/ProtoRPC.m16
-rw-r--r--src/objective-c/ProtoRPC/ProtoService.h4
-rw-r--r--src/objective-c/ProtoRPC/ProtoService.m2
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h2
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h (renamed from src/cpp/server/thread_pool.h)46
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m112
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.h14
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.m12
-rw-r--r--src/objective-c/RxLibrary/GRXWriter+Immediate.m12
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h11
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.m76
-rw-r--r--src/objective-c/RxLibrary/transformations/GRXMappingWriter.h6
-rw-r--r--src/objective-c/RxLibrary/transformations/GRXMappingWriter.m6
-rw-r--r--src/objective-c/tests/GRPCClientTests.m12
-rw-r--r--src/objective-c/tests/InteropTests.m23
-rw-r--r--src/objective-c/tests/LocalClearTextTests.m6
-rwxr-xr-xsrc/php/bin/determine_extension_dir.sh2
66 files changed, 787 insertions, 537 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index ccb0b688b6..1910e9bd2d 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -257,7 +257,7 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
}
void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
- out->Print("// client-side stub interface\n");
+ out->Print("// client interface\n");
out->Print("public interface $name$\n", "name",
GetClientInterfaceName(service));
out->Print("{\n");
@@ -269,7 +269,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
if (method_type == METHODTYPE_NO_STREAMING) {
// unary calls have an extra synchronous stub method
out->Print(
- "$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n",
+ "$response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"methodname", method->name(), "request",
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
@@ -280,7 +280,7 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
method_name += "Async"; // prevent name clash with synchronous method.
}
out->Print(
- "$returntype$ $methodname$($request_maybe$CancellationToken token = default(CancellationToken));\n",
+ "$returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));\n",
"methodname", method_name, "request_maybe",
GetMethodRequestParamMaybe(method), "returntype",
GetMethodReturnTypeClient(method));
@@ -312,7 +312,7 @@ void GenerateServerInterface(Printer* out, const ServiceDescriptor *service) {
void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print("// client stub\n");
out->Print(
- "public class $name$ : AbstractStub<$name$, StubConfiguration>, $interface$\n",
+ "public class $name$ : ClientBase, $interface$\n",
"name", GetClientClassName(service), "interface",
GetClientInterfaceName(service));
out->Print("{\n");
@@ -320,12 +320,7 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
// constructors
out->Print(
- "public $name$(Channel channel) : this(channel, StubConfiguration.Default)\n",
- "name", GetClientClassName(service));
- out->Print("{\n");
- out->Print("}\n");
- out->Print(
- "public $name$(Channel channel, StubConfiguration config) : base(channel, config)\n",
+ "public $name$(Channel channel) : base(channel)\n",
"name", GetClientClassName(service));
out->Print("{\n");
out->Print("}\n");
@@ -337,16 +332,16 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
if (method_type == METHODTYPE_NO_STREAMING) {
// unary calls have an extra synchronous stub method
out->Print(
- "public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n",
+ "public $response$ $methodname$($request$ request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"methodname", method->name(), "request",
GetClassName(method->input_type()), "response",
GetClassName(method->output_type()));
out->Print("{\n");
out->Indent();
- out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n",
+ out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n",
"servicenamefield", GetServiceNameFieldName(), "methodfield",
GetMethodFieldName(method));
- out->Print("return Calls.BlockingUnaryCall(call, request, token);\n");
+ out->Print("return Calls.BlockingUnaryCall(call, request, cancellationToken);\n");
out->Outdent();
out->Print("}\n");
}
@@ -356,28 +351,28 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
method_name += "Async"; // prevent name clash with synchronous method.
}
out->Print(
- "public $returntype$ $methodname$($request_maybe$CancellationToken token = default(CancellationToken))\n",
+ "public $returntype$ $methodname$($request_maybe$Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))\n",
"methodname", method_name, "request_maybe",
GetMethodRequestParamMaybe(method), "returntype",
GetMethodReturnTypeClient(method));
out->Print("{\n");
out->Indent();
- out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n",
+ out->Print("var call = CreateCall($servicenamefield$, $methodfield$, headers);\n",
"servicenamefield", GetServiceNameFieldName(), "methodfield",
GetMethodFieldName(method));
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
- out->Print("return Calls.AsyncUnaryCall(call, request, token);\n");
+ out->Print("return Calls.AsyncUnaryCall(call, request, cancellationToken);\n");
break;
case METHODTYPE_CLIENT_STREAMING:
- out->Print("return Calls.AsyncClientStreamingCall(call, token);\n");
+ out->Print("return Calls.AsyncClientStreamingCall(call, cancellationToken);\n");
break;
case METHODTYPE_SERVER_STREAMING:
out->Print(
- "return Calls.AsyncServerStreamingCall(call, request, token);\n");
+ "return Calls.AsyncServerStreamingCall(call, request, cancellationToken);\n");
break;
case METHODTYPE_BIDI_STREAMING:
- out->Print("return Calls.AsyncDuplexStreamingCall(call, token);\n");
+ out->Print("return Calls.AsyncDuplexStreamingCall(call, cancellationToken);\n");
break;
default:
GOOGLE_LOG(FATAL)<< "Can't get here.";
@@ -423,9 +418,9 @@ void GenerateBindServiceMethod(Printer* out, const ServiceDescriptor *service) {
}
void GenerateNewStubMethods(Printer* out, const ServiceDescriptor *service) {
- out->Print("// creates a new client stub\n");
- out->Print("public static $interface$ NewStub(Channel channel)\n",
- "interface", GetClientInterfaceName(service));
+ out->Print("// creates a new client\n");
+ out->Print("public static $classname$ NewClient(Channel channel)\n",
+ "classname", GetClientClassName(service));
out->Print("{\n");
out->Indent();
out->Print("return new $classname$(channel);\n", "classname",
@@ -433,17 +428,6 @@ void GenerateNewStubMethods(Printer* out, const ServiceDescriptor *service) {
out->Outdent();
out->Print("}\n");
out->Print("\n");
-
- out->Print("// creates a new client stub\n");
- out->Print(
- "public static $interface$ NewStub(Channel channel, StubConfiguration config)\n",
- "interface", GetClientInterfaceName(service));
- out->Print("{\n");
- out->Indent();
- out->Print("return new $classname$(channel, config);\n", "classname",
- GetClientClassName(service));
- out->Outdent();
- out->Print("}\n");
}
void GenerateService(Printer* out, const ServiceDescriptor *service) {
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc
index 2a74a3b340..711d0d5870 100644
--- a/src/compiler/objective_c_generator.cc
+++ b/src/compiler/objective_c_generator.cc
@@ -67,7 +67,7 @@ void PrintMethodSignature(Printer *printer, const MethodDescriptor *method,
printer->Print(vars, "- ($return_type$)$method_name$With");
if (method->client_streaming()) {
- printer->Print("RequestsWriter:(id<GRXWriter>)requestWriter");
+ printer->Print("RequestsWriter:(GRXWriter *)requestWriter");
} else {
printer->Print(vars, "Request:($request_class$ *)request");
}
diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h
index 94c2a9f0c7..31394985e5 100644
--- a/src/core/client_config/lb_policies/pick_first.h
+++ b/src/core/client_config/lb_policies/pick_first.h
@@ -36,6 +36,8 @@
#include "src/core/client_config/lb_policy.h"
+/** Returns a load balancing policy instance that picks up the first subchannel
+ * from \a subchannels to succesfully connect */
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels);
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index 776a255923..410a61c8cf 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -98,7 +98,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') {
authority_begin = scheme_end + 3;
- for (i = authority_begin; uri_text[i] != 0; i++) {
+ for (i = authority_begin; uri_text[i] != 0 && authority_end == -1; i++) {
if (uri_text[i] == '/') {
authority_end = i;
}
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 98e3b552a7..6d73951c70 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -38,7 +38,7 @@
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
- fd's (etc) that have been registered with the set_set with that pollset.
+ fd's (etc) that have been registered with the set_set to that pollset.
Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
diff --git a/src/core/surface/byte_buffer_queue.c b/src/core/surface/byte_buffer_queue.c
index 7c31bfe5da..e47dc4f4ce 100644
--- a/src/core/surface/byte_buffer_queue.c
+++ b/src/core/surface/byte_buffer_queue.c
@@ -62,6 +62,7 @@ int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
+ q->bytes += grpc_byte_buffer_length(buffer);
bba_push(&q->filling, buffer);
}
@@ -72,8 +73,11 @@ void grpc_bbq_flush(grpc_byte_buffer_queue *q) {
}
}
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q) { return q->bytes; }
+
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
+ grpc_byte_buffer *out;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
@@ -87,5 +91,7 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
q->draining = temp_array;
}
- return q->draining.data[q->drain_pos++];
+ out = q->draining.data[q->drain_pos++];
+ q->bytes -= grpc_byte_buffer_length(out);
+ return out;
}
diff --git a/src/core/surface/byte_buffer_queue.h b/src/core/surface/byte_buffer_queue.h
index 32c57f8756..f01958984f 100644
--- a/src/core/surface/byte_buffer_queue.h
+++ b/src/core/surface/byte_buffer_queue.h
@@ -49,6 +49,7 @@ typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
+ size_t bytes;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
@@ -56,5 +57,6 @@ grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
void grpc_bbq_flush(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
+size_t grpc_bbq_bytes(grpc_byte_buffer_queue *q);
#endif /* GRPC_INTERNAL_CORE_SURFACE_BYTE_BUFFER_QUEUE_H */
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 0a551ac47f..71f4235571 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -513,6 +513,8 @@ static void unlock(grpc_call *call) {
int completing_requests = 0;
int start_op = 0;
int i;
+ const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536;
+ size_t buffered_bytes;
int cancel_alarm = 0;
memset(&op, 0, sizeof(op));
@@ -528,6 +530,17 @@ static void unlock(grpc_call *call) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = &call->on_done_recv;
+ if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
+ op.max_recv_bytes = call->incoming_message_length -
+ call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
+ } else {
+ buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
+ if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
+ op.max_recv_bytes = 0;
+ } else {
+ op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
+ }
+ }
call->receiving = 1;
GRPC_CALL_INTERNAL_REF(call, "receiving");
start_op = 1;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c67f75fc5c..f3630b31dd 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -116,7 +116,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) {
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
void (*done)(void *done_arg, grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) {
- int shutdown = gpr_unref(&cc->pending_events);
+ int shutdown;
storage->tag = tag;
storage->done = done;
@@ -124,15 +124,15 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
storage->next =
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ shutdown = gpr_unref(&cc->pending_events);
if (!shutdown) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
grpc_pollset_kick(&cc->pollset);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
} else {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->completed_tail->next =
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
cc->completed_tail = storage;
diff --git a/src/core/transport/chttp2/frame_window_update.c b/src/core/transport/chttp2/frame_window_update.c
index b817df7745..d624298ad2 100644
--- a/src/core/transport/chttp2/frame_window_update.c
+++ b/src/core/transport/chttp2/frame_window_update.c
@@ -94,8 +94,8 @@ grpc_chttp2_parse_error grpc_chttp2_window_update_parser_parse(
}
GPR_ASSERT(is_last);
- if (transport_parsing->incoming_stream_id) {
- if (stream_parsing) {
+ if (transport_parsing->incoming_stream_id != 0) {
+ if (stream_parsing != NULL) {
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("update", transport_parsing,
stream_parsing, outgoing_window_update,
p->amount);
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index bdd4b432eb..e5e6f445b7 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -353,7 +353,19 @@ typedef struct {
/** window available for us to send to peer */
gpr_int64 outgoing_window;
- /** window available for peer to send to us - updated after parse */
+ /** The number of bytes the upper layers have offered to receive.
+ As the upper layer offers more bytes, this value increases.
+ As bytes are read, this value decreases. */
+ gpr_uint32 max_recv_bytes;
+ /** The number of bytes the upper layer has offered to read but we have
+ not yet announced to HTTP2 flow control.
+ As the upper layers offer to read more bytes, this value increases.
+ As we advertise incoming flow control window, this value decreases. */
+ gpr_uint32 unannounced_incoming_window;
+ /** The number of bytes of HTTP2 flow control we have advertised.
+ As we advertise incoming flow control window, this value increases.
+ As bytes are read, this value decreases.
+ Updated after parse. */
gpr_uint32 incoming_window;
/** stream ops the transport user would like to send */
grpc_stream_op_buffer *outgoing_sopb;
@@ -391,6 +403,8 @@ typedef struct {
grpc_stream_op_buffer sopb;
/** how strongly should we indicate closure with the next write */
grpc_chttp2_send_closed send_closed;
+ /** how much window should we announce? */
+ gpr_uint32 announce_window;
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
@@ -501,7 +515,9 @@ void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_stream_global *stream_global);
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global);
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing);
void grpc_chttp2_list_remove_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global);
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 9597395aab..82362544d5 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -173,7 +173,14 @@ void grpc_chttp2_publish_reads(
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
"parsed", transport_parsing, stream_parsing, incoming_window_delta,
-(gpr_int64)stream_parsing->incoming_window_delta);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "parsed", transport_parsing, stream_global, max_recv_bytes,
+ -(gpr_int64)stream_parsing->incoming_window_delta);
stream_global->incoming_window -= stream_parsing->incoming_window_delta;
+ GPR_ASSERT(stream_global->max_recv_bytes >=
+ stream_parsing->incoming_window_delta);
+ stream_global->max_recv_bytes -=
+ stream_parsing->incoming_window_delta;
stream_parsing->incoming_window_delta = 0;
grpc_chttp2_list_add_writable_window_update_stream(transport_global,
stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 4fea058c19..590f6abfbc 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -139,6 +139,7 @@ static void stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s,
void grpc_chttp2_list_add_writable_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global), GRPC_CHTTP2_LIST_WRITABLE);
}
@@ -204,6 +205,7 @@ int grpc_chttp2_list_pop_written_stream(
void grpc_chttp2_list_add_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global) {
+ GPR_ASSERT(stream_global->id != 0);
stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
STREAM_FROM_GLOBAL(stream_global),
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
@@ -211,11 +213,14 @@ void grpc_chttp2_list_add_writable_window_update_stream(
int grpc_chttp2_list_pop_writable_window_update_stream(
grpc_chttp2_transport_global *transport_global,
- grpc_chttp2_stream_global **stream_global) {
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_global **stream_global,
+ grpc_chttp2_stream_writing **stream_writing) {
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE_WINDOW_UPDATE);
*stream_global = &stream->global;
+ *stream_writing = &stream->writing;
return r;
}
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index a78654334e..d8ec117aa5 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -66,11 +66,9 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to
available window sizes) and add to the output buffer */
- while (transport_global->outgoing_window &&
- grpc_chttp2_list_pop_writable_stream(transport_global,
+ while (grpc_chttp2_list_pop_writable_stream(transport_global,
transport_writing, &stream_global,
- &stream_writing) &&
- stream_global->outgoing_window > 0) {
+ &stream_writing)) {
stream_writing->id = stream_global->id;
window_delta = grpc_chttp2_preencode(
stream_global->outgoing_sopb->ops, &stream_global->outgoing_sopb->nops,
@@ -106,20 +104,21 @@ int grpc_chttp2_unlocking_check_writes(
/* for each grpc_chttp2_stream that wants to update its window, add that
* window here */
while (grpc_chttp2_list_pop_writable_window_update_stream(transport_global,
- &stream_global)) {
- window_delta =
- transport_global->settings[GRPC_LOCAL_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] -
- stream_global->incoming_window;
- if (!stream_global->read_closed && window_delta > 0) {
- gpr_slice_buffer_add(
- &transport_writing->outbuf,
- grpc_chttp2_window_update_create(stream_global->id, window_delta));
+ transport_writing,
+ &stream_global,
+ &stream_writing)) {
+ stream_writing->id = stream_global->id;
+ if (!stream_global->read_closed && stream_global->unannounced_incoming_window > 0) {
+ stream_writing->announce_window = stream_global->unannounced_incoming_window;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
- incoming_window, window_delta);
- stream_global->incoming_window += window_delta;
+ incoming_window, stream_global->unannounced_incoming_window);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("write", transport_global, stream_global,
+ unannounced_incoming_window, -(gpr_int64)stream_global->unannounced_incoming_window);
+ stream_global->incoming_window += stream_global->unannounced_incoming_window;
+ stream_global->unannounced_incoming_window = 0;
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
+ grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
}
}
@@ -169,10 +168,19 @@ static void finalize_outbuf(grpc_chttp2_transport_writing *transport_writing) {
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
- grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
- stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
- stream_writing->id, &transport_writing->hpack_compressor,
- &transport_writing->outbuf);
+ if (stream_writing->sopb.nops > 0 || stream_writing->send_closed != GRPC_DONT_SEND_CLOSED) {
+ grpc_chttp2_encode(stream_writing->sopb.ops, stream_writing->sopb.nops,
+ stream_writing->send_closed != GRPC_DONT_SEND_CLOSED,
+ stream_writing->id, &transport_writing->hpack_compressor,
+ &transport_writing->outbuf);
+ }
+ if (stream_writing->announce_window > 0) {
+ gpr_slice_buffer_add(
+ &transport_writing->outbuf,
+ grpc_chttp2_window_update_create(
+ stream_writing->id, stream_writing->announce_window));
+ stream_writing->announce_window = 0;
+ }
stream_writing->sopb.nops = 0;
if (stream_writing->send_closed == GRPC_SEND_CLOSED_WITH_RST_STREAM) {
gpr_slice_buffer_add(&transport_writing->outbuf,
@@ -197,7 +205,8 @@ void grpc_chttp2_cleanup_writing(
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
- if (stream_global->outgoing_sopb->nops == 0) {
+ if (stream_global->outgoing_sopb != NULL &&
+ stream_global->outgoing_sopb->nops == 0) {
stream_global->outgoing_sopb = NULL;
grpc_chttp2_schedule_closure(transport_global,
stream_global->send_done_closure, 1);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index ac399e4a1d..c923d5e42f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -358,7 +358,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->global.outgoing_window =
t->global.settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
- s->parsing.incoming_window = s->global.incoming_window =
+ s->global.max_recv_bytes =
+ s->parsing.incoming_window =
+ s->global.incoming_window =
t->global.settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
@@ -562,6 +564,8 @@ static void maybe_start_some_streams(
stream_global->incoming_window =
transport_global->settings[GRPC_SENT_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
+ stream_global->max_recv_bytes =
+ GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
@@ -570,6 +574,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_add_incoming_window_updated(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+
}
/* cancel out streams that will never be started */
while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
@@ -620,12 +627,23 @@ static void perform_stream_op_locked(
stream_global->publish_sopb = op->recv_ops;
stream_global->publish_sopb->nops = 0;
stream_global->publish_state = op->recv_state;
+ if (stream_global->max_recv_bytes < op->max_recv_bytes) {
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("op", transport_global, stream_global,
+ max_recv_bytes, op->max_recv_bytes - stream_global->max_recv_bytes);
+ GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
+ "op", transport_global, stream_global, unannounced_incoming_window,
+ op->max_recv_bytes - stream_global->max_recv_bytes);
+ stream_global->unannounced_incoming_window += op->max_recv_bytes - stream_global->max_recv_bytes;
+ stream_global->max_recv_bytes = op->max_recv_bytes;
+ }
grpc_chttp2_incoming_metadata_live_op_buffer_end(
&stream_global->outstanding_metadata);
- grpc_chttp2_list_add_read_write_state_changed(transport_global,
- stream_global);
- grpc_chttp2_list_add_writable_window_update_stream(transport_global,
- stream_global);
+ if (stream_global->id != 0) {
+ grpc_chttp2_list_add_read_write_state_changed(transport_global,
+ stream_global);
+ grpc_chttp2_list_add_writable_window_update_stream(transport_global,
+ stream_global);
+ }
}
if (op->bind_pollset) {
@@ -1038,7 +1056,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
identifier = gpr_strdup(context_scope);
}
gpr_log(GPR_INFO,
- "FLOWCTL: %s %-10s %8s %-23s %8lld %c %8lld = %8lld %-10s [%s:%d]",
+ "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
is_client ? "client" : "server", identifier, context_thread, var,
current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
current_value + delta, reason, file, line);
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 1429737721..64503604ee 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -72,6 +72,10 @@ typedef struct grpc_transport_stream_op {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
+ /** The number of bytes this peer is currently prepared to receive.
+ These bytes will be eventually used to replenish per-stream flow control
+ windows. */
+ gpr_uint32 max_recv_bytes;
grpc_iomgr_closure *on_done_recv;
grpc_pollset *bind_pollset;
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 0da396a320..862eb40c4b 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -128,7 +128,8 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
if (op->recv_ops) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
first = 0;
- gpr_strvec_add(&b, gpr_strdup("RECV"));
+ gpr_asprintf(&tmp, "RECV:max_recv_bytes=%d", op->max_recv_bytes);
+ gpr_strvec_add(&b, tmp);
}
if (op->bind_pollset) {
diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc
new file mode 100644
index 0000000000..e706c6c921
--- /dev/null
+++ b/src/cpp/common/auth_property_iterator.cc
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/auth_property_iterator.h>
+
+#include <grpc/grpc_security.h>
+
+namespace grpc {
+
+AuthPropertyIterator::AuthPropertyIterator()
+ : property_(nullptr), ctx_(nullptr), index_(0), name_(nullptr) {}
+
+AuthPropertyIterator::AuthPropertyIterator(
+ const grpc_auth_property* property, const grpc_auth_property_iterator* iter)
+ : property_(property),
+ ctx_(iter->ctx),
+ index_(iter->index),
+ name_(iter->name) {}
+
+AuthPropertyIterator::~AuthPropertyIterator() {}
+
+AuthPropertyIterator& AuthPropertyIterator::operator++() {
+ grpc_auth_property_iterator iter = {ctx_, index_, name_};
+ property_ = grpc_auth_property_iterator_next(&iter);
+ ctx_ = iter.ctx;
+ index_ = iter.index;
+ name_ = iter.name;
+ return *this;
+}
+
+AuthPropertyIterator AuthPropertyIterator::operator++(int) {
+ AuthPropertyIterator tmp(*this);
+ operator++();
+ return tmp;
+}
+
+bool AuthPropertyIterator::operator==(
+ const AuthPropertyIterator& rhs) const {
+ if (property_ == nullptr || rhs.property_ == nullptr) {
+ return property_ == rhs.property_;
+ } else {
+ return index_ == rhs.index_;
+ }
+}
+
+bool AuthPropertyIterator::operator!=(
+ const AuthPropertyIterator& rhs) const {
+ return !operator==(rhs);
+}
+
+const AuthProperty AuthPropertyIterator::operator*() {
+ return std::make_pair<grpc::string, grpc::string>(
+ grpc::string(property_->name),
+ grpc::string(property_->value, property_->value_length));
+}
+
+} // namespace grpc
diff --git a/src/cpp/common/secure_auth_context.cc b/src/cpp/common/secure_auth_context.cc
index 4513723653..87d7bab75c 100644
--- a/src/cpp/common/secure_auth_context.cc
+++ b/src/cpp/common/secure_auth_context.cc
@@ -77,4 +77,20 @@ std::vector<grpc::string> SecureAuthContext::FindPropertyValues(
return values;
}
+AuthPropertyIterator SecureAuthContext::begin() const {
+ if (ctx_) {
+ grpc_auth_property_iterator iter =
+ grpc_auth_context_property_iterator(ctx_);
+ const grpc_auth_property* property =
+ grpc_auth_property_iterator_next(&iter);
+ return AuthPropertyIterator(property, &iter);
+ } else {
+ return end();
+ }
+}
+
+AuthPropertyIterator SecureAuthContext::end() const {
+ return AuthPropertyIterator();
+}
+
} // namespace grpc
diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h
index bba46803cd..264ed620a3 100644
--- a/src/cpp/common/secure_auth_context.h
+++ b/src/cpp/common/secure_auth_context.h
@@ -53,6 +53,10 @@ class SecureAuthContext GRPC_FINAL : public AuthContext {
std::vector<grpc::string> FindPropertyValues(const grpc::string& name) const
GRPC_OVERRIDE;
+ AuthPropertyIterator begin() const GRPC_OVERRIDE;
+
+ AuthPropertyIterator end() const GRPC_OVERRIDE;
+
private:
grpc_auth_context* ctx_;
};
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index 89c1d7e929..cc182f59f4 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -32,7 +32,7 @@
*/
#include <grpc/support/cpu.h>
-#include "src/cpp/server/thread_pool.h"
+#include <grpc++/fixed_size_thread_pool.h>
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
@@ -41,7 +41,7 @@ namespace grpc {
ThreadPoolInterface* CreateDefaultThreadPool() {
int cores = gpr_cpu_num_cores();
if (!cores) cores = 4;
- return new ThreadPool(cores);
+ return new FixedSizeThreadPool(cores);
}
} // namespace grpc
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/fixed_size_thread_pool.cc
index 118cabcb61..710bcbb573 100644
--- a/src/cpp/server/thread_pool.cc
+++ b/src/cpp/server/fixed_size_thread_pool.cc
@@ -33,12 +33,11 @@
#include <grpc++/impl/sync.h>
#include <grpc++/impl/thd.h>
-
-#include "src/cpp/server/thread_pool.h"
+#include <grpc++/fixed_size_thread_pool.h>
namespace grpc {
-void ThreadPool::ThreadFunc() {
+void FixedSizeThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
grpc::unique_lock<grpc::mutex> lock(mu_);
@@ -58,13 +57,14 @@ void ThreadPool::ThreadFunc() {
}
}
-ThreadPool::ThreadPool(int num_threads) : shutdown_(false) {
+FixedSizeThreadPool::FixedSizeThreadPool(int num_threads) : shutdown_(false) {
for (int i = 0; i < num_threads; i++) {
- threads_.push_back(new grpc::thread(&ThreadPool::ThreadFunc, this));
+ threads_.push_back(
+ new grpc::thread(&FixedSizeThreadPool::ThreadFunc, this));
}
}
-ThreadPool::~ThreadPool() {
+FixedSizeThreadPool::~FixedSizeThreadPool() {
{
grpc::lock_guard<grpc::mutex> lock(mu_);
shutdown_ = true;
@@ -76,7 +76,8 @@ ThreadPool::~ThreadPool() {
}
}
-void ThreadPool::ScheduleCallback(const std::function<void()>& callback) {
+void FixedSizeThreadPool::ScheduleCallback(
+ const std::function<void()>& callback) {
grpc::lock_guard<grpc::mutex> lock(mu_);
callbacks_.push(callback);
cv_.notify_one();
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 86c78f05ff..f723d4611a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -37,7 +37,7 @@
#include <grpc/support/log.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
-#include "src/cpp/server/thread_pool.h"
+#include <grpc++/thread_pool_interface.h>
namespace grpc {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 0be77138d1..3b8a026996 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -144,7 +144,7 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
trailing_metadata_.insert(std::make_pair(key, value));
}
-bool ServerContext::IsCancelled() {
+bool ServerContext::IsCancelled() const {
return completion_op_ && completion_op_->CheckCancelled(cq_);
}
diff --git a/src/csharp/Grpc.Auth/OAuth2InterceptorFactory.cs b/src/csharp/Grpc.Auth/OAuth2InterceptorFactory.cs
index ca384d1a6e..420c4cb537 100644
--- a/src/csharp/Grpc.Auth/OAuth2InterceptorFactory.cs
+++ b/src/csharp/Grpc.Auth/OAuth2InterceptorFactory.cs
@@ -52,10 +52,10 @@ namespace Grpc.Auth
/// <summary>
/// Creates OAuth2 interceptor.
/// </summary>
- public static HeaderInterceptorDelegate Create(GoogleCredential googleCredential)
+ public static MetadataInterceptorDelegate Create(GoogleCredential googleCredential)
{
var interceptor = new OAuth2Interceptor(googleCredential.InternalCredential, SystemClock.Default);
- return new HeaderInterceptorDelegate(interceptor.InterceptHeaders);
+ return new MetadataInterceptorDelegate(interceptor.InterceptHeaders);
}
/// <summary>
@@ -94,10 +94,10 @@ namespace Grpc.Auth
return credential.Token.AccessToken;
}
- public void InterceptHeaders(Metadata.Builder headerBuilder)
+ public void InterceptHeaders(Metadata metadata)
{
var accessToken = GetAccessToken(CancellationToken.None);
- headerBuilder.Add(new Metadata.MetadataEntry(AuthorizationHeader, Schema + " " + accessToken));
+ metadata.Add(new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken));
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
index 2f6013483d..320423b245 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
@@ -44,17 +44,17 @@ namespace Grpc.Core.Internal.Tests
[Test]
public void CreateEmptyAndDestroy()
{
- var metadata = Metadata.CreateBuilder().Build();
- var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
+ var nativeMetadata = MetadataArraySafeHandle.Create(new Metadata());
nativeMetadata.Dispose();
}
[Test]
public void CreateAndDestroy()
{
- var metadata = Metadata.CreateBuilder()
- .Add(new Metadata.MetadataEntry("host", "somehost"))
- .Add(new Metadata.MetadataEntry("header2", "header value")).Build();
+ var metadata = new Metadata {
+ new Metadata.Entry("host", "somehost"),
+ new Metadata.Entry("header2", "header value"),
+ };
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 750282258f..9e95182c72 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -39,7 +39,7 @@ using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
- /// Helper methods for generated client stubs to make RPC calls.
+ /// Helper methods for generated clients to make RPC calls.
/// </summary>
public static class Calls
{
diff --git a/src/csharp/Grpc.Core/Stub/AbstractStub.cs b/src/csharp/Grpc.Core/ClientBase.cs
index 4a8b254357..a099f96aea 100644
--- a/src/csharp/Grpc.Core/Stub/AbstractStub.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -32,26 +32,39 @@
#endregion
using System;
+using System.Collections.Generic;
+
using Grpc.Core.Internal;
namespace Grpc.Core
{
- // TODO: support adding timeout to methods.
+ public delegate void MetadataInterceptorDelegate(Metadata metadata);
+
/// <summary>
- /// Base for client-side stubs.
+ /// Base class for client-side stubs.
/// </summary>
- public abstract class AbstractStub<TStub, TConfig>
- where TConfig : StubConfiguration
+ public abstract class ClientBase
{
readonly Channel channel;
- readonly TConfig config;
- public AbstractStub(Channel channel, TConfig config)
+ public ClientBase(Channel channel)
{
this.channel = channel;
- this.config = config;
}
+ /// <summary>
+ /// Can be used to register a custom header (initial metadata) interceptor.
+ /// The delegate each time before a new call on this client is started.
+ /// </summary>
+ public MetadataInterceptorDelegate HeaderInterceptor
+ {
+ get;
+ set;
+ }
+
+ /// <summary>
+ /// Channel associated with this client.
+ /// </summary>
public Channel Channel
{
get
@@ -63,13 +76,19 @@ namespace Grpc.Core
/// <summary>
/// Creates a new call to given method.
/// </summary>
- protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method)
+ protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata)
where TRequest : class
where TResponse : class
{
- var headerBuilder = Metadata.CreateBuilder();
- config.HeaderInterceptor(headerBuilder);
- return new Call<TRequest, TResponse>(serviceName, method, channel, headerBuilder.Build());
+ var interceptor = HeaderInterceptor;
+ if (interceptor != null)
+ {
+ metadata = metadata ?? new Metadata();
+ interceptor(metadata);
+ metadata.Freeze();
+ }
+ metadata = metadata ?? Metadata.Empty;
+ return new Call<TRequest, TResponse>(serviceName, method, channel, metadata);
}
}
}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index cde42c3b7e..a227fe5477 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -88,8 +88,7 @@
<Compile Include="ServerCredentials.cs" />
<Compile Include="Metadata.cs" />
<Compile Include="Internal\MetadataArraySafeHandle.cs" />
- <Compile Include="Stub\AbstractStub.cs" />
- <Compile Include="Stub\StubConfiguration.cs" />
+ <Compile Include="ClientBase.cs" />
<Compile Include="Internal\ServerCalls.cs" />
<Compile Include="ServerMethods.cs" />
<Compile Include="Internal\ClientRequestStream.cs" />
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index c9c4d954c9..80aa7f5603 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -54,11 +54,11 @@ namespace Grpc.Core.Internal
public static MetadataArraySafeHandle Create(Metadata metadata)
{
- var entries = metadata.Entries;
- var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)entries.Count));
- for (int i = 0; i < entries.Count; i++)
+ // TODO(jtattermusch): we might wanna check that the metadata is readonly
+ var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
+ for (int i = 0; i < metadata.Count; i++)
{
- grpcsharp_metadata_array_add(metadataArray, entries[i].Key, entries[i].ValueBytes, new UIntPtr((ulong)entries[i].ValueBytes.Length));
+ grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length));
}
return metadataArray;
}
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index eccec26a61..4552d39d88 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -30,55 +30,163 @@
#endregion
using System;
+using System.Collections;
using System.Collections.Generic;
using System.Collections.Immutable;
+using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Text;
+using Grpc.Core.Utils;
+
namespace Grpc.Core
{
/// <summary>
- /// gRPC call metadata.
+ /// Provides access to read and write metadata values to be exchanged during a call.
/// </summary>
- public class Metadata
+ public sealed class Metadata : IList<Metadata.Entry>
{
- public static readonly Metadata Empty = new Metadata(ImmutableList<MetadataEntry>.Empty);
+ /// <summary>
+ /// An read-only instance of metadata containing no entries.
+ /// </summary>
+ public static readonly Metadata Empty = new Metadata().Freeze();
+
+ readonly List<Entry> entries;
+ bool readOnly;
+
+ public Metadata()
+ {
+ this.entries = new List<Entry>();
+ }
+
+ public Metadata(ICollection<Entry> entries)
+ {
+ this.entries = new List<Entry>(entries);
+ }
+
+ /// <summary>
+ /// Makes this object read-only.
+ /// </summary>
+ /// <returns>this object</returns>
+ public Metadata Freeze()
+ {
+ this.readOnly = true;
+ return this;
+ }
+
+ // TODO: add support for access by key
+
+ #region IList members
+
+ public int IndexOf(Metadata.Entry item)
+ {
+ return entries.IndexOf(item);
+ }
- readonly ImmutableList<MetadataEntry> entries;
+ public void Insert(int index, Metadata.Entry item)
+ {
+ CheckWriteable();
+ entries.Insert(index, item);
+ }
- public Metadata(ImmutableList<MetadataEntry> entries)
+ public void RemoveAt(int index)
{
- this.entries = entries;
+ CheckWriteable();
+ entries.RemoveAt(index);
}
- public ImmutableList<MetadataEntry> Entries
+ public Metadata.Entry this[int index]
{
get
{
- return this.entries;
+ return entries[index];
+ }
+
+ set
+ {
+ CheckWriteable();
+ entries[index] = value;
}
}
- public static Builder CreateBuilder()
+ public void Add(Metadata.Entry item)
+ {
+ CheckWriteable();
+ entries.Add(item);
+ }
+
+ public void Clear()
+ {
+ CheckWriteable();
+ entries.Clear();
+ }
+
+ public bool Contains(Metadata.Entry item)
+ {
+ return entries.Contains(item);
+ }
+
+ public void CopyTo(Metadata.Entry[] array, int arrayIndex)
{
- return new Builder();
+ entries.CopyTo(array, arrayIndex);
}
-
- public struct MetadataEntry
+
+ public int Count
+ {
+ get { return entries.Count; }
+ }
+
+ public bool IsReadOnly
+ {
+ get { return readOnly; }
+ }
+
+ public bool Remove(Metadata.Entry item)
+ {
+ CheckWriteable();
+ return entries.Remove(item);
+ }
+
+ public IEnumerator<Metadata.Entry> GetEnumerator()
+ {
+ return entries.GetEnumerator();
+ }
+
+ IEnumerator System.Collections.IEnumerable.GetEnumerator()
+ {
+ return entries.GetEnumerator();
+ }
+
+ private void CheckWriteable()
+ {
+ Preconditions.CheckState(!readOnly, "Object is read only");
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Metadata entry
+ /// </summary>
+ public struct Entry
{
+ private static readonly Encoding Encoding = Encoding.ASCII;
+
readonly string key;
- readonly byte[] valueBytes;
+ string value;
+ byte[] valueBytes;
- public MetadataEntry(string key, byte[] valueBytes)
+ public Entry(string key, byte[] valueBytes)
{
- this.key = key;
- this.valueBytes = valueBytes;
+ this.key = Preconditions.CheckNotNull(key);
+ this.value = null;
+ this.valueBytes = Preconditions.CheckNotNull(valueBytes);
}
- public MetadataEntry(string key, string value)
+ public Entry(string key, string value)
{
- this.key = key;
- this.valueBytes = Encoding.ASCII.GetBytes(value);
+ this.key = Preconditions.CheckNotNull(key);
+ this.value = Preconditions.CheckNotNull(value);
+ this.valueBytes = null;
}
public string Key
@@ -89,38 +197,29 @@ namespace Grpc.Core
}
}
- // TODO: using ByteString would guarantee immutability.
public byte[] ValueBytes
{
get
{
- return this.valueBytes;
+ if (valueBytes == null)
+ {
+ valueBytes = Encoding.GetBytes(value);
+ }
+ return valueBytes;
}
}
- }
- public class Builder
- {
- readonly List<Metadata.MetadataEntry> entries = new List<Metadata.MetadataEntry>();
-
- public List<MetadataEntry> Entries
+ public string Value
{
get
{
- return entries;
+ if (value == null)
+ {
+ value = Encoding.GetString(valueBytes);
+ }
+ return value;
}
}
-
- public Builder Add(MetadataEntry entry)
- {
- entries.Add(entry);
- return this;
- }
-
- public Metadata Build()
- {
- return new Metadata(entries.ToImmutableList());
- }
}
}
}
diff --git a/src/csharp/Grpc.Core/Stub/StubConfiguration.cs b/src/csharp/Grpc.Core/Stub/StubConfiguration.cs
deleted file mode 100644
index 5bcb5b40d2..0000000000
--- a/src/csharp/Grpc.Core/Stub/StubConfiguration.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
-
-namespace Grpc.Core
-{
- public delegate void HeaderInterceptorDelegate(Metadata.Builder headerBuilder);
-
- public class StubConfiguration
- {
- /// <summary>
- /// The default stub configuration.
- /// </summary>
- public static readonly StubConfiguration Default = new StubConfiguration((headerBuilder) => { });
-
- readonly HeaderInterceptorDelegate headerInterceptor;
-
- public StubConfiguration(HeaderInterceptorDelegate headerInterceptor)
- {
- this.headerInterceptor = Preconditions.CheckNotNull(headerInterceptor);
- }
-
- public HeaderInterceptorDelegate HeaderInterceptor
- {
- get
- {
- return headerInterceptor;
- }
- }
- }
-}
diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs
index 972f495bd7..f1db1f6157 100644
--- a/src/csharp/Grpc.Core/Version.cs
+++ b/src/csharp/Grpc.Core/Version.cs
@@ -3,4 +3,3 @@ using System.Runtime.CompilerServices;
// The current version of gRPC C#.
[assembly: AssemblyVersion("0.6.0.*")]
-
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index b763721460..cfe2a06916 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -41,18 +41,18 @@ namespace math
{
using (Channel channel = new Channel("127.0.0.1", 23456))
{
- Math.IMathClient stub = new Math.MathClient(channel);
- MathExamples.DivExample(stub);
+ Math.IMathClient client = new Math.MathClient(channel);
+ MathExamples.DivExample(client);
- MathExamples.DivAsyncExample(stub).Wait();
+ MathExamples.DivAsyncExample(client).Wait();
- MathExamples.FibExample(stub).Wait();
+ MathExamples.FibExample(client).Wait();
- MathExamples.SumExample(stub).Wait();
+ MathExamples.SumExample(client).Wait();
- MathExamples.DivManyExample(stub).Wait();
+ MathExamples.DivManyExample(client).Wait();
- MathExamples.DependendRequestsExample(stub).Wait();
+ MathExamples.DependendRequestsExample(client).Wait();
}
GrpcEnvironment.Shutdown();
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 10dceb60aa..e7c4b33120 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -49,7 +49,7 @@ namespace math.Tests
string host = "localhost";
Server server;
Channel channel;
- Math.IMathClient client;
+ Math.MathClient client;
[TestFixtureSetUp]
public void Init()
@@ -59,14 +59,14 @@ namespace math.Tests
int port = server.AddListeningPort(host, Server.PickUnusedPort);
server.Start();
channel = new Channel(host, port);
+ client = Math.NewClient(channel);
// TODO(jtattermusch): get rid of the custom header here once we have dedicated tests
// for header support.
- var stubConfig = new StubConfiguration((headerBuilder) =>
+ client.HeaderInterceptor = (metadata) =>
{
- headerBuilder.Add(new Metadata.MetadataEntry("customHeader", "abcdef"));
- });
- client = Math.NewStub(channel, stubConfig);
+ metadata.Add(new Metadata.Entry("customHeader", "abcdef"));
+ };
}
[TestFixtureTearDown]
diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs
index d2cfbee18f..7deb651689 100644
--- a/src/csharp/Grpc.Examples/MathExamples.cs
+++ b/src/csharp/Grpc.Examples/MathExamples.cs
@@ -38,29 +38,29 @@ namespace math
{
public static class MathExamples
{
- public static void DivExample(Math.IMathClient stub)
+ public static void DivExample(Math.IMathClient client)
{
- DivReply result = stub.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
+ DivReply result = client.Div(new DivArgs.Builder { Dividend = 10, Divisor = 3 }.Build());
Console.WriteLine("Div Result: " + result);
}
- public static async Task DivAsyncExample(Math.IMathClient stub)
+ public static async Task DivAsyncExample(Math.IMathClient client)
{
- Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
+ Task<DivReply> resultTask = client.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = await resultTask;
Console.WriteLine("DivAsync Result: " + result);
}
- public static async Task FibExample(Math.IMathClient stub)
+ public static async Task FibExample(Math.IMathClient client)
{
- using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()))
+ using (var call = client.Fib(new FibArgs.Builder { Limit = 5 }.Build()))
{
List<Num> result = await call.ResponseStream.ToList();
Console.WriteLine("Fib Result: " + string.Join("|", result));
}
}
- public static async Task SumExample(Math.IMathClient stub)
+ public static async Task SumExample(Math.IMathClient client)
{
var numbers = new List<Num>
{
@@ -69,14 +69,14 @@ namespace math
new Num.Builder { Num_ = 3 }.Build()
};
- using (var call = stub.Sum())
+ using (var call = client.Sum())
{
await call.RequestStream.WriteAll(numbers);
Console.WriteLine("Sum Result: " + await call.Result);
}
}
- public static async Task DivManyExample(Math.IMathClient stub)
+ public static async Task DivManyExample(Math.IMathClient client)
{
var divArgsList = new List<DivArgs>
{
@@ -84,14 +84,14 @@ namespace math
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
- using (var call = stub.DivMany())
+ using (var call = client.DivMany())
{
await call.RequestStream.WriteAll(divArgsList);
Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
}
}
- public static async Task DependendRequestsExample(Math.IMathClient stub)
+ public static async Task DependendRequestsExample(Math.IMathClient client)
{
var numbers = new List<Num>
{
@@ -101,13 +101,13 @@ namespace math
};
Num sum;
- using (var sumCall = stub.Sum())
+ using (var sumCall = client.Sum())
{
await sumCall.RequestStream.WriteAll(numbers);
sum = await sumCall.Result;
}
- DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());
+ DivReply result = await client.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());
Console.WriteLine("Avg Result: " + result);
}
}
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index b9efc44e8c..1805972ce3 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -41,14 +41,14 @@ namespace math {
__Marshaller_Num,
__Marshaller_Num);
- // client-side stub interface
+ // client interface
public interface IMathClient
{
- global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
- Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken));
+ global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@@ -61,38 +61,35 @@ namespace math {
}
// client stub
- public class MathClient : AbstractStub<MathClient, StubConfiguration>, IMathClient
+ public class MathClient : ClientBase, IMathClient
{
- public MathClient(Channel channel) : this(channel, StubConfiguration.Default)
+ public MathClient(Channel channel) : base(channel)
{
}
- public MathClient(Channel channel, StubConfiguration config) : base(channel, config)
+ public global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
+ var call = CreateCall(__ServiceName, __Method_Div, headers);
+ return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
- public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
+ public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_Div);
- return Calls.BlockingUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_Div, headers);
+ return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
- public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_Div);
- return Calls.AsyncUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_DivMany, headers);
+ return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
- public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_DivMany);
- return Calls.AsyncDuplexStreamingCall(call, token);
+ var call = CreateCall(__ServiceName, __Method_Fib, headers);
+ return Calls.AsyncServerStreamingCall(call, request, cancellationToken);
}
- public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_Fib);
- return Calls.AsyncServerStreamingCall(call, request, token);
- }
- public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken))
- {
- var call = CreateCall(__ServiceName, __Method_Sum);
- return Calls.AsyncClientStreamingCall(call, token);
+ var call = CreateCall(__ServiceName, __Method_Sum, headers);
+ return Calls.AsyncClientStreamingCall(call, cancellationToken);
}
}
@@ -106,17 +103,12 @@ namespace math {
.AddMethod(__Method_Sum, serviceImpl.Sum).Build();
}
- // creates a new client stub
- public static IMathClient NewStub(Channel channel)
+ // creates a new client
+ public static MathClient NewClient(Channel channel)
{
return new MathClient(channel);
}
- // creates a new client stub
- public static IMathClient NewStub(Channel channel, StubConfiguration config)
- {
- return new MathClient(channel, config);
- }
}
}
#endregion
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
index 0ac1add8e4..73ff0e74b5 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
@@ -63,7 +63,7 @@ namespace Grpc.HealthCheck.Tests
server.Start();
channel = new Channel(Host, port);
- client = Grpc.Health.V1Alpha.Health.NewStub(channel);
+ client = Grpc.Health.V1Alpha.Health.NewClient(channel);
}
[TestFixtureTearDown]
diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
index ed9fc4ed77..3aebdcb557 100644
--- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
+++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
@@ -21,11 +21,11 @@ namespace Grpc.Health.V1Alpha {
__Marshaller_HealthCheckRequest,
__Marshaller_HealthCheckResponse);
- // client-side stub interface
+ // client interface
public interface IHealthClient
{
- global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, CancellationToken token = default(CancellationToken));
- Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, CancellationToken token = default(CancellationToken));
+ global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@@ -35,23 +35,20 @@ namespace Grpc.Health.V1Alpha {
}
// client stub
- public class HealthClient : AbstractStub<HealthClient, StubConfiguration>, IHealthClient
+ public class HealthClient : ClientBase, IHealthClient
{
- public HealthClient(Channel channel) : this(channel, StubConfiguration.Default)
+ public HealthClient(Channel channel) : base(channel)
{
}
- public HealthClient(Channel channel, StubConfiguration config) : base(channel, config)
+ public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
+ var call = CreateCall(__ServiceName, __Method_Check, headers);
+ return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
- public global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, CancellationToken token = default(CancellationToken))
+ public Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_Check);
- return Calls.BlockingUnaryCall(call, request, token);
- }
- public Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, CancellationToken token = default(CancellationToken))
- {
- var call = CreateCall(__ServiceName, __Method_Check);
- return Calls.AsyncUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_Check, headers);
+ return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
}
@@ -62,17 +59,12 @@ namespace Grpc.Health.V1Alpha {
.AddMethod(__Method_Check, serviceImpl.Check).Build();
}
- // creates a new client stub
- public static IHealthClient NewStub(Channel channel)
+ // creates a new client
+ public static HealthClient NewClient(Channel channel)
{
return new HealthClient(channel);
}
- // creates a new client stub
- public static IHealthClient NewStub(Channel channel, StubConfiguration config)
- {
- return new HealthClient(channel, config);
- }
}
}
#endregion
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index bdcb2c505c..05e732dbd4 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -119,7 +119,7 @@ namespace Grpc.IntegrationTesting
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
{
- var stubConfig = StubConfiguration.Default;
+ TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")
{
var credential = GoogleCredential.GetApplicationDefault();
@@ -127,10 +127,9 @@ namespace Grpc.IntegrationTesting
{
credential = credential.CreateScoped(new[] { AuthScope });
}
- stubConfig = new StubConfiguration(OAuth2InterceptorFactory.Create(credential));
+ client.HeaderInterceptor = OAuth2InterceptorFactory.Create(credential);
}
- TestService.ITestServiceClient client = new TestService.TestServiceClient(channel, stubConfig);
RunTestCase(options.testCase, client);
}
GrpcEnvironment.Shutdown();
@@ -363,7 +362,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_begin");
var cts = new CancellationTokenSource();
- using (var call = client.StreamingInputCall(cts.Token))
+ using (var call = client.StreamingInputCall(cancellationToken: cts.Token))
{
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
@@ -390,7 +389,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_first_response");
var cts = new CancellationTokenSource();
- using (var call = client.FullDuplexCall(cts.Token))
+ using (var call = client.FullDuplexCall(cancellationToken: cts.Token))
{
await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 6c2da9d2ee..f306289cfb 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -65,7 +65,7 @@ namespace Grpc.IntegrationTesting
new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride)
};
channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options);
- client = TestService.NewStub(channel);
+ client = TestService.NewClient(channel);
}
[TestFixtureTearDown]
diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
index ee077f9f56..96d9b23717 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
@@ -56,17 +56,17 @@ namespace grpc.testing {
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
- // client-side stub interface
+ // client interface
public interface ITestServiceClient
{
- global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
- Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
- global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
- Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
+ global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
}
// server-side interface
@@ -81,53 +81,50 @@ namespace grpc.testing {
}
// client stub
- public class TestServiceClient : AbstractStub<TestServiceClient, StubConfiguration>, ITestServiceClient
+ public class TestServiceClient : ClientBase, ITestServiceClient
{
- public TestServiceClient(Channel channel) : this(channel, StubConfiguration.Default)
+ public TestServiceClient(Channel channel) : base(channel)
{
}
- public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config)
+ public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
+ var call = CreateCall(__ServiceName, __Method_EmptyCall, headers);
+ return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
- public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_EmptyCall);
- return Calls.BlockingUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_EmptyCall, headers);
+ return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
- public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
+ public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_EmptyCall);
- return Calls.AsyncUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_UnaryCall, headers);
+ return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
- public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_UnaryCall);
- return Calls.BlockingUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_UnaryCall, headers);
+ return Calls.AsyncUnaryCall(call, request, cancellationToken);
}
- public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_UnaryCall);
- return Calls.AsyncUnaryCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_StreamingOutputCall, headers);
+ return Calls.AsyncServerStreamingCall(call, request, cancellationToken);
}
- public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_StreamingOutputCall);
- return Calls.AsyncServerStreamingCall(call, request, token);
+ var call = CreateCall(__ServiceName, __Method_StreamingInputCall, headers);
+ return Calls.AsyncClientStreamingCall(call, cancellationToken);
}
- public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_StreamingInputCall);
- return Calls.AsyncClientStreamingCall(call, token);
+ var call = CreateCall(__ServiceName, __Method_FullDuplexCall, headers);
+ return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
- public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
- var call = CreateCall(__ServiceName, __Method_FullDuplexCall);
- return Calls.AsyncDuplexStreamingCall(call, token);
- }
- public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
- {
- var call = CreateCall(__ServiceName, __Method_HalfDuplexCall);
- return Calls.AsyncDuplexStreamingCall(call, token);
+ var call = CreateCall(__ServiceName, __Method_HalfDuplexCall, headers);
+ return Calls.AsyncDuplexStreamingCall(call, cancellationToken);
}
}
@@ -143,17 +140,12 @@ namespace grpc.testing {
.AddMethod(__Method_HalfDuplexCall, serviceImpl.HalfDuplexCall).Build();
}
- // creates a new client stub
- public static ITestServiceClient NewStub(Channel channel)
+ // creates a new client
+ public static TestServiceClient NewClient(Channel channel)
{
return new TestServiceClient(channel);
}
- // creates a new client stub
- public static ITestServiceClient NewStub(Channel channel, StubConfiguration config)
- {
- return new TestServiceClient(channel, config);
- }
}
}
#endregion
diff --git a/src/csharp/generate_proto_csharp.sh b/src/csharp/generate_proto_csharp.sh
index 6eb3887ea1..7c3ba70922 100755
--- a/src/csharp/generate_proto_csharp.sh
+++ b/src/csharp/generate_proto_csharp.sh
@@ -32,16 +32,17 @@
set +e
cd $(dirname $0)
+PROTOC=../../bins/opt/protobuf/protoc
PLUGIN=protoc-gen-grpc=../../bins/opt/grpc_csharp_plugin
EXAMPLES_DIR=Grpc.Examples
INTEROP_DIR=Grpc.IntegrationTesting
HEALTHCHECK_DIR=Grpc.HealthCheck
-protoc --plugin=$PLUGIN --grpc_out=$EXAMPLES_DIR \
+$PROTOC --plugin=$PLUGIN --grpc_out=$EXAMPLES_DIR \
-I $EXAMPLES_DIR/proto $EXAMPLES_DIR/proto/math.proto
-protoc --plugin=$PLUGIN --grpc_out=$INTEROP_DIR \
+$PROTOC --plugin=$PLUGIN --grpc_out=$INTEROP_DIR \
-I $INTEROP_DIR/proto $INTEROP_DIR/proto/test.proto
-
-protoc --plugin=$PLUGIN --grpc_out=$HEALTHCHECK_DIR \
+
+$PROTOC --plugin=$PLUGIN --grpc_out=$HEALTHCHECK_DIR \
-I $HEALTHCHECK_DIR/proto $HEALTHCHECK_DIR/proto/health.proto
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 8d1f99aaee..125957277f 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -418,6 +418,48 @@ describe('Other conditions', function() {
});
});
});
+ describe('Error object should contain the status', function() {
+ it('for a unary call', function(done) {
+ client.unary({error: true}, function(err, data) {
+ assert(err);
+ assert.strictEqual(err.code, grpc.status.UNKNOWN);
+ assert.strictEqual(err.message, 'Requested error');
+ done();
+ });
+ });
+ it('for a client stream call', function(done) {
+ var call = client.clientStream(function(err, data) {
+ assert(err);
+ assert.strictEqual(err.code, grpc.status.UNKNOWN);
+ assert.strictEqual(err.message, 'Requested error');
+ done();
+ });
+ call.write({error: false});
+ call.write({error: true});
+ call.end();
+ });
+ it('for a server stream call', function(done) {
+ var call = client.serverStream({error: true});
+ call.on('data', function(){});
+ call.on('error', function(error) {
+ assert.strictEqual(error.code, grpc.status.UNKNOWN);
+ assert.strictEqual(error.message, 'Requested error');
+ done();
+ });
+ });
+ it('for a bidi stream call', function(done) {
+ var call = client.bidiStream();
+ call.write({error: false});
+ call.write({error: true});
+ call.end();
+ call.on('data', function(){});
+ call.on('error', function(error) {
+ assert.strictEqual(error.code, grpc.status.UNKNOWN);
+ assert.strictEqual(error.message, 'Requested error');
+ done();
+ });
+ });
+ });
});
describe('Cancelling surface client', function() {
var client;
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h
index cba53fa2f6..4a8b7fff48 100644
--- a/src/objective-c/GRPCClient/GRPCCall.h
+++ b/src/objective-c/GRPCClient/GRPCCall.h
@@ -52,7 +52,7 @@
extern id const kGRPCStatusMetadataKey;
// Represents a single gRPC remote call.
-@interface GRPCCall : NSObject<GRXWriter>
+@interface GRPCCall : GRXWriter
// These HTTP headers will be passed to the server as part of this call. Each HTTP header is a
// name-value pair with string names and either string or binary values.
@@ -89,7 +89,7 @@ extern id const kGRPCStatusMetadataKey;
// To finish a call right away, invoke cancel.
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
- requestsWriter:(id<GRXWriter>)requestsWriter NS_DESIGNATED_INITIALIZER;
+ requestsWriter:(GRXWriter *)requestsWriter NS_DESIGNATED_INITIALIZER;
// Finishes the request side of this call, notifies the server that the RPC
// should be cancelled, and finishes the response side of the call with an error
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 4ac4e4d37f..53e5abe177 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -79,7 +79,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRPCDelegateWrapper *_responseWriteable;
- id<GRXWriter> _requestWriter;
+ GRXWriter *_requestWriter;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@@ -94,7 +94,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
- requestsWriter:(id<GRXWriter>)requestWriter {
+ requestsWriter:(GRXWriter *)requestWriter {
if (!host || !path) {
[NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
}
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
index 1ef245fe37..9a30a2f966 100644
--- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
@@ -33,8 +33,9 @@
#import <Foundation/Foundation.h>
+#import <RxLibrary/GRXWriter.h>
+
@protocol GRXWriteable;
-@protocol GRXWriter;
// This is a thread-safe wrapper over a GRXWriteable instance. It lets one
// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing
@@ -54,7 +55,7 @@
// writesFinishedWithError: is sent to the writeable, and released after that.
// This is used to create a retain cycle that keeps both objects alive until the
// writing is explicitly finished.
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer
NS_DESIGNATED_INITIALIZER;
// Enqueues writeValue: to be sent to the writeable in the main thread.
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
index 59c0565494..294cfb7e23 100644
--- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
@@ -38,7 +38,7 @@
@interface GRPCDelegateWrapper ()
// These are atomic so that cancellation can nillify them from any thread.
@property(atomic, strong) id<GRXWriteable> writeable;
-@property(atomic, strong) id<GRXWriter> writer;
+@property(atomic, strong) GRXWriter *writer;
@end
@implementation GRPCDelegateWrapper {
@@ -52,7 +52,7 @@
}
// Designated initializer
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer {
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(GRXWriter *)writer {
if (self = [super init]) {
_writeableQueue = dispatch_get_main_queue();
_writeable = writeable;
diff --git a/src/objective-c/ProtoRPC/ProtoRPC.h b/src/objective-c/ProtoRPC/ProtoRPC.h
index fcc0a507fe..bd926b7328 100644
--- a/src/objective-c/ProtoRPC/ProtoRPC.h
+++ b/src/objective-c/ProtoRPC/ProtoRPC.h
@@ -40,7 +40,7 @@
- (instancetype)initWithHost:(NSString *)host
method:(ProtoMethod *)method
- requestsWriter:(id<GRXWriter>)requestsWriter
+ requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable NS_DESIGNATED_INITIALIZER;
diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m
index fe3ccf0541..889d71a308 100644
--- a/src/objective-c/ProtoRPC/ProtoRPC.m
+++ b/src/objective-c/ProtoRPC/ProtoRPC.m
@@ -35,7 +35,6 @@
#import <GPBProtocolBuffers.h>
#import <RxLibrary/GRXWriteable.h>
-#import <RxLibrary/GRXWriter.h>
#import <RxLibrary/GRXWriter+Transformations.h>
@implementation ProtoRPC {
@@ -46,7 +45,7 @@
#pragma clang diagnostic ignored "-Wobjc-designated-initializers"
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
- requestsWriter:(id<GRXWriter>)requestsWriter {
+ requestsWriter:(GRXWriter *)requestsWriter {
[NSException raise:NSInvalidArgumentException
format:@"Please use ProtoRPC's designated initializer instead."];
return nil;
@@ -56,7 +55,7 @@
// Designated initializer
- (instancetype)initWithHost:(NSString *)host
method:(ProtoMethod *)method
- requestsWriter:(id<GRXWriter>)requestsWriter
+ requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable {
// Because we can't tell the type system to constrain the class, we need to check at runtime:
@@ -65,12 +64,11 @@
format:@"A protobuf class to parse the responses must be provided."];
}
// A writer that serializes the proto messages to send.
- id<GRXWriter> bytesWriter =
- [[[GRXWriter alloc] initWithWriter:requestsWriter] map:^id(GPBMessage *proto) {
- // TODO(jcanizales): Fail with an understandable error message if the requestsWriter isn't
- // sending GPBMessages.
- return [proto data];
- }];
+ GRXWriter *bytesWriter = [requestsWriter map:^id(GPBMessage *proto) {
+ // TODO(jcanizales): Fail with an understandable error message if the requestsWriter isn't
+ // sending GPBMessages.
+ return [proto data];
+ }];
if ((self = [super initWithHost:host path:method.HTTPPath requestsWriter:bytesWriter])) {
// A writeable that parses the proto messages received.
_responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
diff --git a/src/objective-c/ProtoRPC/ProtoService.h b/src/objective-c/ProtoRPC/ProtoService.h
index c5ef820f48..2e8cb33696 100644
--- a/src/objective-c/ProtoRPC/ProtoService.h
+++ b/src/objective-c/ProtoRPC/ProtoService.h
@@ -35,7 +35,7 @@
@class ProtoRPC;
@protocol GRXWriteable;
-@protocol GRXWriter;
+@class GRXWriter;
@interface ProtoService : NSObject
- (instancetype)initWithHost:(NSString *)host
@@ -43,7 +43,7 @@
serviceName:(NSString *)serviceName NS_DESIGNATED_INITIALIZER;
- (ProtoRPC *)RPCToMethod:(NSString *)method
- requestsWriter:(id<GRXWriter>)requestsWriter
+ requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable;
@end
diff --git a/src/objective-c/ProtoRPC/ProtoService.m b/src/objective-c/ProtoRPC/ProtoService.m
index d7c5b6a850..fccc6aadc9 100644
--- a/src/objective-c/ProtoRPC/ProtoService.m
+++ b/src/objective-c/ProtoRPC/ProtoService.m
@@ -66,7 +66,7 @@
}
- (ProtoRPC *)RPCToMethod:(NSString *)method
- requestsWriter:(id<GRXWriter>)requestsWriter
+ requestsWriter:(GRXWriter *)requestsWriter
responseClass:(Class)responseClass
responsesWriteable:(id<GRXWriteable>)responsesWriteable {
ProtoMethod *methodName = [[ProtoMethod alloc] initWithPackage:_packageName
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index 5e876a73bf..b6296e1ed7 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -51,7 +51,7 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
-@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter>
+@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.
+ (instancetype)pipe;
diff --git a/src/cpp/server/thread_pool.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index 3b70249bf9..d004333d2b 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -31,39 +31,13 @@
*
*/
-#ifndef GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
-#define GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
-
-#include <grpc++/config.h>
-
-#include <grpc++/impl/sync.h>
-#include <grpc++/impl/thd.h>
-#include <grpc++/thread_pool_interface.h>
-
-#include <queue>
-#include <vector>
-
-namespace grpc {
-
-class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
- public:
- explicit ThreadPool(int num_threads);
- ~ThreadPool();
-
- void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE;
-
- private:
- grpc::mutex mu_;
- grpc::condition_variable cv_;
- bool shutdown_;
- std::queue<std::function<void()>> callbacks_;
- std::vector<grpc::thread*> threads_;
-
- void ThreadFunc();
-};
-
-ThreadPoolInterface* CreateDefaultThreadPool();
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
+#import "GRXWriter.h"
+
+// A "proxy" class that simply forwards values, completion, and errors from its
+// input writer to its writeable.
+// It is useful as a superclass for pipes that act as a transformation of their
+// input writer, and for classes that represent objects with input and
+// output sequences of values, like an RPC.
+@interface GRXForwardingWriter : GRXWriter
+- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
+@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
new file mode 100644
index 0000000000..2342f51ab3
--- /dev/null
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -0,0 +1,112 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRXForwardingWriter.h"
+
+@interface GRXForwardingWriter () <GRXWriteable>
+@end
+
+@implementation GRXForwardingWriter {
+ GRXWriter *_writer;
+ id<GRXWriteable> _writeable;
+}
+
+- (instancetype)init {
+ return [self initWithWriter:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithWriter:(GRXWriter *)writer {
+ if (!writer) {
+ [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
+ }
+ if ((self = [super init])) {
+ _writer = writer;
+ }
+ return self;
+}
+
+// This is used to send a completion or an error to the writeable. It nillifies
+// our reference to it in order to guarantee no more messages are sent to it,
+// and to release it.
+- (void)finishOutputWithError:(NSError *)errorOrNil {
+ id<GRXWriteable> writeable = _writeable;
+ _writeable = nil;
+ [writeable writesFinishedWithError:errorOrNil];
+}
+
+// This is used to stop the input writer. It nillifies our reference to it
+// to release it.
+- (void)finishInput {
+ GRXWriter *writer = _writer;
+ _writer = nil;
+ writer.state = GRXWriterStateFinished;
+}
+
+#pragma mark GRXWriteable implementation
+
+- (void)writeValue:(id)value {
+ [_writeable writeValue:value];
+}
+
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+}
+
+#pragma mark GRXWriter implementation
+
+- (GRXWriterState)state {
+ return _writer ? _writer.state : GRXWriterStateFinished;
+}
+
+- (void)setState:(GRXWriterState)state {
+ if (state == GRXWriterStateFinished) {
+ _writeable = nil;
+ [self finishInput];
+ } else {
+ _writer.state = state;
+ }
+}
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ _writeable = writeable;
+ [_writer startWithWriteable:self];
+}
+
+- (void)finishWithError:(NSError *)errorOrNil {
+ [self finishOutputWithError:errorOrNil];
+ [self finishInput];
+}
+
+@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h
index f86d38dcd8..b171f0c760 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h
@@ -40,15 +40,15 @@
//
// Unless the writeable callback pauses them or stops them early, these writers will do all their
// interactions with the writeable before the start method returns.
-@interface GRXImmediateWriter : NSObject<GRXWriter>
+@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to
// its writeable. The NSEnumerator is released when it finishes.
-+ (id<GRXWriter>)writerWithEnumerator:(NSEnumerator *)enumerator;
++ (GRXWriter *)writerWithEnumerator:(NSEnumerator *)enumerator;
// Returns a writer that pushes to its writeable the successive values returned by the passed
// block. When the block first returns nil, it is released.
-+ (id<GRXWriter>)writerWithValueSupplier:(id (^)())block;
++ (GRXWriter *)writerWithValueSupplier:(id (^)())block;
// Returns a writer that iterates over the values of the passed container and pushes them to
// its writeable. The container is released when the iteration is over.
@@ -56,18 +56,18 @@
// Note that the usual speed gain of NSFastEnumeration over NSEnumerator results from not having to
// call one method per element. Because GRXWriteable instances accept values one by one, that speed
// gain doesn't happen here.
-+ (id<GRXWriter>)writerWithContainer:(id<NSFastEnumeration>)container;
++ (GRXWriter *)writerWithContainer:(id<NSFastEnumeration>)container;
// Returns a writer that sends the passed value to its writeable and then finishes (releasing the
// value).
-+ (id<GRXWriter>)writerWithValue:(id)value;
++ (GRXWriter *)writerWithValue:(id)value;
// Returns a writer that, as part of its start method, sends the passed error to the writeable
// (then releasing the error).
-+ (id<GRXWriter>)writerWithError:(NSError *)error;
++ (GRXWriter *)writerWithError:(NSError *)error;
// Returns a writer that, as part of its start method, finishes immediately without sending any
// values to its writeable.
-+ (id<GRXWriter>)emptyWriter;
++ (GRXWriter *)emptyWriter;
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.m b/src/objective-c/RxLibrary/GRXImmediateWriter.m
index 0b4979872e..b6d2b2cac0 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.m
@@ -63,19 +63,19 @@
return [[self alloc] initWithEnumerator:enumerator error:errorOrNil];
}
-+ (id<GRXWriter>)writerWithEnumerator:(NSEnumerator *)enumerator {
++ (GRXWriter *)writerWithEnumerator:(NSEnumerator *)enumerator {
return [self writerWithEnumerator:enumerator error:nil];
}
-+ (id<GRXWriter>)writerWithValueSupplier:(id (^)())block {
++ (GRXWriter *)writerWithValueSupplier:(id (^)())block {
return [self writerWithEnumerator:[NSEnumerator grx_enumeratorWithValueSupplier:block]];
}
-+ (id<GRXWriter>)writerWithContainer:(id<NSFastEnumeration>)container {
++ (GRXWriter *)writerWithContainer:(id<NSFastEnumeration>)container {
return [self writerWithEnumerator:[NSEnumerator grx_enumeratorWithContainer:container]];;
}
-+ (id<GRXWriter>)writerWithValue:(id)value {
++ (GRXWriter *)writerWithValue:(id)value {
if (value) {
return [self writerWithEnumerator:[NSEnumerator grx_enumeratorWithSingleValue:value]];
} else {
@@ -83,7 +83,7 @@
}
}
-+ (id<GRXWriter>)writerWithError:(NSError *)error {
++ (GRXWriter *)writerWithError:(NSError *)error {
if (error) {
return [self writerWithEnumerator:nil error:error];
} else {
@@ -91,7 +91,7 @@
}
}
-+ (id<GRXWriter>)emptyWriter {
++ (GRXWriter *)emptyWriter {
static GRXImmediateWriter *emptyWriter;
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
diff --git a/src/objective-c/RxLibrary/GRXWriter+Immediate.m b/src/objective-c/RxLibrary/GRXWriter+Immediate.m
index 39c54f86ec..1d55eb3529 100644
--- a/src/objective-c/RxLibrary/GRXWriter+Immediate.m
+++ b/src/objective-c/RxLibrary/GRXWriter+Immediate.m
@@ -38,27 +38,27 @@
@implementation GRXWriter (Immediate)
+ (instancetype)writerWithEnumerator:(NSEnumerator *)enumerator {
- return [[self alloc] initWithWriter:[GRXImmediateWriter writerWithEnumerator:enumerator]];
+ return [GRXImmediateWriter writerWithEnumerator:enumerator];
}
+ (instancetype)writerWithValueSupplier:(id (^)())block {
- return [[self alloc] initWithWriter:[GRXImmediateWriter writerWithValueSupplier:block]];
+ return [GRXImmediateWriter writerWithValueSupplier:block];
}
+ (instancetype)writerWithContainer:(id<NSFastEnumeration>)container {
- return [[self alloc] initWithWriter:[GRXImmediateWriter writerWithContainer:container]];
+ return [GRXImmediateWriter writerWithContainer:container];
}
+ (instancetype)writerWithValue:(id)value {
- return [[self alloc] initWithWriter:[GRXImmediateWriter writerWithValue:value]];
+ return [GRXImmediateWriter writerWithValue:value];
}
+ (instancetype)writerWithError:(NSError *)error {
- return [[self alloc] initWithWriter:[GRXImmediateWriter writerWithError:error]];
+ return [GRXImmediateWriter writerWithError:error];
}
+ (instancetype)emptyWriter {
- return [[self alloc] initWithWriter:[GRXImmediateWriter emptyWriter]];
+ return [GRXImmediateWriter emptyWriter];
}
@end
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index dcf44e3143..5d6e1a472a 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -85,7 +85,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// Unless otherwise indicated by a conforming class, no messages should be sent
// concurrently to a GRXWriter. I.e., conforming classes aren't required to
// be thread-safe.
-@protocol GRXWriter <NSObject>
+@interface GRXWriter : NSObject
// This property can be used to query the current state of the writer, which
// determines how it might currently use its writeable. Some state transitions can
@@ -116,12 +116,3 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// can't remember the details in the presence of concurrency.
- (void)finishWithError:(NSError *)errorOrNil;
@end
-
-// A "proxy" class that simply forwards values, completion, and errors from its
-// input writer to its writeable.
-// It is useful as a superclass for pipes that act as a transformation of their
-// input writer, and for classes that represent objects with input and
-// output sequences of values, like an RPC.
-@interface GRXWriter : NSObject<GRXWriter>
-- (instancetype)initWithWriter:(id<GRXWriter>)writer NS_DESIGNATED_INITIALIZER;
-@end
diff --git a/src/objective-c/RxLibrary/GRXWriter.m b/src/objective-c/RxLibrary/GRXWriter.m
index cc14383560..019fcbd785 100644
--- a/src/objective-c/RxLibrary/GRXWriter.m
+++ b/src/objective-c/RxLibrary/GRXWriter.m
@@ -33,80 +33,6 @@
#import "GRXWriter.h"
-@interface GRXWriter () <GRXWriteable>
-@end
-
-@implementation GRXWriter {
- id<GRXWriter> _writer;
- id<GRXWriteable> _writeable;
-}
-
-- (instancetype)init {
- return [self initWithWriter:nil];
-}
-
-// Designated initializer
-- (instancetype)initWithWriter:(id<GRXWriter>)writer {
- if (!writer) {
- [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
- }
- if ((self = [super init])) {
- _writer = writer;
- }
- return self;
-}
-
-// This is used to send a completion or an error to the writeable. It nillifies
-// our reference to it in order to guarantee no more messages are sent to it,
-// and to release it.
-- (void)finishOutputWithError:(NSError *)errorOrNil {
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
- [writeable writesFinishedWithError:errorOrNil];
-}
-
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- id<GRXWriter> writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
-#pragma mark GRXWriteable implementation
-
-- (void)writeValue:(id)value {
- [_writeable writeValue:value];
-}
-
-- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
-}
-
-#pragma mark GRXWriter implementation
-
-- (GRXWriterState)state {
- return _writer ? _writer.state : GRXWriterStateFinished;
-}
-
-- (void)setState:(GRXWriterState)state {
- if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
- } else {
- _writer.state = state;
- }
-}
-
-- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
-}
-
-- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
-}
+@implementation GRXWriter
@end
diff --git a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.h b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.h
index 55f6f82f20..43b8706864 100644
--- a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.h
+++ b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.h
@@ -31,10 +31,10 @@
*
*/
-#import "RxLibrary/GRXWriter.h"
+#import "RxLibrary/GRXForwardingWriter.h"
// A "proxy" writer that transforms all the values of its input writer by using a mapping function.
-@interface GRXMappingWriter : GRXWriter
-- (instancetype)initWithWriter:(id<GRXWriter>)writer map:(id (^)(id value))map
+@interface GRXMappingWriter : GRXForwardingWriter
+- (instancetype)initWithWriter:(GRXWriter *)writer map:(id (^)(id value))map
NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
index 2cdfea1b67..f3242e4fa9 100644
--- a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
+++ b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
@@ -37,19 +37,19 @@ static id (^kIdentity)(id value) = ^id(id value) {
return value;
};
-@interface GRXWriter () <GRXWriteable>
+@interface GRXForwardingWriter () <GRXWriteable>
@end
@implementation GRXMappingWriter {
id (^_map)(id value);
}
-- (instancetype)initWithWriter:(id<GRXWriter>)writer {
+- (instancetype)initWithWriter:(GRXWriter *)writer {
return [self initWithWriter:writer map:nil];
}
// Designated initializer
-- (instancetype)initWithWriter:(id<GRXWriter>)writer map:(id (^)(id value))map {
+- (instancetype)initWithWriter:(GRXWriter *)writer map:(id (^)(id value))map {
if ((self = [super initWithWriter:writer])) {
_map = map ?: kIdentity;
}
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index f9c2d5d8d6..3210ad7050 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -87,7 +87,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testEmptyRPC {
@@ -109,7 +109,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testSimpleProtoRPC {
@@ -120,7 +120,7 @@ static ProtoMethod *kUnaryCallMethod;
request.responseSize = 100;
request.fillUsername = YES;
request.fillOauthScope = YES;
- id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[request data]];
+ GRXWriter *requestsWriter = [GRXWriter writerWithValue:[request data]];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress
path:kUnaryCallMethod.HTTPPath
@@ -141,7 +141,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testMetadata {
@@ -150,7 +150,7 @@ static ProtoMethod *kUnaryCallMethod;
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.fillUsername = YES;
request.fillOauthScope = YES;
- id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[request data]];
+ GRXWriter *requestsWriter = [GRXWriter writerWithValue:[request data]];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kHostAddress
path:kUnaryCallMethod.HTTPPath
@@ -173,7 +173,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
@end
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index 74f6b231cf..501f33317a 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -103,7 +103,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:2 handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testLargeUnaryRPC {
@@ -125,7 +125,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testClientStreamingRPC {
@@ -143,7 +143,7 @@
RMTStreamingInputCallRequest *request4 = [RMTStreamingInputCallRequest message];
request4.payload.body = [NSMutableData dataWithLength:45904];
- id<GRXWriter> writer = [GRXWriter writerWithContainer:@[request1, request2, request3, request4]];
+ GRXWriter *writer = [GRXWriter writerWithContainer:@[request1, request2, request3, request4]];
[_service streamingInputCallWithRequestsWriter:writer
handler:^(RMTStreamingInputCallResponse *response,
@@ -157,7 +157,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testServerStreamingRPC {
@@ -193,7 +193,7 @@
}
}];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testPingPongRPC {
@@ -236,7 +236,7 @@
[expectation fulfill];
}
}];
- [self waitForExpectationsWithTimeout:2 handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testEmptyStreamRPC {
@@ -282,10 +282,11 @@
[requestsBuffer writeValue:request];
- __block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
- eventHandler:^(BOOL done,
- RMTStreamingOutputCallResponse *response,
- NSError *error) {
+ __block ProtoRPC *call =
+ [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
+ eventHandler:^(BOOL done,
+ RMTStreamingOutputCallResponse *response,
+ NSError *error) {
if (receivedResponse) {
XCTAssert(done, @"Unexpected extra response %@", response);
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
@@ -299,7 +300,7 @@
}
}];
[call start];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
@end
diff --git a/src/objective-c/tests/LocalClearTextTests.m b/src/objective-c/tests/LocalClearTextTests.m
index 10c9f13ea3..4317614dd9 100644
--- a/src/objective-c/tests/LocalClearTextTests.m
+++ b/src/objective-c/tests/LocalClearTextTests.m
@@ -64,7 +64,7 @@ static NSString * const kService = @"RouteGuide";
// interface:kService
// method:@"EmptyCall"];
//
-// id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[NSData data]];
+// GRXWriter *requestsWriter = [GRXWriter writerWithValue:[NSData data]];
//
// GRPCCall *call = [[GRPCCall alloc] initWithHost:kRouteGuideHost
// method:method
@@ -91,7 +91,7 @@ static NSString * const kService = @"RouteGuide";
service:kService
method:@"RecordRoute"];
- id<GRXWriter> requestsWriter = [GRXWriter emptyWriter];
+ GRXWriter *requestsWriter = [GRXWriter emptyWriter];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kRouteGuideHost
path:method.HTTPPath
@@ -122,7 +122,7 @@ static NSString * const kService = @"RouteGuide";
RGDPoint *point = [RGDPoint message];
point.latitude = 28E7;
point.longitude = -15E7;
- id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[point data]];
+ GRXWriter *requestsWriter = [GRXWriter writerWithValue:[point data]];
GRPCCall *call = [[GRPCCall alloc] initWithHost:kRouteGuideHost
path:method.HTTPPath
diff --git a/src/php/bin/determine_extension_dir.sh b/src/php/bin/determine_extension_dir.sh
index 3c1fc297fa..b4342ac89f 100755
--- a/src/php/bin/determine_extension_dir.sh
+++ b/src/php/bin/determine_extension_dir.sh
@@ -46,4 +46,6 @@ elif [ ! -e $default_extension_dir/grpc.so ]; then
ln -s $f $module_dir/$(basename $f) &> /dev/null || true
done
extension_dir="-d extension_dir=${module_dir} -d extension=grpc.so"
+else
+ extension_dir="-d extension=grpc.so"
fi