aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-15 07:05:33 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-15 07:05:33 -0700
commit796708dbe23c3a78c0f2a80d7195fa48df9dc79d (patch)
treebe792de0eb11f688a66d21058df7301f89aa86bc
parentea37c9606cfe82674ca0c4d88c4d1a74aa6697ee (diff)
parent67e5da0bf0aa119e4b78951e019f0ead7a5d23dc (diff)
Merge branch 'we-dont-need-no-backup' of github.com:ctiller/grpc into we-dont-need-no-backup
-rw-r--r--BUILD4
-rw-r--r--Makefile2
-rw-r--r--build.json2
-rw-r--r--include/grpc++/byte_buffer.h1
-rw-r--r--include/grpc/byte_buffer.h60
-rw-r--r--include/grpc/byte_buffer_reader.h3
-rw-r--r--include/grpc/compression.h (renamed from src/core/compression/algorithm.h)6
-rw-r--r--include/grpc/grpc.h29
-rw-r--r--include/grpc/grpc_security.h2
-rw-r--r--src/compiler/cpp_generator.cc11
-rw-r--r--src/core/channel/client_channel.c12
-rw-r--r--src/core/channel/client_setup.c20
-rw-r--r--src/core/compression/algorithm.c2
-rw-r--r--src/core/compression/message_compress.h2
-rw-r--r--src/core/iomgr/pollset_posix.c17
-rw-r--r--src/core/iomgr/pollset_posix.h1
-rw-r--r--src/core/surface/byte_buffer.c32
-rw-r--r--src/core/surface/byte_buffer_reader.c56
-rw-r--r--src/core/surface/call.c49
-rw-r--r--src/core/surface/call.h2
-rw-r--r--src/core/surface/channel.c34
-rw-r--r--src/core/surface/channel.h10
-rw-r--r--src/core/surface/server.c2
-rw-r--r--src/core/transport/metadata.c2
-rw-r--r--src/core/transport/stream_op.h8
-rw-r--r--src/cpp/proto/proto_utils.cc4
-rw-r--r--src/cpp/util/byte_buffer.cc2
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs105
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj8
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs75
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs52
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs64
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs8
-rw-r--r--src/csharp/Grpc.Core/Channel.cs64
-rw-r--r--src/csharp/Grpc.Core/ChannelArgs.cs115
-rw-r--r--src/csharp/Grpc.Core/ChannelOptions.cs178
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj10
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs31
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs43
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs (renamed from src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs)44
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs102
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs60
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs89
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/Enums.cs40
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs31
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs2
-rw-r--r--src/csharp/Grpc.Core/Server.cs64
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs2
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs12
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs9
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c124
-rw-r--r--src/node/ext/byte_buffer.cc2
-rw-r--r--src/node/package.json2
-rw-r--r--src/node/src/common.js4
-rw-r--r--src/node/test/common_test.js90
-rw-r--r--src/node/test/test_messages.proto38
-rw-r--r--src/objective-c/GRPCClient/private/NSData+GRPC.m4
-rw-r--r--src/php/README.md137
-rwxr-xr-xsrc/php/bin/run_gen_code_test.sh4
-rw-r--r--src/php/composer.json7
-rw-r--r--src/php/ext/grpc/byte_buffer.c2
-rw-r--r--src/php/tests/generated_code/AbstractGeneratedCodeTest.php2
-rwxr-xr-xsrc/php/tests/interop/interop_client.php2
-rw-r--r--src/python/README.md6
-rw-r--r--src/python/requirements.txt2
-rw-r--r--src/python/src/README.rst22
-rw-r--r--src/python/src/grpc/_adapter/_c/types/server.c12
-rw-r--r--src/python/src/grpc/_adapter/_c/utility.c19
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low.py8
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low_test.py11
-rw-r--r--src/python/src/grpc/_adapter/_low.py7
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py2
-rw-r--r--src/python/src/setup.py4
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c2
-rw-r--r--test/compiler/python_plugin_test.py292
-rw-r--r--test/core/end2end/cq_verifier.c3
-rw-r--r--test/core/end2end/dualstack_socket_test.c2
-rw-r--r--test/core/end2end/tests/cancel_after_accept.c4
-rw-r--r--test/core/end2end/tests/cancel_after_accept_and_writes_closed.c4
-rw-r--r--test/core/end2end/tests/cancel_after_invoke.c2
-rw-r--r--test/core/end2end/tests/cancel_before_invoke.c2
-rw-r--r--test/core/end2end/tests/census_simple_request.c2
-rw-r--r--test/core/end2end/tests/disappearing_server.c2
-rw-r--r--test/core/end2end/tests/graceful_server_shutdown.c2
-rw-r--r--test/core/end2end/tests/invoke_large_request.c6
-rw-r--r--test/core/end2end/tests/max_concurrent_streams.c2
-rw-r--r--test/core/end2end/tests/max_message_length.c2
-rw-r--r--test/core/end2end/tests/ping_pong_streaming.c4
-rw-r--r--test/core/end2end/tests/registered_call.c2
-rw-r--r--test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_payload.c4
-rw-r--r--test/core/end2end/tests/request_response_with_payload_and_call_creds.c4
-rw-r--r--test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c4
-rw-r--r--test/core/end2end/tests/request_with_large_metadata.c2
-rw-r--r--test/core/end2end/tests/request_with_payload.c2
-rw-r--r--test/core/end2end/tests/server_finishes_request.c2
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c2
-rw-r--r--test/core/end2end/tests/simple_request.c2
-rw-r--r--test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c2
-rw-r--r--test/core/fling/client.c2
-rw-r--r--test/core/surface/byte_buffer_reader_test.c79
-rw-r--r--test/cpp/qps/client_async.cc2
-rw-r--r--test/cpp/qps/qps_driver.cc3
-rw-r--r--test/cpp/qps/qps_test_with_poll.cc90
-rw-r--r--test/cpp/qps/qps_worker.cc4
-rw-r--r--test/cpp/qps/qpstest.proto114
-rw-r--r--test/cpp/qps/server_async.cc55
-rw-r--r--tools/doxygen/Doxyfile.core2
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rwxr-xr-xtools/run_tests/build_python.sh2
-rwxr-xr-xtools/run_tests/jobset.py16
-rwxr-xr-xtools/run_tests/run_tests.py27
-rw-r--r--vsprojects/grpc/grpc.vcxproj2
-rw-r--r--vsprojects/grpc/grpc.vcxproj.filters6
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj2
-rw-r--r--vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters6
126 files changed, 2023 insertions, 944 deletions
diff --git a/BUILD b/BUILD
index bc8fa55a37..1a9cdb6879 100644
--- a/BUILD
+++ b/BUILD
@@ -154,7 +154,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
- "src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",
@@ -349,6 +348,7 @@ cc_library(
"include/grpc/grpc_security.h",
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
+ "include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/census.h",
@@ -377,7 +377,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
- "src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",
@@ -549,6 +548,7 @@ cc_library(
hdrs = [
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
+ "include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/census.h",
diff --git a/Makefile b/Makefile
index 13c4eb9f9f..b467f81ef9 100644
--- a/Makefile
+++ b/Makefile
@@ -3046,6 +3046,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc_security.h \
include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
+ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/census.h \
@@ -3290,6 +3291,7 @@ LIBGRPC_UNSECURE_SRC = \
PUBLIC_HEADERS_C += \
include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
+ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/census.h \
diff --git a/build.json b/build.json
index 61229e6cee..af982390fb 100644
--- a/build.json
+++ b/build.json
@@ -101,6 +101,7 @@
"public_headers": [
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
+ "include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h"
],
@@ -115,7 +116,6 @@
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
- "src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",
diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h
index ceb62622fd..3e40eaed1d 100644
--- a/include/grpc++/byte_buffer.h
+++ b/include/grpc++/byte_buffer.h
@@ -35,6 +35,7 @@
#define GRPCXX_BYTE_BUFFER_H
#include <grpc/grpc.h>
+#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
diff --git a/include/grpc/byte_buffer.h b/include/grpc/byte_buffer.h
index 06077a02a9..6d08474d8c 100644
--- a/include/grpc/byte_buffer.h
+++ b/include/grpc/byte_buffer.h
@@ -34,22 +34,74 @@
#ifndef GRPC_BYTE_BUFFER_H
#define GRPC_BYTE_BUFFER_H
-#include <grpc/grpc.h>
+#include <grpc/compression.h>
#include <grpc/support/slice_buffer.h>
#ifdef __cplusplus
extern "C" {
#endif
-typedef enum { GRPC_BB_SLICE_BUFFER } grpc_byte_buffer_type;
+typedef enum {
+ GRPC_BB_RAW
+ /* Future types may include GRPC_BB_PROTOBUF, etc. */
+} grpc_byte_buffer_type;
-/* byte buffers are containers for messages passed in from the public api's */
struct grpc_byte_buffer {
grpc_byte_buffer_type type;
union {
- gpr_slice_buffer slice_buffer;
+ struct {
+ grpc_compression_algorithm compression;
+ gpr_slice_buffer slice_buffer;
+ } raw;
} data;
};
+typedef struct grpc_byte_buffer grpc_byte_buffer;
+
+/** Returns a RAW byte buffer instance over the given slices (up to \a nslices).
+ *
+ * Increases the reference count for all \a slices processed. The user is
+ * responsible for invoking grpc_byte_buffer_destroy on the returned instance.*/
+grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+ size_t nslices);
+
+/** Returns a *compressed* RAW byte buffer instance over the given slices (up to
+ * \a nslices). The \a compression argument defines the compression algorithm
+ * used to generate the data in \a slices.
+ *
+ * Increases the reference count for all \a slices processed. The user is
+ * responsible for invoking grpc_byte_buffer_destroy on the returned instance.*/
+grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
+ gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression);
+
+/** Copies input byte buffer \a bb.
+ *
+ * Increases the reference count of all the source slices. The user is
+ * responsible for calling grpc_byte_buffer_destroy over the returned copy. */
+grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
+
+/** Returns the size of the given byte buffer, in bytes. */
+size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
+
+/** Destroys \a byte_buffer deallocating all its memory. */
+void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
+
+
+/** Reader for byte buffers. Iterates over slices in the byte buffer */
+struct grpc_byte_buffer_reader;
+typedef struct grpc_byte_buffer_reader grpc_byte_buffer_reader;
+
+/** Initialize \a reader to read over \a buffer */
+void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
+ grpc_byte_buffer *buffer);
+
+/** Cleanup and destroy \a reader */
+void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
+
+/** Updates \a slice with the next piece of data from from \a reader and returns
+ * 1. Returns 0 at the end of the stream. Caller is responsible for calling
+ * gpr_slice_unref on the result. */
+int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
+ gpr_slice *slice);
#ifdef __cplusplus
}
diff --git a/include/grpc/byte_buffer_reader.h b/include/grpc/byte_buffer_reader.h
index d3da27cf1c..1ef817cf30 100644
--- a/include/grpc/byte_buffer_reader.h
+++ b/include/grpc/byte_buffer_reader.h
@@ -42,7 +42,8 @@ extern "C" {
#endif
struct grpc_byte_buffer_reader {
- grpc_byte_buffer *buffer;
+ grpc_byte_buffer *buffer_in;
+ grpc_byte_buffer *buffer_out;
/* Different current objects correspond to different types of byte buffers */
union {
/* Index into a slice buffer's array of slices */
diff --git a/src/core/compression/algorithm.h b/include/grpc/compression.h
index 9dd9f57b56..630fa1656a 100644
--- a/src/core/compression/algorithm.h
+++ b/include/grpc/compression.h
@@ -31,8 +31,8 @@
*
*/
-#ifndef GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
-#define GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
+#ifndef GRPC_COMPRESSION_H
+#define GRPC_COMPRESSION_H
/* The various compression algorithms supported by GRPC */
typedef enum {
@@ -46,4 +46,4 @@ typedef enum {
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm);
-#endif /* GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H */
+#endif /* GRPC_COMPRESSION_H */
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 298e2cf257..2caa9680ba 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -37,6 +37,7 @@
#include <grpc/status.h>
#include <stddef.h>
+#include <grpc/byte_buffer.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
@@ -155,34 +156,6 @@ typedef enum grpc_call_error {
(start_write/add_metadata). Illegal on invoke/accept. */
#define GRPC_WRITE_NO_COMPRESS (0x00000002u)
-/* A buffer of bytes */
-struct grpc_byte_buffer;
-typedef struct grpc_byte_buffer grpc_byte_buffer;
-
-/* Sample helpers to obtain byte buffers (these will certainly move
- someplace else) */
-grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices);
-grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
-size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
-void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
-
-/* Reader for byte buffers. Iterates over slices in the byte buffer */
-struct grpc_byte_buffer_reader;
-typedef struct grpc_byte_buffer_reader grpc_byte_buffer_reader;
-
-/** Initialize \a reader to read over \a buffer */
-void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer);
-
-/** Cleanup and destroy \a reader */
-void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
-
-/* At the end of the stream, returns 0. Otherwise, returns 1 and sets slice to
- be the returned slice. Caller is responsible for calling gpr_slice_unref on
- the result. */
-int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
- gpr_slice *slice);
-
/* A single metadata element */
typedef struct grpc_metadata {
const char *key;
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 82f8f442a3..e104b6952f 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -117,7 +117,7 @@ grpc_credentials *grpc_service_account_credentials_create(
grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
gpr_timespec token_lifetime);
-/* Creates an Oauth2 Refresh Token crednetials object. May return NULL if the
+/* Creates an Oauth2 Refresh Token credentials object. May return NULL if the
input is invalid.
WARNING: Do NOT use this credentials to connect to a non-google service as
this could result in an oauth2 token leak.
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index b0d2b5d229..c00c85bb90 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -849,6 +849,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::Status $ns$$Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, $Response$* response) {\n");
+ printer->Print(" (void) context;\n");
+ printer->Print(" (void) request;\n");
+ printer->Print(" (void) response;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@@ -859,6 +862,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"::grpc::ServerReader< $Request$>* reader, "
"$Response$* response) {\n");
+ printer->Print(" (void) context;\n");
+ printer->Print(" (void) reader;\n");
+ printer->Print(" (void) response;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@@ -869,6 +875,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"const $Request$* request, "
"::grpc::ServerWriter< $Response$>* writer) {\n");
+ printer->Print(" (void) context;\n");
+ printer->Print(" (void) request;\n");
+ printer->Print(" (void) writer;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
@@ -879,6 +888,8 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter< $Response$, $Request$>* "
"stream) {\n");
+ printer->Print(" (void) context;\n");
+ printer->Print(" (void) stream;\n");
printer->Print(
" return ::grpc::Status("
"::grpc::StatusCode::UNIMPLEMENTED);\n");
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 31b1fc3bde..726196e996 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -102,10 +102,17 @@ struct call_data {
static int prepare_activate(grpc_call_element *elem,
grpc_child_channel *on_child) {
call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
if (calld->state == CALL_CANCELLED) return 0;
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
+
+ if (calld->s.waiting_op.bind_pollset) {
+ grpc_transport_setup_del_interested_party(chand->transport_setup,
+ calld->s.waiting_op.bind_pollset);
+ }
+
calld->state = CALL_ACTIVE;
/* create a child call */
@@ -199,6 +206,7 @@ static void cc_start_transport_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
} else {
calld->state = CALL_WAITING;
+ calld->s.waiting_op.bind_pollset = NULL;
if (chand->active_child) {
/* channel is connected - use the connected stack */
if (prepare_activate(elem, chand->active_child)) {
@@ -230,14 +238,14 @@ static void cc_start_transport_op(grpc_call_element *elem,
}
calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
+ grpc_transport_setup_add_interested_party(chand->transport_setup,
+ op->bind_pollset);
gpr_mu_unlock(&chand->mu);
/* finally initiate transport setup if needed */
if (initiate_transport_setup) {
grpc_transport_setup_initiate(chand->transport_setup);
}
- grpc_transport_setup_add_interested_party(chand->transport_setup,
- op->bind_pollset);
}
}
break;
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 8a318eaa86..42ee23d87f 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -56,12 +56,12 @@ struct grpc_client_setup {
gpr_cv cv;
grpc_client_setup_request *active_request;
int refs;
+ grpc_pollset_set interested_parties;
};
struct grpc_client_setup_request {
/* pointer back to the setup object */
grpc_client_setup *setup;
- grpc_pollset_set interested_parties;
gpr_timespec deadline;
};
@@ -71,7 +71,7 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) {
grpc_pollset_set *grpc_client_setup_get_interested_parties(
grpc_client_setup_request *r) {
- return &r->interested_parties;
+ return &r->setup->interested_parties;
}
static void destroy_setup(grpc_client_setup *s) {
@@ -79,11 +79,11 @@ static void destroy_setup(grpc_client_setup *s) {
gpr_cv_destroy(&s->cv);
s->done(s->user_data);
grpc_channel_args_destroy(s->args);
+ grpc_pollset_set_destroy(&s->interested_parties);
gpr_free(s);
}
static void destroy_request(grpc_client_setup_request *r) {
- grpc_pollset_set_destroy(&r->interested_parties);
gpr_free(r);
}
@@ -94,7 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) {
int in_alarm = 0;
r->setup = s;
- grpc_pollset_set_init(&r->interested_parties);
/* TODO(klempner): Actually set a deadline */
r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
@@ -125,12 +124,8 @@ static void setup_add_interested_party(grpc_transport_setup *sp,
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
- if (!s->active_request) {
- gpr_mu_unlock(&s->mu);
- return;
- }
- grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset);
+ grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu);
}
@@ -140,12 +135,8 @@ static void setup_del_interested_party(grpc_transport_setup *sp,
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
- if (!s->active_request) {
- gpr_mu_unlock(&s->mu);
- return;
- }
- grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset);
+ grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
gpr_mu_unlock(&s->mu);
}
@@ -225,6 +216,7 @@ void grpc_client_setup_create_and_attach(
s->in_alarm = 0;
s->in_cb = 0;
s->cancelled = 0;
+ grpc_pollset_set_init(&s->interested_parties);
grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base);
}
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index ca07002ff9..36ead843d2 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/compression/algorithm.h"
+#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm) {
diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h
index e8aef1a713..aba701a6ee 100644
--- a/src/core/compression/message_compress.h
+++ b/src/core/compression/message_compress.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
#define GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
-#include "src/core/compression/algorithm.h"
+#include <grpc/compression.h>
#include <grpc/support/slice_buffer.h>
/* compress 'input' to 'output' using 'algorithm'.
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c701abb91d..db704b9df1 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -99,6 +99,7 @@ void grpc_pollset_init(grpc_pollset *pollset) {
grpc_pollset_kick_init(&pollset->kick_state);
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
become_basic_pollset(pollset, NULL);
}
@@ -141,7 +142,8 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (pollset->shutting_down) {
if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
- } else if (pollset->in_flight_cbs == 0) {
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
finish_shutdown(pollset);
/* Continuing to access pollset here is safe -- it is the caller's
@@ -157,21 +159,22 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
- int in_flight_cbs;
- int counter;
+ int call_shutdown = 0;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
- in_flight_cbs = pollset->in_flight_cbs;
- counter = pollset->counter;
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && pollset->counter == 0) {
+ pollset->called_shutdown = 1;
+ call_shutdown = 1;
+ }
pollset->shutdown_done_cb = shutdown_done;
pollset->shutdown_done_arg = shutdown_done_arg;
- if (counter > 0) {
+ if (pollset->counter > 0) {
grpc_pollset_kick(pollset);
}
gpr_mu_unlock(&pollset->mu);
- if (in_flight_cbs == 0 && counter == 0) {
+ if (call_shutdown) {
finish_shutdown(pollset);
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 2b897caa4b..92c258e0cd 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -56,6 +56,7 @@ typedef struct grpc_pollset {
int counter;
int in_flight_cbs;
int shutting_down;
+ int called_shutdown;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
union {
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 12244f6644..4817e00454 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -35,25 +35,31 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
+grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+ size_t nslices) {
+ return grpc_raw_compressed_byte_buffer_create(slices, nslices,
+ GRPC_COMPRESS_NONE);
+}
+
+grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
+ gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) {
size_t i;
grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
-
- bb->type = GRPC_BB_SLICE_BUFFER;
- gpr_slice_buffer_init(&bb->data.slice_buffer);
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = compression;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
for (i = 0; i < nslices; i++) {
gpr_slice_ref(slices[i]);
- gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]);
}
-
return bb;
}
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
- bb->data.slice_buffer.count);
+ case GRPC_BB_RAW:
+ return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
+ bb->data.raw.slice_buffer.count);
}
gpr_log(GPR_INFO, "should never get here");
abort();
@@ -63,8 +69,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
if (!bb) return;
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- gpr_slice_buffer_destroy(&bb->data.slice_buffer);
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer);
break;
}
free(bb);
@@ -72,8 +78,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return bb->data.slice_buffer.length;
+ case GRPC_BB_RAW:
+ return bb->data.raw.slice_buffer.length;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 41ad700274..86829a686f 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -33,41 +33,73 @@
#include <grpc/byte_buffer_reader.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/byte_buffer.h>
+#include "src/core/compression/message_compress.h"
+
+static int is_compressed(grpc_byte_buffer *buffer) {
+ switch (buffer->type) {
+ case GRPC_BB_RAW:
+ if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
+ return 0 /* GPR_FALSE */;
+ }
+ break;
+ }
+ return 1 /* GPR_TRUE */;
+}
+
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) {
- reader->buffer = buffer;
- switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
+ gpr_slice_buffer decompressed_slices_buffer;
+ reader->buffer_in = buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_init(&decompressed_slices_buffer);
+ if (is_compressed(reader->buffer_in)) {
+ grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+ &reader->buffer_in->data.raw.slice_buffer,
+ &decompressed_slices_buffer);
+ reader->buffer_out = grpc_raw_byte_buffer_create(
+ decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
+ gpr_slice_buffer_destroy(&decompressed_slices_buffer);
+ } else { /* not compressed, use the input buffer as output */
+ reader->buffer_out = reader->buffer_in;
+ }
reader->current.index = 0;
+ break;
}
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
- /* no-op: the user is responsible for memory deallocation.
- * Other cleanup operations would go here if needed. */
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ /* keeping the same if-else structure as in the init function */
+ if (is_compressed(reader->buffer_in)) {
+ grpc_byte_buffer_destroy(reader->buffer_out);
+ }
+ break;
+ }
}
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice) {
- grpc_byte_buffer *buffer = reader->buffer;
- gpr_slice_buffer *slice_buffer;
- switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- slice_buffer = &buffer->data.slice_buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW: {
+ gpr_slice_buffer *slice_buffer;
+ slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
reader->current.index += 1;
return 1;
- } else {
- return 0;
}
break;
+ }
}
return 0;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 84a77c45c4..6cffd92e6b 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -99,6 +99,8 @@ typedef enum {
/* Status came from 'the wire' - or somewhere below the surface
layer */
STATUS_FROM_WIRE,
+ /* Status came from the server sending status */
+ STATUS_FROM_SERVER_STATUS,
STATUS_SOURCE_COUNT
} status_source;
@@ -578,10 +580,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
+ case GRPC_IOREQ_SEND_STATUS:
+ if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
+ NULL) {
+ grpc_mdstr_unref(
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
+ }
+ break;
case GRPC_IOREQ_RECV_CLOSE:
case GRPC_IOREQ_SEND_INITIAL_METADATA:
case GRPC_IOREQ_SEND_TRAILING_METADATA:
- case GRPC_IOREQ_SEND_STATUS:
case GRPC_IOREQ_SEND_CLOSE:
break;
case GRPC_IOREQ_RECV_STATUS:
@@ -665,7 +675,7 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
call->incoming_message.slices, call->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -788,7 +798,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
- GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
+ GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
static int prepare_application_metadata(grpc_call *call, size_t count,
@@ -835,9 +845,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
size_t i;
switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ case GRPC_BB_RAW:
+ for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
gpr_slice_ref(slice);
grpc_sopb_add_slice(sopb, slice);
}
@@ -849,7 +859,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
grpc_metadata_batch mdb;
size_t i;
- char status_str[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(op->send_ops == NULL);
switch (call->write_state) {
@@ -893,13 +902,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
grpc_metadata_batch_add_tail(
&mdb, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
+ grpc_channel_get_reffed_status_elem(call->channel,
+ data.send_status.code));
if (data.send_status.details) {
grpc_metadata_batch_add_tail(
&mdb, &call->details_link,
@@ -907,8 +913,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
call->metadata_context,
grpc_mdstr_ref(
grpc_channel_get_message_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context,
- data.send_status.details)));
+ data.send_status.details));
+ call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
+ NULL;
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
}
@@ -1008,6 +1015,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
GRPC_CALL_ERROR_INVALID_METADATA);
}
}
+ if (op == GRPC_IOREQ_SEND_STATUS) {
+ set_status_code(call, STATUS_FROM_SERVER_STATUS,
+ reqs[i].data.send_status.code);
+ if (reqs[i].data.send_status.details) {
+ set_status_details(call, STATUS_FROM_SERVER_STATUS,
+ grpc_mdstr_ref(reqs[i].data.send_status.details));
+ }
+ }
have_ops |= 1u << op;
call->request_data[op] = data;
@@ -1281,7 +1296,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
- op->data.send_status_from_server.status_details;
+ op->data.send_status_from_server.status_details != NULL
+ ? grpc_mdstr_from_string(
+ call->metadata_context,
+ op->data.send_status_from_server.status_details)
+ : NULL;
req = &reqs[out++];
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 17db8c2cdc..7ea6cd7fa9 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -72,7 +72,7 @@ typedef union {
grpc_byte_buffer *send_message;
struct {
grpc_status_code code;
- const char *details;
+ grpc_mdstr *details;
} send_status;
} grpc_ioreq_data;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index b97f05ad8a..9ecdd374a9 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -37,12 +37,20 @@
#include <string.h>
#include "src/core/iomgr/iomgr.h"
+#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
+ * Avoids needing to take a metadata context lock for sending status
+ * if the status code is <= NUM_CACHED_STATUS_ELEMS.
+ * Sized to allow the most commonly used codes to fit in
+ * (OK, Cancelled, Unknown). */
+#define NUM_CACHED_STATUS_ELEMS 3
+
typedef struct registered_call {
grpc_mdelem *path;
grpc_mdelem *authority;
@@ -54,10 +62,13 @@ struct grpc_channel {
gpr_refcount refs;
gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
+ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
+ /** mdelem for grpc-status: 0 thru grpc-status: 2 */
+ grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS];
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -88,6 +99,13 @@ grpc_channel *grpc_channel_create_from_filters(
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ char buf[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, buf);
+ channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(mdctx, buf));
+ }
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
@@ -181,7 +199,11 @@ void grpc_channel_internal_ref(grpc_channel *c) {
static void destroy_channel(void *p, int ok) {
grpc_channel *channel = p;
+ size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ grpc_mdelem_unref(channel->grpc_status_elem[i]);
+ }
grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
@@ -247,6 +269,18 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
+ if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
+ return grpc_mdelem_ref(channel->grpc_status_elem[i]);
+ } else {
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, tmp);
+ return grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(channel->metadata_context, tmp));
+ }
+}
+
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index d1f62f2598..ba3c0abac9 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,8 +40,18 @@ grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+
+/** Get a (borrowed) pointer to the channel wide metadata context */
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
+
+/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
+ status_code.
+
+ The returned elem is owned by the caller. */
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
+ int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 525fe2e103..3671efe0d0 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1010,7 +1010,7 @@ void grpc_server_destroy(grpc_server *server) {
listener *l;
gpr_mu_lock(&server->mu);
- GPR_ASSERT(server->shutdown);
+ GPR_ASSERT(server->shutdown || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index c80d67823f..e75b449e12 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
- gc_mdtab(ctx);
+ /* gc_mdtab(ctx); */
if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) {
discard_metadata(ctx);
}
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 95497a3cc8..5215cc87b1 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -58,7 +58,7 @@ typedef enum grpc_stream_op_code {
GRPC_OP_SLICE
} grpc_stream_op_code;
-/* Arguments for GRPC_OP_BEGIN */
+/* Arguments for GRPC_OP_BEGIN_MESSAGE */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
@@ -126,10 +126,8 @@ typedef struct grpc_stream_op {
} data;
} grpc_stream_op;
-/* A stream op buffer is a wrapper around stream operations that is dynamically
- extendable.
- TODO(ctiller): inline a few elements into the struct, to avoid common case
- per-call allocations. */
+/** A stream op buffer is a wrapper around stream operations that is
+ * dynamically extendable. */
typedef struct grpc_stream_op_buffer {
grpc_stream_op *ops;
size_t nops;
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 7a7e73bba4..f4cf5cf17a 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -49,8 +49,8 @@ class GrpcBufferWriter GRPC_FINAL
explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
- *bp = grpc_byte_buffer_create(NULL, 0);
- slice_buffer_ = &(*bp)->data.slice_buffer;
+ *bp = grpc_raw_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 45eaa2fe5b..a78e4226d2 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -42,7 +42,7 @@ ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
for (size_t i = 0; i < nslices; i++) {
c_slices[i] = slices[i].slice_;
}
- buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
+ buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices);
}
void ByteBuffer::Clear() {
diff --git a/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs
new file mode 100644
index 0000000000..df09857efe
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ChannelOptionsTest.cs
@@ -0,0 +1,105 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class ChannelOptionsTest
+ {
+ [Test]
+ public void IntOption()
+ {
+ var option = new ChannelOption("somename", 1);
+
+ Assert.AreEqual(ChannelOption.OptionType.Integer, option.Type);
+ Assert.AreEqual("somename", option.Name);
+ Assert.AreEqual(1, option.IntValue);
+ Assert.Throws(typeof(InvalidOperationException), () => { var s = option.StringValue; });
+ }
+
+ [Test]
+ public void StringOption()
+ {
+ var option = new ChannelOption("somename", "ABCDEF");
+
+ Assert.AreEqual(ChannelOption.OptionType.String, option.Type);
+ Assert.AreEqual("somename", option.Name);
+ Assert.AreEqual("ABCDEF", option.StringValue);
+ Assert.Throws(typeof(InvalidOperationException), () => { var s = option.IntValue; });
+ }
+
+ [Test]
+ public void ConstructorPreconditions()
+ {
+ Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, "abc"); });
+ Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption(null, 1); });
+ Assert.Throws(typeof(NullReferenceException), () => { new ChannelOption("abc", null); });
+ }
+
+ [Test]
+ public void CreateChannelArgsNull()
+ {
+ var channelArgs = ChannelOptions.CreateChannelArgs(null);
+ Assert.IsTrue(channelArgs.IsInvalid);
+ }
+
+ [Test]
+ public void CreateChannelArgsEmpty()
+ {
+ var options = new List<ChannelOption>();
+ var channelArgs = ChannelOptions.CreateChannelArgs(options);
+ channelArgs.Dispose();
+ }
+
+ [Test]
+ public void CreateChannelArgs()
+ {
+ var options = new List<ChannelOption>
+ {
+ new ChannelOption("ABC", "XYZ"),
+ new ChannelOption("somename", "IJKLM"),
+ new ChannelOption("intoption", 12345),
+ new ChannelOption("GHIJK", 12345),
+ };
+
+ var channelArgs = ChannelOptions.CreateChannelArgs(options);
+ channelArgs.Dispose();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index b69b933aba..82ded5cc7a 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -86,7 +86,7 @@ namespace Grpc.Core.Tests
server.AddServiceDefinition(ServiceDefinition);
int port = server.AddListeningPort(Host, Server.PickUnusedPort);
server.Start();
- channel = new Channel(Host + ":" + port);
+ channel = new Channel(Host, port);
}
[TearDown]
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 62cb443272..92e28b7d74 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -3,8 +3,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>10.0.0</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Core.Tests</RootNamespace>
@@ -46,6 +44,10 @@
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
+ <Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
+ <Compile Include="Internal\CompletionQueueEventTest.cs" />
+ <Compile Include="Internal\ChannelArgsSafeHandleTest.cs" />
+ <Compile Include="ChannelOptionsTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
@@ -61,4 +63,4 @@
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<ItemGroup />
-</Project> \ No newline at end of file
+</Project>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs
new file mode 100644
index 0000000000..af0aaa5f01
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/ChannelArgsSafeHandleTest.cs
@@ -0,0 +1,75 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class ChannelArgsSafeHandleTest
+ {
+ [Test]
+ public void CreateEmptyAndDestroy()
+ {
+ var channelArgs = ChannelArgsSafeHandle.Create(0);
+ channelArgs.Dispose();
+ }
+
+ [Test]
+ public void CreateNonEmptyAndDestroy()
+ {
+ var channelArgs = ChannelArgsSafeHandle.Create(5);
+ channelArgs.Dispose();
+ }
+
+ [Test]
+ public void CreateNullAndDestroy()
+ {
+ var channelArgs = ChannelArgsSafeHandle.CreateNull();
+ channelArgs.Dispose();
+ }
+
+ [Test]
+ public void CreateFillAndDestroy()
+ {
+ var channelArgs = ChannelArgsSafeHandle.Create(3);
+ channelArgs.SetInteger(0, "somekey", 12345);
+ channelArgs.SetString(1, "somekey", "abcdefghijkl");
+ channelArgs.SetString(2, "somekey", "XYZ");
+ channelArgs.Dispose();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
new file mode 100644
index 0000000000..188c6406a2
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
@@ -0,0 +1,52 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class CompletionQueueEventTest
+ {
+ [Test]
+ public void CreateAndDestroy()
+ {
+ Assert.AreEqual(CompletionQueueEvent.NativeSize, Marshal.SizeOf(typeof(CompletionQueueEvent)));
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
new file mode 100644
index 0000000000..a2ee183272
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
@@ -0,0 +1,64 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class CompletionQueueSafeHandleTest
+ {
+ [Test]
+ public void CreateAndDestroy()
+ {
+ var cq = CompletionQueueSafeHandle.Create();
+ cq.Dispose();
+ }
+
+ [Test]
+ public void CreateAndShutdown()
+ {
+ var cq = CompletionQueueSafeHandle.Create();
+ cq.Shutdown();
+ var ev = cq.Next();
+ cq.Dispose();
+ Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type);
+ Assert.AreNotEqual(IntPtr.Zero, ev.success);
+ Assert.AreEqual(IntPtr.Zero, ev.tag);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 26f87660df..8b3c910251 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -48,7 +48,7 @@ namespace Grpc.Core.Tests
int counter;
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
@@ -88,7 +88,7 @@ namespace Grpc.Core.Tests
[Test]
public void NativeCallbackBenchmark()
{
- CompletionCallbackDelegate handler = Handler;
+ OpCompletionDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@@ -114,7 +114,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
- grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
+ grpcsharp_test_callback(new OpCompletionDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}
@@ -134,7 +134,7 @@ namespace Grpc.Core.Tests
});
}
- private void Handler(bool success, IntPtr ptr)
+ private void Handler(bool success)
{
counter++;
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index b47d810672..d6bfbb7bc4 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -29,6 +29,7 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
+using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
@@ -45,33 +46,41 @@ namespace Grpc.Core
readonly string target;
/// <summary>
- /// Creates a channel.
+ /// Creates a channel that connects to a specific host.
+ /// Port will default to 80 for an unsecure channel and to 443 a secure channel.
/// </summary>
- public Channel(string target, Credentials credentials = null, ChannelArgs channelArgs = null)
+ /// <param name="host">The DNS name of IP address of the host.</param>
+ /// <param name="credentials">Optional credentials to create a secure channel.</param>
+ /// <param name="options">Channel options.</param>
+ public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null)
{
- using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs))
+ using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
{
if (credentials != null)
{
using (CredentialsSafeHandle nativeCredentials = credentials.ToNativeCredentials())
{
- this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs);
}
}
else
{
- this.handle = ChannelSafeHandle.Create(target, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs);
}
}
- this.target = GetOverridenTarget(target, channelArgs);
+ this.target = GetOverridenTarget(host, options);
}
- public string Target
+ /// <summary>
+ /// Creates a channel that connects to a specific host and port.
+ /// </summary>
+ /// <param name="host">DNS name or IP address</param>
+ /// <param name="port">the port</param>
+ /// <param name="credentials">Optional credentials to create a secure channel.</param>
+ /// <param name="options">Channel options.</param>
+ public Channel(string host, int port, Credentials credentials = null, IEnumerable<ChannelOption> options = null) :
+ this(string.Format("{0}:{1}", host, port), credentials, options)
{
- get
- {
- return this.target;
- }
}
public void Dispose()
@@ -80,6 +89,14 @@ namespace Grpc.Core
GC.SuppressFinalize(this);
}
+ internal string Target
+ {
+ get
+ {
+ return target;
+ }
+ }
+
internal ChannelSafeHandle Handle
{
get
@@ -96,22 +113,25 @@ namespace Grpc.Core
}
}
- private static string GetOverridenTarget(string target, ChannelArgs args)
+ /// <summary>
+ /// Look for SslTargetNameOverride option and return its value instead of originalTarget
+ /// if found.
+ /// </summary>
+ private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options)
{
- if (args != null && !string.IsNullOrEmpty(args.GetSslTargetNameOverride()))
+ if (options == null)
{
- return args.GetSslTargetNameOverride();
+ return originalTarget;
}
- return target;
- }
-
- private static ChannelArgsSafeHandle CreateNativeChannelArgs(ChannelArgs args)
- {
- if (args == null)
+ foreach (var option in options)
{
- return ChannelArgsSafeHandle.CreateNull();
+ if (option.Type == ChannelOption.OptionType.String
+ && option.Name == ChannelOptions.SslTargetNameOverride)
+ {
+ return option.StringValue;
+ }
}
- return args.ToNativeChannelArgs();
+ return originalTarget;
}
}
}
diff --git a/src/csharp/Grpc.Core/ChannelArgs.cs b/src/csharp/Grpc.Core/ChannelArgs.cs
deleted file mode 100644
index 74ab310e44..0000000000
--- a/src/csharp/Grpc.Core/ChannelArgs.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-#region Copyright notice and license
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#endregion
-using System;
-using System.Collections.Generic;
-using System.Collections.Immutable;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-
-namespace Grpc.Core
-{
- /// <summary>
- /// gRPC channel options.
- /// </summary>
- public class ChannelArgs
- {
- public const string SslTargetNameOverrideKey = "grpc.ssl_target_name_override";
-
- readonly ImmutableDictionary<string, string> stringArgs;
-
- private ChannelArgs(ImmutableDictionary<string, string> stringArgs)
- {
- this.stringArgs = stringArgs;
- }
-
- public string GetSslTargetNameOverride()
- {
- string result;
- if (stringArgs.TryGetValue(SslTargetNameOverrideKey, out result))
- {
- return result;
- }
- return null;
- }
-
- public static Builder CreateBuilder()
- {
- return new Builder();
- }
-
- public class Builder
- {
- readonly Dictionary<string, string> stringArgs = new Dictionary<string, string>();
-
- // TODO: AddInteger not supported yet.
- public Builder AddString(string key, string value)
- {
- stringArgs.Add(key, value);
- return this;
- }
-
- public ChannelArgs Build()
- {
- return new ChannelArgs(stringArgs.ToImmutableDictionary());
- }
- }
-
- /// <summary>
- /// Creates native object for the channel arguments.
- /// </summary>
- /// <returns>The native channel arguments.</returns>
- internal ChannelArgsSafeHandle ToNativeChannelArgs()
- {
- ChannelArgsSafeHandle nativeArgs = null;
- try
- {
- nativeArgs = ChannelArgsSafeHandle.Create(stringArgs.Count);
- int i = 0;
- foreach (var entry in stringArgs)
- {
- nativeArgs.SetString(i, entry.Key, entry.Value);
- i++;
- }
- return nativeArgs;
- }
- catch (Exception)
- {
- if (nativeArgs != null)
- {
- nativeArgs.Dispose();
- }
- throw;
- }
- }
- }
-}
diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs
new file mode 100644
index 0000000000..bc23bb59b1
--- /dev/null
+++ b/src/csharp/Grpc.Core/ChannelOptions.cs
@@ -0,0 +1,178 @@
+#region Copyright notice and license
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#endregion
+using System;
+using System.Collections.Generic;
+using System.Collections.Immutable;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Channel option specified when creating a channel.
+ /// Corresponds to grpc_channel_args from grpc/grpc.h.
+ /// </summary>
+ public sealed class ChannelOption
+ {
+ public enum OptionType
+ {
+ Integer,
+ String
+ }
+
+ private readonly OptionType type;
+ private readonly string name;
+ private readonly int intValue;
+ private readonly string stringValue;
+
+ /// <summary>
+ /// Creates a channel option with a string value.
+ /// </summary>
+ /// <param name="name">Name.</param>
+ /// <param name="stringValue">String value.</param>
+ public ChannelOption(string name, string stringValue)
+ {
+ this.type = OptionType.String;
+ this.name = Preconditions.CheckNotNull(name);
+ this.stringValue = Preconditions.CheckNotNull(stringValue);
+ }
+
+ /// <summary>
+ /// Creates a channel option with an integer value.
+ /// </summary>
+ /// <param name="name">Name.</param>
+ /// <param name="stringValue">String value.</param>
+ public ChannelOption(string name, int intValue)
+ {
+ this.type = OptionType.Integer;
+ this.name = Preconditions.CheckNotNull(name);
+ this.intValue = intValue;
+ }
+
+ public OptionType Type
+ {
+ get
+ {
+ return type;
+ }
+ }
+
+ public string Name
+ {
+ get
+ {
+ return name;
+ }
+ }
+
+ public int IntValue
+ {
+ get
+ {
+ Preconditions.CheckState(type == OptionType.Integer);
+ return intValue;
+ }
+ }
+
+ public string StringValue
+ {
+ get
+ {
+ Preconditions.CheckState(type == OptionType.String);
+ return stringValue;
+ }
+ }
+ }
+
+ public static class ChannelOptions
+ {
+ // Override SSL target check. Only to be used for testing.
+ public const string SslTargetNameOverride = "grpc.ssl_target_name_override";
+
+ // Enable census for tracing and stats collection
+ public const string Census = "grpc.census";
+
+ // Maximum number of concurrent incoming streams to allow on a http2 connection
+ public const string MaxConcurrentStreams = "grpc.max_concurrent_streams";
+
+ // Maximum message length that the channel can receive
+ public const string MaxMessageLength = "grpc.max_message_length";
+
+ // Initial sequence number for http2 transports
+ public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number";
+
+ /// <summary>
+ /// Creates native object for a collection of channel options.
+ /// </summary>
+ /// <returns>The native channel arguments.</returns>
+ internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options)
+ {
+ if (options == null)
+ {
+ return ChannelArgsSafeHandle.CreateNull();
+ }
+ var optionList = new List<ChannelOption>(options); // It's better to do defensive copy
+ ChannelArgsSafeHandle nativeArgs = null;
+ try
+ {
+ nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count);
+ for (int i = 0; i < optionList.Count; i++)
+ {
+ var option = optionList[i];
+ if (option.Type == ChannelOption.OptionType.Integer)
+ {
+ nativeArgs.SetInteger(i, option.Name, option.IntValue);
+ }
+ else if (option.Type == ChannelOption.OptionType.String)
+ {
+ nativeArgs.SetString(i, option.Name, option.StringValue);
+ }
+ else
+ {
+ throw new InvalidOperationException("Unknown option type");
+ }
+ }
+ return nativeArgs;
+ }
+ catch (Exception)
+ {
+ if (nativeArgs != null)
+ {
+ nativeArgs.Dispose();
+ }
+ throw;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index fe2d446a35..a36a6a5acc 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -5,8 +5,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>8.0.30703</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Core</RootNamespace>
@@ -73,13 +71,11 @@
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
- <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
- <Compile Include="ChannelArgs.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
@@ -101,6 +97,10 @@
<Compile Include="Internal\AtomicCounter.cs" />
<Compile Include="Internal\DebugStats.cs" />
<Compile Include="ServerCallContext.cs" />
+ <Compile Include="Internal\CompletionQueueEvent.cs" />
+ <Compile Include="Internal\CompletionRegistry.cs" />
+ <Compile Include="Internal\BatchContextSafeHandle.cs" />
+ <Compile Include="ChannelOptions.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
@@ -130,4 +130,4 @@
</Target>
<Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" />
<Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" />
-</Project> \ No newline at end of file
+</Project>
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 2e9e5a2ef6..30ff289714 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -54,6 +54,7 @@ namespace Grpc.Core
static volatile GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
+ readonly CompletionRegistry completionRegistry;
bool isClosed;
/// <summary>
@@ -105,6 +106,19 @@ namespace Grpc.Core
}
}
+ internal static CompletionRegistry CompletionRegistry
+ {
+ get
+ {
+ var inst = instance;
+ if (inst == null)
+ {
+ throw new InvalidOperationException("GRPC environment not initialized");
+ }
+ return inst.completionRegistry;
+ }
+ }
+
/// <summary>
/// Creates gRPC environment.
/// </summary>
@@ -112,6 +126,7 @@ namespace Grpc.Core
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
+ completionRegistry = new CompletionRegistry();
threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
@@ -139,14 +154,24 @@ namespace Grpc.Core
{
var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
if (remainingClientCalls != 0)
- {
- Console.WriteLine("Warning: Detected {0} client calls that weren't disposed properly.", remainingClientCalls);
+ {
+ DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
- Console.WriteLine("Warning: Detected {0} server calls that weren't disposed properly.", remainingServerCalls);
+ DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
+ }
+ var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
+ if (pendingBatchCompletions != 0)
+ {
+ DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
+
+ private static void DebugWarning(string message)
+ {
+ throw new Exception("Shutdown check: " + message);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 9bb918d53d..d350f45da6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,9 +47,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly CompletionCallbackDelegate unaryResponseHandler;
- readonly CompletionCallbackDelegate finishedHandler;
-
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,8 +57,6 @@ namespace Grpc.Core.Internal
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
- this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
@@ -96,7 +91,21 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
+ using (var ctx = BatchContextSafeHandle.Create())
+ {
+ call.StartUnary(payload, ctx, metadataArray);
+ var ev = cq.Pluck(ctx.Handle);
+
+ bool success = (ev.success != 0);
+ try
+ {
+ HandleUnaryResponse(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
try
@@ -129,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartUnary(payload, unaryResponseHandler, metadataArray);
+ call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
}
@@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartClientStreaming(unaryResponseHandler, metadataArray);
+ call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@@ -175,7 +184,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartServerStreaming(payload, finishedHandler, metadataArray);
+ call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
}
}
@@ -194,7 +203,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartDuplexStreaming(finishedHandler, metadataArray);
+ call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@@ -229,7 +238,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendCloseFromClient(halfclosedHandler);
+ call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@@ -274,7 +283,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
lock (myLock)
{
@@ -307,7 +316,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b4f4edb17a..64713c8c52 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -51,13 +51,8 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly CompletionCallbackDelegate sendFinishedHandler;
- protected readonly CompletionCallbackDelegate readFinishedHandler;
- protected readonly CompletionCallbackDelegate halfclosedHandler;
-
protected readonly object myLock = new object();
- protected GCHandle gchandle;
protected CallSafeHandle call;
protected bool disposed;
@@ -77,10 +72,6 @@ namespace Grpc.Core.Internal
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
-
- this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
- this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
- this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
@@ -121,9 +112,6 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
this.call = call;
}
}
@@ -141,7 +129,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, sendFinishedHandler);
+ call.StartSendMessage(payload, HandleSendFinished);
sendCompletionDelegate = completionDelegate;
}
}
@@ -157,7 +145,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
- call.StartReceiveMessage(readFinishedHandler);
+ call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
}
}
@@ -197,7 +185,6 @@ namespace Grpc.Core.Internal
{
call.Dispose();
}
- gchandle.Free();
disposed = true;
}
@@ -282,29 +269,9 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
- /// prevent propagating exceptions accross managed/unmanaged boundary.
- /// </summary>
- protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
- {
- return new CompletionCallbackDelegate((success, batchContextPtr) =>
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- handler(success, ctx);
- }
- catch (Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- });
- }
-
- /// <summary>
/// Handles send completion.
/// </summary>
- private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -328,7 +295,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
- private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -353,7 +320,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
- private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
{
var payload = ctx.GetReceivedMessage();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 1f0335e4e6..4f510ba40a 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -47,12 +47,10 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
- readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
- this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
@@ -72,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
- call.StartServerSide(finishedServersideHandler);
+ call.StartServerSide(HandleFinishedServerside);
return finishedServersideTcs.Task;
}
}
@@ -107,8 +105,9 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendStatusFromServer(status, halfclosedHandler);
+ call.StartSendStatusFromServer(status, HandleHalfclosed);
halfcloseRequested = true;
+ readingDone = true;
sendCompletionDelegate = completionDelegate;
}
}
@@ -121,7 +120,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles the server side close completion.
/// </summary>
- private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
{
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index b562abaa7a..861cbbe4c6 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -41,32 +41,50 @@ namespace Grpc.Core.Internal
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
+ static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
+ static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
+ static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
- public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_batch_context_destroy(IntPtr ctx);
+
+ private BatchContextSafeHandle()
+ {
+ }
+
+ public static BatchContextSafeHandle Create()
{
- SetHandle(handle);
+ return grpcsharp_batch_context_create();
+ }
+
+ public IntPtr Handle
+ {
+ get
+ {
+ return handle;
+ }
}
public Status GetReceivedStatus()
@@ -102,5 +120,11 @@ namespace Grpc.Core.Internal
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
+
+ protected override bool ReleaseHandle()
+ {
+ grpcsharp_batch_context_destroy(handle);
+ return true;
+ }
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 491b8414ec..ef92b44402 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -37,8 +37,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr);
-
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
@@ -57,49 +55,40 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
+ static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
@@ -113,64 +102,84 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
- public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
{
- grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
- public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
+ public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
- public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
+ public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
+ public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
- public void StartReceiveMessage(CompletionCallbackDelegate callback)
+ public void StartReceiveMessage(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_recv_message(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_recv_message(this, ctx).CheckOk();
}
- public void StartServerSide(CompletionCallbackDelegate callback)
+ public void StartServerSide(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_serverside(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void Cancel()
{
- AssertCallOk(grpcsharp_call_cancel(this));
+ grpcsharp_call_cancel(this).CheckOk();
}
public void CancelWithStatus(Status status)
{
- AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
+ grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk();
}
protected override bool ReleaseHandle()
@@ -179,11 +188,6 @@ namespace Grpc.Core.Internal
return true;
}
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
-
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
diff --git a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs
index c69f1a0d02..c12aec5a3a 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelArgsSafeHandle.cs
@@ -45,6 +45,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_channel_args_set_string(ChannelArgsSafeHandle args, UIntPtr index, string key, string value);
+ [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
+ static extern void grpcsharp_channel_args_set_integer(ChannelArgsSafeHandle args, UIntPtr index, string key, int value);
+
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_args_destroy(IntPtr args);
@@ -67,6 +70,11 @@ namespace Grpc.Core.Internal
grpcsharp_channel_args_set_string(this, new UIntPtr((uint)index), key, value);
}
+ public void SetInteger(int index, string key, int value)
+ {
+ grpcsharp_channel_args_set_integer(this, new UIntPtr((uint)index), key, value);
+ }
+
protected override bool ReleaseHandle()
{
grpcsharp_channel_args_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
new file mode 100644
index 0000000000..3f517514a3
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -0,0 +1,60 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// grpc_event from grpc/grpc.h
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct CompletionQueueEvent
+ {
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_sizeof_grpc_event();
+
+ public GRPCCompletionType type;
+ public int success;
+ public IntPtr tag;
+
+ internal static int NativeSize
+ {
+ get
+ {
+ return grpcsharp_sizeof_grpc_event();
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 600d1fc87c..f64f3d4175 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -46,7 +46,10 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq);
+ static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_destroy(IntPtr cq);
@@ -60,9 +63,14 @@ namespace Grpc.Core.Internal
return grpcsharp_completion_queue_create();
}
- public GRPCCompletionType NextWithCallback()
+ public CompletionQueueEvent Next()
+ {
+ return grpcsharp_completion_queue_next(this);
+ }
+
+ public CompletionQueueEvent Pluck(IntPtr tag)
{
- return grpcsharp_completion_queue_next_with_callback(this);
+ return grpcsharp_completion_queue_pluck(this, tag);
}
public void Shutdown()
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
new file mode 100644
index 0000000000..80f006ae50
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -0,0 +1,89 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ internal delegate void OpCompletionDelegate(bool success);
+
+ internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
+
+ internal class CompletionRegistry
+ {
+ readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+
+ public void Register(IntPtr key, OpCompletionDelegate callback)
+ {
+ DebugStats.PendingBatchCompletions.Increment();
+ Preconditions.CheckState(dict.TryAdd(key, callback));
+ }
+
+ public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
+ Register(ctx.Handle, opCallback);
+ }
+
+ public OpCompletionDelegate Extract(IntPtr key)
+ {
+ OpCompletionDelegate value;
+ Preconditions.CheckState(dict.TryRemove(key, out value));
+ DebugStats.PendingBatchCompletions.Decrement();
+ return value;
+ }
+
+ private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ try
+ {
+ callback(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ finally
+ {
+ if (ctx != null)
+ {
+ ctx.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 476914f751..ef9d9afe11 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -41,5 +41,7 @@ namespace Grpc.Core.Internal
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
+
+ public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs
index 2b4f6cae0c..af11b5b9f3 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/Enums.cs
@@ -33,35 +33,47 @@
using System;
using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// from grpc/grpc.h
+ /// grpc_call_error from grpc/grpc.h
/// </summary>
internal enum GRPCCallError
{
/* everything went ok */
- GRPC_CALL_OK = 0,
+ OK = 0,
/* something failed, we don't know what */
- GRPC_CALL_ERROR,
+ Error,
/* this method is not available on the server */
- GRPC_CALL_ERROR_NOT_ON_SERVER,
+ NotOnServer,
/* this method is not available on the client */
- GRPC_CALL_ERROR_NOT_ON_CLIENT,
+ NotOnClient,
/* this method must be called before server_accept */
- GRPC_CALL_ERROR_ALREADY_ACCEPTED,
+ AlreadyAccepted,
/* this method must be called before invoke */
- GRPC_CALL_ERROR_ALREADY_INVOKED,
+ AlreadyInvoked,
/* this method must be called after invoke */
- GRPC_CALL_ERROR_NOT_INVOKED,
+ NotInvoked,
/* this call is already finished
(writes_done or write_status has already been called) */
- GRPC_CALL_ERROR_ALREADY_FINISHED,
+ AlreadyFinished,
/* there is already an outstanding read/write operation on the call */
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS,
+ TooManyOperations,
/* the flags value was illegal for this call */
- GRPC_CALL_ERROR_INVALID_FLAGS
+ InvalidFlags
+ }
+
+ internal static class CallErrorExtensions
+ {
+ /// <summary>
+ /// Checks the call API invocation's result is OK.
+ /// </summary>
+ public static void CheckOk(this GRPCCallError callError)
+ {
+ Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ }
}
/// <summary>
@@ -70,12 +82,12 @@ namespace Grpc.Core.Internal
internal enum GRPCCompletionType
{
/* Shutting down */
- GRPC_QUEUE_SHUTDOWN,
+ Shutdown,
/* No event before timeout */
- GRPC_QUEUE_TIMEOUT,
+ Timeout,
/* operation completion */
- GRPC_OP_COMPLETE
+ OpComplete
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f4224668f1..89b44a4e2b 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -112,12 +112,26 @@ namespace Grpc.Core.Internal
/// </summary>
private void RunHandlerLoop()
{
- GRPCCompletionType completionType;
+ CompletionQueueEvent ev;
do
{
- completionType = cq.NextWithCallback();
+ ev = cq.Next();
+ if (ev.type == GRPCCompletionType.OpComplete)
+ {
+ bool success = (ev.success != 0);
+ IntPtr tag = ev.tag;
+ try
+ {
+ var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
+ callback(success);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
- while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
+ while (ev.type != GRPCCompletionType.Shutdown);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index f494d9e0ff..c0e5bae13f 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -267,8 +267,6 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."));
- // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed.
- await requestStream.ToList();
await finishedTask;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index c44ee87bad..83dbb910aa 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -45,10 +45,7 @@ namespace Grpc.Core.Internal
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
+ static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
@@ -60,19 +57,22 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
private ServerSafeHandle()
{
}
- public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args)
+ public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
return grpcsharp_server_create(cq, args);
}
@@ -91,15 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
-
- public void ShutdownAndNotify(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+
+ public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- grpcsharp_server_shutdown_and_notify_callback(this, cq, callback);
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
}
- public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+ public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_request_call(this, cq, ctx).CheckOk();
}
protected override bool ReleaseHandle()
@@ -113,10 +117,5 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_cancel_all_calls(this);
}
-
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 94d48c2c49..775af27db9 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -51,7 +51,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
- // TODO: revisit this.
+
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 0f4d77eaf3..8e818885d1 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -52,11 +52,6 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
- // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
- // native callbacks are in the completion queue.
- readonly CompletionCallbackDelegate serverShutdownHandler;
- readonly CompletionCallbackDelegate newServerRpcHandler;
-
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@@ -66,11 +61,16 @@ namespace Grpc.Core
bool startRequested;
bool shutdownRequested;
- public Server()
+ /// <summary>
+ /// Create a new server.
+ /// </summary>
+ /// <param name="options">Channel options.</param>
+ public Server(IEnumerable<ChannelOption> options = null)
{
- this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
- this.newServerRpcHandler = HandleNewServerRpc;
- this.serverShutdownHandler = HandleServerShutdown;
+ using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
+ {
+ this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs);
+ }
}
/// <summary>
@@ -108,7 +108,7 @@ namespace Grpc.Core
/// </summary>
/// <returns>The port on which server will be listening.</returns>
/// <param name="host">the host</param>
- /// <param name="port">the port. If zero, , an unused port is chosen automatically.</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
public int AddListeningPort(string host, int port, ServerCredentials credentials)
{
Preconditions.CheckNotNull(credentials);
@@ -144,7 +144,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), serverShutdownHandler);
+ handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
await shutdownTcs.Task;
handle.Dispose();
}
@@ -173,7 +173,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), serverShutdownHandler);
+ handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
@@ -208,7 +208,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
- handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
+ handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
}
}
}
@@ -236,44 +236,28 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, IntPtr batchContextPtr)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
-
- // TODO: handle error
+ // TODO: handle error
- CallSafeHandle call = ctx.GetServerRpcNewCall();
- string method = ctx.GetServerRpcNewMethod();
-
- // after server shutdown, the callback returns with null call
- if (!call.IsInvalid)
- {
- Task.Run(async () => await InvokeCallHandler(call, method));
- }
+ CallSafeHandle call = ctx.GetServerRpcNewCall();
+ string method = ctx.GetServerRpcNewMethod();
- AllowOneRpc();
- }
- catch (Exception e)
+ // after server shutdown, the callback returns with null call
+ if (!call.IsInvalid)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ Task.Run(async () => await InvokeCallHandler(call, method));
}
+
+ AllowOneRpc();
}
/// <summary>
/// Handles native callback.
/// </summary>
- private void HandleServerShutdown(bool success, IntPtr batchContextPtr)
+ private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
{
- try
- {
- shutdownTcs.SetResult(null);
- }
- catch (Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+ shutdownTcs.SetResult(null);
}
private static CompletionQueueSafeHandle GetCompletionQueue()
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index 85d9cdc7a6..360fe928dd 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -41,7 +41,7 @@ namespace math
{
GrpcEnvironment.Initialize();
- using (Channel channel = new Channel("127.0.0.1:23456"))
+ using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
MathExamples.DivExample(stub);
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 5aa6f4162d..aadd49f795 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -60,7 +60,7 @@ namespace math.Tests
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
server.Start();
- channel = new Channel(host + ":" + port);
+ channel = new Channel(host, port);
// TODO(jtattermusch): get rid of the custom header here once we have dedicated tests
// for header support.
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index dfaf18cae1..f0be522bc6 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -104,22 +104,22 @@ namespace Grpc.IntegrationTesting
{
GrpcEnvironment.Initialize();
- string addr = string.Format("{0}:{1}", options.serverHost, options.serverPort);
-
Credentials credentials = null;
if (options.useTls)
{
credentials = TestCredentials.CreateTestClientCredentials(options.useTestCa);
}
- ChannelArgs channelArgs = null;
+ List<ChannelOption> channelOptions = null;
if (!string.IsNullOrEmpty(options.serverHostOverride))
{
- channelArgs = ChannelArgs.CreateBuilder()
- .AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build();
+ channelOptions = new List<ChannelOption>
+ {
+ new ChannelOption(ChannelOptions.SslTargetNameOverride, options.serverHostOverride)
+ };
}
- using (Channel channel = new Channel(addr, credentials, channelArgs))
+ using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
{
var stubConfig = StubConfiguration.Default;
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index ddbfc61a4e..1a733450c1 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -62,10 +62,11 @@ namespace Grpc.IntegrationTesting
int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials());
server.Start();
- var channelArgs = ChannelArgs.CreateBuilder()
- .AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build();
-
- channel = new Channel(host + ":" + port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
+ var options = new List<ChannelOption>
+ {
+ new ChannelOption(ChannelOptions.SslTargetNameOverride, TestCredentials.DefaultHostOverride)
+ };
+ channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), options);
client = TestService.NewStub(channel);
}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index dc1bbe36f0..540f926744 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -60,7 +60,7 @@
grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
gpr_slice slice = gpr_slice_from_copied_buffer(buffer, len);
- grpc_byte_buffer *bb = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *bb = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return bb;
}
@@ -91,13 +91,9 @@ typedef struct gprcsharp_batch_context {
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
-
- /* callback will be called upon completion */
- callback_funcptr callback;
-
} grpcsharp_batch_context;
-grpcsharp_batch_context *grpcsharp_batch_context_create() {
+GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
@@ -192,7 +188,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
src->metadata = NULL;
}
-void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
+GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
@@ -306,25 +302,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
grpc_completion_queue_destroy(cq);
}
-GPR_EXPORT grpc_completion_type GPR_CALLTYPE
-grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
- grpc_event ev;
- grpcsharp_batch_context *batch_context;
- grpc_completion_type t;
-
- ev = grpc_completion_queue_next(cq, gpr_inf_future);
- t = ev.type;
- if (t == GRPC_OP_COMPLETE && ev.tag) {
- /* NEW API handler */
- batch_context = (grpcsharp_batch_context *)ev.tag;
- batch_context->callback((gpr_int32) ev.success, batch_context);
- grpcsharp_batch_context_destroy(batch_context);
- }
+GPR_EXPORT grpc_event GPR_CALLTYPE
+grpcsharp_completion_queue_next(grpc_completion_queue *cq) {
+ return grpc_completion_queue_next(cq, gpr_inf_future);
+}
- /* return completion type to allow some handling for events that have no
- * tag - such as GRPC_QUEUE_SHUTDOWN
- */
- return t;
+GPR_EXPORT grpc_event GPR_CALLTYPE
+grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
+ return grpc_completion_queue_pluck(cq, tag, gpr_inf_future);
}
/* Channel */
@@ -370,6 +355,16 @@ grpcsharp_channel_args_set_string(grpc_channel_args *args, size_t index,
}
GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_channel_args_set_integer(grpc_channel_args *args, size_t index,
+ const char *key, int value) {
+ GPR_ASSERT(args);
+ GPR_ASSERT(index < args->num_args);
+ args->args[index].type = GRPC_ARG_INTEGER;
+ args->args[index].key = gpr_strdup(key);
+ args->args[index].value.integer = value;
+}
+
+GPR_EXPORT void GPR_CALLTYPE
grpcsharp_channel_args_destroy(grpc_channel_args *args) {
size_t i;
if (args) {
@@ -413,14 +408,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
+grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[6];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -454,34 +446,12 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
-/* Synchronous unary call */
-GPR_EXPORT void GPR_CALLTYPE
-grpcsharp_call_blocking_unary(grpc_call *call,
- grpc_completion_queue *dedicated_cq,
- callback_funcptr callback,
- const char *send_buffer, size_t send_buffer_len,
- grpc_metadata_array *initial_metadata) {
- GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
- send_buffer_len,
- initial_metadata) == GRPC_CALL_OK);
-
- /* TODO: we would like to use pluck, but we don't know the tag */
- GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
- GRPC_OP_COMPLETE);
- grpc_completion_queue_shutdown(dedicated_cq);
- GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
- GRPC_QUEUE_SHUTDOWN);
-}
-
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[4];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -510,13 +480,10 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
- grpc_call *call, callback_funcptr callback, const char *send_buffer,
+ grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[5];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -549,13 +516,10 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -581,13 +545,10 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
+grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
@@ -597,12 +558,9 @@ grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
- callback_funcptr callback) {
+ grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
@@ -610,14 +568,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -629,25 +584,18 @@ grpcsharp_call_send_status_from_server(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
+grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
+grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
-
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
@@ -681,9 +629,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_shutdown_and_notify_callback(grpc_server *server,
grpc_completion_queue *cq,
- callback_funcptr callback) {
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
+ grpcsharp_batch_context *ctx) {
grpc_server_shutdown_and_notify(server, cq, ctx);
}
@@ -697,10 +643,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
- callback_funcptr callback) {
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
+ grpcsharp_batch_context *ctx) {
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
@@ -797,3 +740,8 @@ grpcsharp_test_callback(callback_funcptr callback) {
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; }
+
+/* For testing */
+GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) {
+ return sizeof(grpc_event);
+}
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index 2c84099069..7eff11c2b3 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -57,7 +57,7 @@ grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) {
char *data = ::node::Buffer::Data(buffer);
gpr_slice slice = gpr_slice_malloc(length);
memcpy(GPR_SLICE_START_PTR(slice), data, length);
- grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1));
+ grpc_byte_buffer *byte_buffer(grpc_raw_byte_buffer_create(&slice, 1));
gpr_slice_unref(slice);
return byte_buffer;
}
diff --git a/src/node/package.json b/src/node/package.json
index 3ea2c065e7..7d4a493af4 100644
--- a/src/node/package.json
+++ b/src/node/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc",
- "version": "0.9.0",
+ "version": "0.9.1",
"author": "Google Inc.",
"description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/",
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 7b543353eb..feaa859a4f 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -47,7 +47,9 @@ function deserializeCls(cls) {
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
- return cls.decode(arg_buf).toRaw();
+ // Convert to a native object with binary fields as Buffers (first argument)
+ // and longs as strings (second argument)
+ return cls.decode(arg_buf).toRaw(false, true);
};
}
diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js
new file mode 100644
index 0000000000..08ba429ed7
--- /dev/null
+++ b/src/node/test/common_test.js
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var assert = require('assert');
+
+var common = require('../src/common.js');
+
+var ProtoBuf = require('protobufjs');
+
+var messages_proto = ProtoBuf.loadProtoFile(
+ __dirname + '/test_messages.proto').build();
+
+describe('Proto message serialize and deserialize', function() {
+ var longSerialize = common.serializeCls(messages_proto.LongValues);
+ var longDeserialize = common.deserializeCls(messages_proto.LongValues);
+ var pos_value = '314159265358979';
+ var neg_value = '-27182818284590';
+ it('should preserve positive int64 values', function() {
+ var serialized = longSerialize({int_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).int_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative int64 values', function() {
+ var serialized = longSerialize({int_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).int_64.toString(),
+ neg_value);
+ });
+ it('should preserve uint64 values', function() {
+ var serialized = longSerialize({uint_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).uint_64.toString(),
+ pos_value);
+ });
+ it('should preserve positive sint64 values', function() {
+ var serialized = longSerialize({sint_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative sint64 values', function() {
+ var serialized = longSerialize({sint_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
+ neg_value);
+ });
+ it('should preserve fixed64 values', function() {
+ var serialized = longSerialize({fixed_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).fixed_64.toString(),
+ pos_value);
+ });
+ it('should preserve positive sfixed64 values', function() {
+ var serialized = longSerialize({sfixed_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative sfixed64 values', function() {
+ var serialized = longSerialize({sfixed_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
+ neg_value);
+ });
+});
diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto
new file mode 100644
index 0000000000..685e9482bd
--- /dev/null
+++ b/src/node/test/test_messages.proto
@@ -0,0 +1,38 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+message LongValues {
+ int64 int_64 = 1;
+ uint64 uint_64 = 2;
+ sint64 sint_64 = 3;
+ fixed64 fixed_64 = 4;
+ sfixed64 sfixed_64 = 5;
+} \ No newline at end of file
diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m
index 3a7f76887b..e6a6c3c605 100644
--- a/src/objective-c/GRPCClient/private/NSData+GRPC.m
+++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m
@@ -55,7 +55,7 @@ static void CopyByteBufferToCharArray(grpc_byte_buffer *buffer, char *array) {
static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(array, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
@@ -85,7 +85,7 @@ static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
// The following implementation is thus not optimal, sometimes requiring two
// copies (one by self.bytes and another by gpr_slice_from_copied_buffer).
// If it turns out to be an issue, we can use enumerateByteRangesUsingblock:
- // to create an array of gpr_slice objects to pass to grpc_byte_buffer_create.
+ // to create an array of gpr_slice objects to pass to grpc_raw_byte_buffer_create.
// That would make it do exactly one copy, always.
return CopyCharArrayToNewByteBuffer((const char *)self.bytes, (size_t)self.length);
}
diff --git a/src/php/README.md b/src/php/README.md
index 40c79e0dd4..cb9b48aee3 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -7,51 +7,122 @@ This directory contains source code for PHP implementation of gRPC layered on sh
Pre-Alpha : This gRPC PHP implementation is work-in-progress and is not expected to work yet.
-
-## LAYOUT
-
-Directory structure is as generated by the PHP utility
-[ext_skel](http://php.net/manual/en/internals2.buildsys.skeleton.php)
-
## ENVIRONMENT
Install `php5` and `php5-dev`.
-To run the tests, additionally install `php5-readline` and `phpunit`.
+To run the tests, additionally install `phpunit`.
Alternatively, build and install PHP 5.5 or later from source with standard
configuration options.
-To also download and install protoc and the PHP code generator.
+## Build from Homebrew
+
+On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to
+install gRPC.
+
+```sh
+$ curl -fsSL https://goo.gl/getgrpc | bash -s php
+```
+
+This will download and run the [gRPC install script][] and compile the gRPC PHP extension.
+
+## Build from Source
+
+Clone this repository
+
+```
+$ git clone https://github.com/grpc/grpc.git
+```
+
+Build and install the Protocol Buffers compiler (protoc)
+
+```
+$ cd grpc
+$ git pull --recurse-submodules && git submodule update --init --recursive
+$ cd third_party/protobuf
+$ ./autogen.sh
+$ ./configure
+$ make
+$ make check
+$ sudo make install
+```
+
+Build and install the gRPC C core
+
+```sh
+$ cd grpc
+$ make
+$ sudo make install
+```
+
+Build the gRPC PHP extension
-```bash
-apt-get install -y procps
-curl -sSL https://get.rvm.io | sudo bash -s stable --ruby
-git clone git@github.com:google/protobuf.git
-cd protobuf
-./configure
-make
-make install
-git clone git@github.com:murgatroid99/Protobuf-PHP.git
-cd Protobuf-PHP
-rake pear:package version=1.0
-pear install Protobuf-1.0.tgz
+```sh
+$ cd grpc/src/php/ext/grpc
+$ phpize
+$ ./configure
+$ make
+$ sudo make install
```
-## BUILDING
+In your php.ini file, add the line `extension=grpc.so` to load the extension
+at PHP startup.
- 1. In ./ext/grpc, run the command `phpize` (distributed with PHP)
- 2. Run `./ext/grpc/configure`
- 3. In ./ext/grpc, run `make` and `sudo make install`
- 4. In your php.ini file, add the line `extension=grpc.so` to load the
- extension at PHP startup.
+Install Composer
-## PHPUnit
+```sh
+$ cd grpc/src/php
+$ curl -sS https://getcomposer.org/installer | php
+$ php composer.phar install
+```
+
+## Unit Tests
+
+Run unit tests
+
+```sh
+$ cd grpc/src/php
+$ ./bin/run_tests.sh
+```
+
+## Generated Code Tests
+
+Install `protoc-gen-php`
+
+```sh
+$ cd grpc/src/php/vendor/datto/protobuf-php
+$ gem install rake ronn
+$ rake pear:package version=1.0
+$ sudo pear install Protobuf-1.0.tgz
+```
+
+Generate client stub code
+
+```sh
+$ cd grpc/src/php
+$ ./bin/generate_proto_php.sh
+```
+
+Run a local server serving the math services
+
+ - Please see [Node][] on how to run an example server
+
+```sh
+$ cd grpc/src/node
+$ npm install
+$ nodejs examples/math_server.js
+```
+
+Run the generated code tests
+
+```sh
+$ cd grpc/src/php
+$ ./bin/run_gen_code_test.sh
+```
-This repo now has PHPUnit tests, which can by run by executing
-`./bin/run_tests.sh` after building.
+[homebrew]:http://brew.sh
+[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
+[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
+[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples
-There is also a generated code test (`./bin/run_gen_code_test.sh`), which tests
-the stub `./tests/generated_code/math.php` against a running localhost server
-serving the math service. That stub is generated from
-`./tests/generated_code/math.proto`.
diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh
index 4882a2b846..1be2ed3f72 100755
--- a/src/php/bin/run_gen_code_test.sh
+++ b/src/php/bin/run_gen_code_test.sh
@@ -30,8 +30,8 @@
cd $(dirname $0)
GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
- -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
+ -d extension=grpc.so `which phpunit` -v --debug --strict \
../tests/generated_code/GeneratedCodeTest.php
GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \
- -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \
+ -d extension=grpc.so `which phpunit` -v --debug --strict \
../tests/generated_code/GeneratedCodeWithCallbackTest.php
diff --git a/src/php/composer.json b/src/php/composer.json
index ba7a1302f2..b0115bdadd 100644
--- a/src/php/composer.json
+++ b/src/php/composer.json
@@ -4,8 +4,15 @@
"version": "0.5.0",
"homepage": "http://grpc.io",
"license": "BSD-3-Clause",
+ "repositories": [
+ {
+ "type": "vcs",
+ "url": "https://github.com/stanley-cheung/Protobuf-PHP"
+ }
+ ],
"require": {
"php": ">=5.5.0",
+ "datto/protobuf-php": "dev-master",
"google/auth": "dev-master"
},
"autoload": {
diff --git a/src/php/ext/grpc/byte_buffer.c b/src/php/ext/grpc/byte_buffer.c
index bb9d3f5337..8be0a20607 100644
--- a/src/php/ext/grpc/byte_buffer.c
+++ b/src/php/ext/grpc/byte_buffer.c
@@ -51,7 +51,7 @@
grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 2d2352b199..6102aaf0a8 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -32,8 +32,6 @@
*
*/
require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php');
-require 'DrSlump/Protobuf.php';
-\DrSlump\Protobuf::autoload();
require 'math.php';
abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
/* These tests require that a server exporting the math service must be
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index bf8d0cd93c..9aee01cd4d 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -32,8 +32,6 @@
*
*/
require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php');
-require 'DrSlump/Protobuf.php';
-\DrSlump\Protobuf::autoload();
require 'empty.php';
require 'message_set.php';
require 'messages.php';
diff --git a/src/python/README.md b/src/python/README.md
index 0bca457a33..2beb3a913a 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -20,6 +20,10 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s python
```
This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
+EXAMPLES
+--------
+Please read our online documentation for a [Quick Start][] and a [detailed example][]
+
BUILDING FROM SOURCE
---------------------
- Clone this repository
@@ -58,3 +62,5 @@ $ ../../tools/distrib/python/submit.py
[homebrew]:http://brew.sh
[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
+[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
+[detailed example]:http://www.grpc.io/docs/installation/python.html
diff --git a/src/python/requirements.txt b/src/python/requirements.txt
index d32d436d3c..43395df03b 100644
--- a/src/python/requirements.txt
+++ b/src/python/requirements.txt
@@ -1,3 +1,3 @@
enum34==1.0.4
futures==2.2.0
-protobuf==3.0.0a2
+protobuf==3.0.0a3
diff --git a/src/python/src/README.rst b/src/python/src/README.rst
index bc1815febc..00bdecf56f 100644
--- a/src/python/src/README.rst
+++ b/src/python/src/README.rst
@@ -6,22 +6,18 @@ Package for GRPC Python.
Dependencies
------------
-Ensure that you have installed GRPC core.
-
-On debian linux systems, install from our released deb package:
+Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
+Run the following command to install gRPC Python.
::
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb
- $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb
- $ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb
-
-Otherwise, install from source:
+ $ curl -fsSL https://goo.gl/getgrpc | bash -s python
-::
+This will download and run the [gRPC install script][] to install grpc core. The script then uses pip to install this package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python.
- git clone https://github.com/grpc/grpc.git
- cd grpc
- ./configure
- make && make install
+Otherwise, `install from source`_
+.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
+.. _homebrew: http://brew.sh
+.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
+.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c
index 65d84b58fe..26b38da8f1 100644
--- a/src/python/src/grpc/_adapter/_c/types/server.c
+++ b/src/python/src/grpc/_adapter/_c/types/server.c
@@ -167,17 +167,13 @@ PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) {
PyObject *pygrpc_Server_shutdown(
Server *self, PyObject *args, PyObject *kwargs) {
- PyObject *user_tag = NULL;
+ PyObject *user_tag;
pygrpc_tag *tag;
static char *keywords[] = {"tag", NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", keywords, &user_tag)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", keywords, &user_tag)) {
return NULL;
}
- if (user_tag) {
- tag = pygrpc_produce_server_shutdown_tag(user_tag);
- grpc_server_shutdown_and_notify(self->c_serv, tag);
- } else {
- grpc_server_shutdown(self->c_serv);
- }
+ tag = pygrpc_produce_server_shutdown_tag(user_tag);
+ grpc_server_shutdown_and_notify(self->c_serv, self->cq->c_cq, tag);
Py_RETURN_NONE;
}
diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c
index ced34a6816..cfb8270832 100644
--- a/src/python/src/grpc/_adapter/_c/utility.c
+++ b/src/python/src/grpc/_adapter/_c/utility.c
@@ -40,6 +40,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/log.h>
#include "grpc/_adapter/_c/types.h"
@@ -122,7 +124,8 @@ PyObject *pygrpc_consume_event(grpc_event event) {
event.success ? Py_True : Py_False);
} else {
result = Py_BuildValue("iOOONO", GRPC_OP_COMPLETE, tag->user_tag,
- tag->call, Py_None, pygrpc_consume_ops(tag->ops, tag->nops),
+ tag->call ? tag->call : Py_None, Py_None,
+ pygrpc_consume_ops(tag->ops, tag->nops),
event.success ? Py_True : Py_False);
}
break;
@@ -156,9 +159,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
if (PyTuple_Size(op) != OP_TUPLE_SIZE) {
- char buf[64];
- snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE);
+ char *buf;
+ gpr_asprintf(&buf, "expected tuple op of length %d", OP_TUPLE_SIZE);
PyErr_SetString(PyExc_ValueError, buf);
+ gpr_free(buf);
return 0;
}
type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX));
@@ -179,7 +183,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
PyString_AsStringAndSize(
PyTuple_GET_ITEM(op, MESSAGE_INDEX), &message, &message_size);
message_slice = gpr_slice_from_copied_buffer(message, message_size);
- c_op.data.send_message = grpc_byte_buffer_create(&message_slice, 1);
+ c_op.data.send_message = grpc_raw_byte_buffer_create(&message_slice, 1);
gpr_slice_unref(message_slice);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@@ -353,9 +357,14 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) {
return timespec.tv_sec + 1e-9*timespec.tv_nsec;
}
+/* Because C89 doesn't have a way to check for infinity... */
+static int pygrpc_isinf(double x) {
+ return x * 0 != 0;
+}
+
gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) {
gpr_timespec result;
- if isinf(seconds) {
+ if (pygrpc_isinf(seconds)) {
result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past;
} else {
result.tv_sec = (time_t)seconds;
diff --git a/src/python/src/grpc/_adapter/_intermediary_low.py b/src/python/src/grpc/_adapter/_intermediary_low.py
index a6e325c4e5..6b96aef1d3 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low.py
@@ -100,7 +100,7 @@ class _TagAdapter(collections.namedtuple('_TagAdapter', [
class Call(object):
"""Adapter from old _low.Call interface to new _low.Call."""
-
+
def __init__(self, channel, completion_queue, method, host, deadline):
self._internal = channel._internal.create_call(
completion_queue._internal, method, host, deadline)
@@ -207,7 +207,7 @@ class CompletionQueue(object):
complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
- status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if ev.results[0].cancelled is not None else None
+ status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None
metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
else:
raise RuntimeError('unknown event')
@@ -241,7 +241,7 @@ class Server(object):
return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
def stop(self):
- return self._internal.shutdown()
+ return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP))
class ClientCredentials(object):
@@ -253,6 +253,6 @@ class ClientCredentials(object):
class ServerCredentials(object):
"""Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials."""
-
+
def __init__(self, root_credentials, pair_sequence):
self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence))
diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py
index 6ff51c43a6..478346341b 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low_test.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py
@@ -94,14 +94,6 @@ class EchoTest(unittest.TestCase):
def tearDown(self):
self.server.stop()
- # NOTE(nathaniel): Yep, this is weird; it's a consequence of
- # grpc_server_destroy's being what has the effect of telling the server's
- # completion queue to pump out all pending events/tags immediately rather
- # than gracefully completing all outstanding RPCs while accepting no new
- # ones.
- # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind
- # of observable side effect let alone such an important one.
- del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
while True:
@@ -114,6 +106,7 @@ class EchoTest(unittest.TestCase):
break
self.server_completion_queue = None
self.client_completion_queue = None
+ del self.server
def _perform_echo_test(self, test_data):
method = 'test method'
@@ -316,7 +309,6 @@ class CancellationTest(unittest.TestCase):
def tearDown(self):
self.server.stop()
- del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
while True:
@@ -327,6 +319,7 @@ class CancellationTest(unittest.TestCase):
event = self.client_completion_queue.get(0)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
+ del self.server
def testCancellation(self):
method = 'test method'
diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py
index 0c1d3b40a5..dcf67dbc11 100644
--- a/src/python/src/grpc/_adapter/_low.py
+++ b/src/python/src/grpc/_adapter/_low.py
@@ -101,11 +101,8 @@ class Server(_types.Server):
def start(self):
return self.server.start()
- def shutdown(self, tag=_NO_TAG):
- if tag is _NO_TAG:
- return self.server.shutdown()
- else:
- return self.server.shutdown(tag)
+ def shutdown(self, tag=None):
+ return self.server.shutdown(tag)
def request_call(self, completion_queue, tag):
return self.server.request_call(completion_queue.completion_queue, tag)
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index e53b176caf..8a9f1a0c49 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -48,7 +48,6 @@ class InsecureServerInsecureClient(unittest.TestCase):
def tearDown(self):
self.server.shutdown()
del self.client_channel
- del self.server
self.client_completion_queue.shutdown()
while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
@@ -59,6 +58,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
del self.client_completion_queue
del self.server_completion_queue
+ del self.server
def testEcho(self):
DEADLINE = time.time()+5
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index dc655a70f9..5398b09936 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -86,13 +86,13 @@ _PACKAGE_DIRECTORIES = {
setuptools.setup(
name='grpcio',
- version='0.9.0a0',
+ version='0.9.0a1',
ext_modules=[_EXTENSION_MODULE],
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=[
'enum34==1.0.4',
'futures==2.2.0',
- 'protobuf==3.0.0a2'
+ 'protobuf==3.0.0a3'
]
)
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index 1cc22f4aff..0aa34c844e 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -42,7 +42,7 @@
grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
index 367effdb1a..0e58d912b9 100644
--- a/test/compiler/python_plugin_test.py
+++ b/test/compiler/python_plugin_test.py
@@ -36,6 +36,7 @@ import shutil
import subprocess
import sys
import tempfile
+import threading
import time
import unittest
@@ -49,13 +50,13 @@ STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
-# Timeouts and delays.
-SHORT_TIMEOUT = 0.1
-NORMAL_TIMEOUT = 1
-LONG_TIMEOUT = 2
-DOES_NOT_MATTER_DELAY = 0
+# The timeout used in tests of RPCs that are supposed to expire.
+SHORT_TIMEOUT = 2
+# The timeout used in tests of RPCs that are not supposed to expire. The
+# absurdly large value doesn't matter since no passing execution of this test
+# module will ever wait the duration.
+LONG_TIMEOUT = 600
NO_DELAY = 0
-LONG_DELAY = 1
# Build mode environment variable set by tools/run_tests/run_tests.py.
_build_mode = os.environ['CONFIG']
@@ -64,47 +65,54 @@ _build_mode = os.environ['CONFIG']
class _ServicerMethods(object):
def __init__(self, test_pb2, delay):
+ self._condition = threading.Condition()
+ self._delay = delay
self._paused = False
- self._failed = False
- self.test_pb2 = test_pb2
- self.delay = delay
+ self._fail = False
+ self._test_pb2 = test_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
- self._paused = True
+ with self._condition:
+ self._paused = True
yield
- self._paused = False
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
@contextlib.contextmanager
def fail(self): # pylint: disable=invalid-name
- self._failed = True
+ with self._condition:
+ self._fail = True
yield
- self._failed = False
+ with self._condition:
+ self._fail = False
def _control(self): # pylint: disable=invalid-name
- if self._failed:
- raise ValueError()
- time.sleep(self.delay)
- while self._paused:
- time.sleep(0)
-
- def UnaryCall(self, request, unused_context):
- response = self.test_pb2.SimpleResponse()
- response.payload.payload_type = self.test_pb2.COMPRESSABLE
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+ while self._paused:
+ self._condition.wait()
+ time.sleep(self._delay)
+
+ def UnaryCall(self, request, unused_rpc_context):
+ response = self._test_pb2.SimpleResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
- def StreamingOutputCall(self, request, unused_context):
+ def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
- response = self.test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self.test_pb2.COMPRESSABLE
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
- def StreamingInputCall(self, request_iter, unused_context):
- response = self.test_pb2.StreamingInputCallResponse()
+ def StreamingInputCall(self, request_iter, unused_rpc_context):
+ response = self._test_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@@ -112,21 +120,21 @@ class _ServicerMethods(object):
self._control()
return response
- def FullDuplexCall(self, request_iter, unused_context):
+ def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
- response = self.test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self.test_pb2.COMPRESSABLE
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
- def HalfDuplexCall(self, request_iter, unused_context):
+ def HalfDuplexCall(self, request_iter, unused_rpc_context):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
- response = self.test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self.test_pb2.COMPRESSABLE
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@@ -147,12 +155,11 @@ def _CreateService(test_pb2, delay):
waiting for the service.
Args:
- test_pb2: the test_pb2 module generated by this test
- delay: delay in seconds per response from the servicer
- timeout: how long the stub will wait for the servicer by default.
+ test_pb2: The test_pb2 module generated by this test.
+ delay: Delay in seconds per response from the servicer.
Yields:
- A three-tuple (servicer_methods, servicer, stub), where the servicer is
+ A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
the back-end of the service bound to the stub and the server and stub
are both activated and ready for use.
"""
@@ -185,7 +192,7 @@ def _CreateService(test_pb2, delay):
yield servicer_methods, stub, server
-def StreamingInputRequest(test_pb2):
+def _streaming_input_request_iterator(test_pb2):
for _ in range(3):
request = test_pb2.StreamingInputCallRequest()
request.payload.payload_type = test_pb2.COMPRESSABLE
@@ -193,7 +200,7 @@ def StreamingInputRequest(test_pb2):
yield request
-def StreamingOutputRequest(test_pb2):
+def _streaming_output_request(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
@@ -202,7 +209,7 @@ def StreamingOutputRequest(test_pb2):
return request
-def FullDuplexRequest(test_pb2):
+def _full_duplex_request_iterator(test_pb2):
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
@@ -250,7 +257,7 @@ class PythonPluginTest(unittest.TestCase):
if exc.errno != errno.ENOENT:
raise
- # TODO(atash): Figure out which of theses tests is hanging flakily with small
+ # TODO(atash): Figure out which of these tests is hanging flakily with small
# probability.
def testImportAttributes(self):
@@ -265,37 +272,36 @@ class PythonPluginTest(unittest.TestCase):
def testUpDown(self):
import test_pb2
with _CreateService(
- test_pb2, DOES_NOT_MATTER_DELAY) as (servicer, stub, unused_server):
+ test_pb2, NO_DELAY) as (servicer, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
def testUnaryCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
request = test_pb2.SimpleRequest(response_size=13)
- response = stub.UnaryCall(request, NORMAL_TIMEOUT)
- expected_response = servicer.UnaryCall(request, None)
+ response = stub.UnaryCall(request, timeout)
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2, LONG_DELAY) as (
- servicer, stub, unused_server):
- start_time = time.clock()
- response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
- # Check that we didn't block on the asynchronous call.
- self.assertGreater(LONG_DELAY, time.clock() - start_time)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ # Check that the call does not block waiting for the server to respond.
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
response = response_future.result()
- expected_response = servicer.UnaryCall(request, None)
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testUnaryCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- # set the timeout super low...
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
request = test_pb2.SimpleRequest(response_size=13)
- with servicer.pause():
+ with methods.pause():
response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
@@ -305,9 +311,9 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.pause():
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
response_future = stub.UnaryCall.async(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@@ -315,30 +321,31 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.fail():
- response_future = stub.UnaryCall.async(request, NORMAL_TIMEOUT)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = StreamingOutputRequest(test_pb2)
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
- responses = stub.StreamingOutputCall(request, NORMAL_TIMEOUT)
- expected_responses = servicer.StreamingOutputCall(request, None)
- for check in itertools.izip_longest(expected_responses, responses):
- expected_response, response = check
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
+ expected_responses = methods.StreamingOutputCall(
+ request, 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testStreamingOutputCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = StreamingOutputRequest(test_pb2)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.pause():
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@@ -347,9 +354,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testStreamingOutputCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = StreamingOutputRequest(test_pb2)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- unused_servicer, stub, unused_server):
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ unused_methods, stub, unused_server):
responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
next(responses)
responses.cancel()
@@ -360,10 +367,10 @@ class PythonPluginTest(unittest.TestCase):
'instead of raising the proper error.')
def testStreamingOutputCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = StreamingOutputRequest(test_pb2)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.fail():
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
@@ -373,34 +380,32 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testStreamingInputCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
- response = stub.StreamingInputCall(StreamingInputRequest(test_pb2),
- NORMAL_TIMEOUT)
- expected_response = servicer.StreamingInputCall(
- StreamingInputRequest(test_pb2), None)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ response = stub.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsync(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, LONG_DELAY) as (
- servicer, stub, unused_server):
- start_time = time.clock()
- response_future = stub.StreamingInputCall.async(
- StreamingInputRequest(test_pb2), LONG_TIMEOUT)
- self.assertGreater(LONG_DELAY, time.clock() - start_time)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
response = response_future.result()
- expected_response = servicer.StreamingInputCall(
- StreamingInputRequest(test_pb2), None)
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallAsyncExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- # set the timeout super low...
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.pause():
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
response_future = stub.StreamingInputCall.async(
- StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
response_future.result()
self.assertIsInstance(
@@ -408,11 +413,12 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.pause():
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
response_future = stub.StreamingInputCall.async(
- StreamingInputRequest(test_pb2), NORMAL_TIMEOUT)
+ _streaming_input_request_iterator(test_pb2), timeout)
response_future.cancel()
self.assertTrue(response_future.cancelled())
with self.assertRaises(future.CancelledError):
@@ -420,33 +426,33 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.fail():
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
response_future = stub.StreamingInputCall.async(
- StreamingInputRequest(test_pb2), SHORT_TIMEOUT)
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
- responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2),
- NORMAL_TIMEOUT)
- expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2),
- None)
- for check in itertools.izip_longest(expected_responses, responses):
- expected_response, response = check
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_responses = methods.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
self.assertEqual(expected_response, response)
@unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
'forever and fix.')
def testFullDuplexCallExpired(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = FullDuplexRequest(test_pb2)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.pause():
- responses = stub.FullDuplexCall(request, SHORT_TIMEOUT)
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
with self.assertRaises(exceptions.ExpirationError):
list(responses)
@@ -454,9 +460,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testFullDuplexCallCancelled(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
- request = FullDuplexRequest(test_pb2)
- responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
next(responses)
responses.cancel()
with self.assertRaises(future.CancelledError):
@@ -466,11 +472,11 @@ class PythonPluginTest(unittest.TestCase):
'and fix.')
def testFullDuplexCallFailed(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- request = FullDuplexRequest(test_pb2)
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- with servicer.fail():
- responses = stub.FullDuplexCall(request, NORMAL_TIMEOUT)
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
self.assertIsNotNone(responses)
with self.assertRaises(exceptions.ServicerError):
next(responses)
@@ -479,9 +485,9 @@ class PythonPluginTest(unittest.TestCase):
'forever and fix.')
def testHalfDuplexCall(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- with _CreateService(test_pb2, DOES_NOT_MATTER_DELAY) as (
- servicer, stub, unused_server):
- def HalfDuplexRequest():
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
@@ -489,30 +495,38 @@ class PythonPluginTest(unittest.TestCase):
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
- responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
- expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest(), None)
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), LONG_TIMEOUT)
+ expected_responses = methods.HalfDuplexCall(
+ half_duplex_request_iterator(), 'not a real RpcContext!')
for check in itertools.izip_longest(expected_responses, responses):
expected_response, response = check
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
import test_pb2 # pylint: disable=g-import-not-at-top
- wait_flag = [False]
+ condition = threading.Condition()
+ wait_cell = [False]
@contextlib.contextmanager
def wait(): # pylint: disable=invalid-name
# Where's Python 3's 'nonlocal' statement when you need it?
- wait_flag[0] = True
+ with condition:
+ wait_cell[0] = True
yield
- wait_flag[0] = False
- def HalfDuplexRequest():
+ with condition:
+ wait_cell[0] = False
+ condition.notify_all()
+ def half_duplex_request_iterator():
request = test_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- while wait_flag[0]:
- time.sleep(0.1)
- with _CreateService(test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+ with condition:
+ while wait_cell[0]:
+ condition.wait()
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
with wait():
- responses = stub.HalfDuplexCall(HalfDuplexRequest(), NORMAL_TIMEOUT)
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), SHORT_TIMEOUT)
# half-duplex waits for the client to send all info
with self.assertRaises(exceptions.ExpirationError):
next(responses)
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index d37454d9cf..33f7c02b61 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -133,7 +133,8 @@ static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) {
if (!bb) return 0;
- a = merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
+ a = merge_slices(bb->data.raw.slice_buffer.slices,
+ bb->data.raw.slice_buffer.count);
ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 1142c37ea4..e5310a4695 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -177,7 +177,7 @@ void test_connect(const char *server_host, const char *client_host, int port,
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
grpc_call_destroy(s);
} else {
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c
index 09a1e488de..fedcea6685 100644
--- a/test/core/end2end/tests/cancel_after_accept.c
+++ b/test/core/end2end/tests/cancel_after_accept.c
@@ -118,9 +118,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, f.cq, "/foo",
diff --git a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
index 604ab8cb8f..c31bd7fa26 100644
--- a/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
+++ b/test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
@@ -118,9 +118,9 @@ static void test_cancel_after_accept_and_writes_closed(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, f.cq, "/foo",
diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c
index b9438b4a47..2b66e5a37e 100644
--- a/test/core/end2end/tests/cancel_after_invoke.c
+++ b/test/core/end2end/tests/cancel_after_invoke.c
@@ -117,7 +117,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
grpc_byte_buffer *response_payload_recv = NULL;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
c = grpc_channel_create_call(f.client, f.cq, "/foo",
"foo.test.google.fr", deadline);
diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c
index 7455c1cb3d..697f347066 100644
--- a/test/core/end2end/tests/cancel_before_invoke.c
+++ b/test/core/end2end/tests/cancel_before_invoke.c
@@ -114,7 +114,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config,
grpc_byte_buffer *response_payload_recv = NULL;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
c = grpc_channel_create_call(f.client, f.cq, "/foo",
"foo.test.google.fr", deadline);
diff --git a/test/core/end2end/tests/census_simple_request.c b/test/core/end2end/tests/census_simple_request.c
index f3a46e23c9..3aa3018de0 100644
--- a/test/core/end2end/tests/census_simple_request.c
+++ b/test/core/end2end/tests/census_simple_request.c
@@ -164,7 +164,7 @@ static void test_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c
index d962f870b0..a93ac31924 100644
--- a/test/core/end2end/tests/disappearing_server.c
+++ b/test/core/end2end/tests/disappearing_server.c
@@ -157,7 +157,7 @@ static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f,
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c
index c1fc6fb4c5..82b20fb3d7 100644
--- a/test/core/end2end/tests/graceful_server_shutdown.c
+++ b/test/core/end2end/tests/graceful_server_shutdown.c
@@ -173,7 +173,7 @@ static void test_early_server_shutdown_finishes_inflight_calls(
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c
index fb4d60ccb5..cc976639d2 100644
--- a/test/core/end2end/tests/invoke_large_request.c
+++ b/test/core/end2end/tests/invoke_large_request.c
@@ -107,9 +107,9 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
grpc_call *c;
grpc_call *s;
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = n_seconds_time(30);
cq_verifier *cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
@@ -198,7 +198,7 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c
index 8cfa15a9bc..792d757f75 100644
--- a/test/core/end2end/tests/max_concurrent_streams.c
+++ b/test/core/end2end/tests/max_concurrent_streams.c
@@ -166,7 +166,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c
index ad72091cfe..66c2e37ec5 100644
--- a/test/core/end2end/tests/max_message_length.c
+++ b/test/core/end2end/tests/max_message_length.c
@@ -106,7 +106,7 @@ static void test_max_message_length(grpc_end2end_test_config config) {
grpc_op *op;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c
index c9f161cadf..95b289b104 100644
--- a/test/core/end2end/tests/ping_pong_streaming.c
+++ b/test/core/end2end/tests/ping_pong_streaming.c
@@ -162,8 +162,8 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(101)));
for (i = 0; i < messages; i++) {
- request_payload = grpc_byte_buffer_create(&request_payload_slice, 1);
- response_payload = grpc_byte_buffer_create(&response_payload_slice, 1);
+ request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;
diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c
index 70c70e9c40..ee7de233c1 100644
--- a/test/core/end2end/tests/registered_call.c
+++ b/test/core/end2end/tests/registered_call.c
@@ -167,7 +167,7 @@ static void simple_request_body(grpc_end2end_test_fixture f, void *rc) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
index fbdab81cd0..0d29b30a81 100644
--- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
@@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {
{"key1-bin",
diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
index f1b57f3457..f36e2e76fb 100644
--- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
@@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}},
{"key2", "val2", 4, {{NULL, NULL, NULL}}}};
diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c
index 543a625919..f44ee549a0 100644
--- a/test/core/end2end/tests/request_response_with_payload.c
+++ b/test/core/end2end/tests/request_response_with_payload.c
@@ -101,9 +101,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
grpc_call *c;
grpc_call *s;
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
cq_verifier *cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
diff --git a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c
index 45057f07c5..6fb9b2662d 100644
--- a/test/core/end2end/tests/request_response_with_payload_and_call_creds.c
+++ b/test/core/end2end/tests/request_response_with_payload_and_call_creds.c
@@ -154,9 +154,9 @@ static void request_response_with_payload_and_call_creds(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_end2end_test_fixture f = begin_test(config, test_name, NULL, NULL);
diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
index 4a6563b275..0f65c82241 100644
--- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
@@ -103,9 +103,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
- grpc_byte_buffer_create(&response_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}}, {"key2", "val2", 4, {{NULL, NULL, NULL}}}};
grpc_metadata meta_s[2] = {{"key3", "val3", 4, {{NULL, NULL, NULL}}}, {"key4", "val4", 4, {{NULL, NULL, NULL}}}};
diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c
index 7e3cc4c084..3abacf8ef9 100644
--- a/test/core/end2end/tests/request_with_large_metadata.c
+++ b/test/core/end2end/tests/request_with_large_metadata.c
@@ -101,7 +101,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
grpc_call *s;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta;
grpc_end2end_test_fixture f = begin_test(config, "test_request_with_large_metadata", NULL, NULL);
diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c
index 55bc60e792..8972d7fea1 100644
--- a/test/core/end2end/tests/request_with_payload.c
+++ b/test/core/end2end/tests/request_with_payload.c
@@ -101,7 +101,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
grpc_call *s;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
- grpc_byte_buffer_create(&request_payload_slice, 1);
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_end2end_test_fixture f = begin_test(config, "test_invoke_request_with_payload", NULL, NULL);
cq_verifier *cqv = cq_verifier_create(f.cq);
diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c
index fe8657a2cc..b3386d5292 100644
--- a/test/core/end2end/tests/server_finishes_request.c
+++ b/test/core/end2end/tests/server_finishes_request.c
@@ -166,7 +166,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index f399d0ea9b..5d36172ba1 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -162,7 +162,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 2246ad540c..a2df20d20f 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -168,7 +168,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
index ff00a5aa6e..5cd484d42f 100644
--- a/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
+++ b/test/core/end2end/tests/simple_request_with_high_initial_sequence_number.c
@@ -168,7 +168,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234"));
- GPR_ASSERT(was_cancelled == 0);
+ GPR_ASSERT(was_cancelled == 1);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index 37d787c7c3..ee5e390c39 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -183,7 +183,7 @@ int main(int argc, char **argv) {
channel = grpc_channel_create(target, NULL);
cq = grpc_completion_queue_create();
- the_buffer = grpc_byte_buffer_create(&slice, payload_size);
+ the_buffer = grpc_raw_byte_buffer_create(&slice, payload_size);
histogram = gpr_histogram_create(0.01, 60e9);
sc.init();
diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c
index c2f5fc2eb7..87a2cd7a43 100644
--- a/test/core/surface/byte_buffer_reader_test.c
+++ b/test/core/surface/byte_buffer_reader_test.c
@@ -42,6 +42,8 @@
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
+#include "src/core/compression/message_compress.h"
+
#include <string.h>
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
@@ -55,7 +57,7 @@ static void test_read_one_slice(void) {
LOG_TEST("test_read_one_slice");
slice = gpr_slice_from_copied_string("test");
- buffer = grpc_byte_buffer_create(&slice, 1);
+ buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
grpc_byte_buffer_reader_init(&reader, buffer);
first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
@@ -77,7 +79,28 @@ static void test_read_one_slice_malloc(void) {
LOG_TEST("test_read_one_slice_malloc");
slice = gpr_slice_malloc(4);
memcpy(GPR_SLICE_START_PTR(slice), "test", 4);
- buffer = grpc_byte_buffer_create(&slice, 1);
+ buffer = grpc_raw_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ grpc_byte_buffer_reader_init(&reader, buffer);
+ first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
+ GPR_ASSERT(first_code != 0);
+ GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(first_slice), "test", 4) == 0);
+ gpr_slice_unref(first_slice);
+ second_code = grpc_byte_buffer_reader_next(&reader, &second_slice);
+ GPR_ASSERT(second_code == 0);
+ grpc_byte_buffer_destroy(buffer);
+}
+
+static void test_read_none_compressed_slice(void) {
+ gpr_slice slice;
+ grpc_byte_buffer *buffer;
+ grpc_byte_buffer_reader reader;
+ gpr_slice first_slice, second_slice;
+ int first_code, second_code;
+
+ LOG_TEST("test_read_none_compressed_slice");
+ slice = gpr_slice_from_copied_string("test");
+ buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
grpc_byte_buffer_reader_init(&reader, buffer);
first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
@@ -89,9 +112,61 @@ static void test_read_one_slice_malloc(void) {
grpc_byte_buffer_destroy(buffer);
}
+static void read_compressed_slice(grpc_compression_algorithm algorithm,
+ int input_size) {
+ gpr_slice input_slice;
+ gpr_slice_buffer sliceb_in;
+ gpr_slice_buffer sliceb_out;
+ grpc_byte_buffer *buffer;
+ grpc_byte_buffer_reader reader;
+ gpr_slice read_slice;
+ int read_count = 0;
+
+ gpr_slice_buffer_init(&sliceb_in);
+ gpr_slice_buffer_init(&sliceb_out);
+
+ input_slice = gpr_slice_malloc(input_size);
+ memset(GPR_SLICE_START_PTR(input_slice), 'a', input_size);
+ gpr_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */
+ GPR_ASSERT(grpc_msg_compress(algorithm, &sliceb_in, &sliceb_out));
+
+ buffer = grpc_raw_compressed_byte_buffer_create(
+ sliceb_out.slices, sliceb_out.count, algorithm);
+ grpc_byte_buffer_reader_init(&reader, buffer);
+
+ while (grpc_byte_buffer_reader_next(&reader, &read_slice)) {
+ GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(read_slice),
+ GPR_SLICE_START_PTR(input_slice) + read_count,
+ GPR_SLICE_LENGTH(read_slice)) == 0);
+ read_count += GPR_SLICE_LENGTH(read_slice);
+ gpr_slice_unref(read_slice);
+ }
+ GPR_ASSERT(read_count == input_size);
+ grpc_byte_buffer_reader_destroy(&reader);
+ grpc_byte_buffer_destroy(buffer);
+ gpr_slice_buffer_destroy(&sliceb_out);
+ gpr_slice_buffer_destroy(&sliceb_in);
+}
+
+static void test_read_gzip_compressed_slice(void) {
+ const int INPUT_SIZE = 2048;
+ LOG_TEST("test_read_gzip_compressed_slice");
+ read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE);
+}
+
+static void test_read_deflate_compressed_slice(void) {
+ const int INPUT_SIZE = 2048;
+ LOG_TEST("test_read_deflate_compressed_slice");
+ read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_read_one_slice();
test_read_one_slice_malloc();
+ test_read_none_compressed_slice();
+ test_read_gzip_compressed_slice();
+ test_read_deflate_compressed_slice();
+
return 0;
}
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 921836e201..1b7a8d26b2 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -62,7 +62,7 @@ typedef std::list<grpc_time> deadline_list;
class ClientRpcContext {
public:
- ClientRpcContext(int ch) : channel_id_(ch) {}
+ explicit ClientRpcContext(int ch) : channel_id_(ch) {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index eceb5103f6..d534846365 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -132,6 +132,9 @@ static void QpsDriver() {
pareto->set_alpha(FLAGS_load_param_2);
break;
}
+ default:
+ GPR_ASSERT(false);
+ break;
}
ServerConfig server_config;
diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc
new file mode 100644
index 0000000000..90a8da8d11
--- /dev/null
+++ b/test/cpp/qps/qps_test_with_poll.cc
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <set>
+
+#include <grpc/support/log.h>
+
+#include <signal.h>
+
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
+
+extern "C" {
+#include "src/core/iomgr/pollset_posix.h"
+}
+
+namespace grpc {
+namespace testing {
+
+static const int WARMUP = 5;
+static const int BENCHMARK = 5;
+
+static void RunQPS() {
+ gpr_log(GPR_INFO, "Running QPS test");
+
+ ClientConfig client_config;
+ client_config.set_client_type(ASYNC_CLIENT);
+ client_config.set_enable_ssl(false);
+ client_config.set_outstanding_rpcs_per_channel(1000);
+ client_config.set_client_channels(8);
+ client_config.set_payload_size(1);
+ client_config.set_async_client_threads(8);
+ client_config.set_rpc_type(UNARY);
+
+ ServerConfig server_config;
+ server_config.set_server_type(ASYNC_SERVER);
+ server_config.set_enable_ssl(false);
+ server_config.set_threads(4);
+
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+
+ GetReporter()->ReportQPSPerCore(*result);
+ GetReporter()->ReportLatency(*result);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
+ grpc_platform_become_multipoller = grpc_poll_become_multipoller;
+
+ signal(SIGPIPE, SIG_IGN);
+ grpc::testing::RunQPS();
+
+ return 0;
+}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index fb49271991..40cc4cb89b 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -71,6 +71,8 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
return (config.rpc_type() == RpcType::UNARY)
? CreateAsyncUnaryClient(config)
: CreateAsyncStreamingClient(config);
+ default:
+ abort();
}
abort();
}
@@ -82,6 +84,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config,
return CreateSynchronousServer(config, server_port);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config, server_port);
+ default:
+ abort();
}
abort();
}
diff --git a/test/cpp/qps/qpstest.proto b/test/cpp/qps/qpstest.proto
index d977c9b48b..ef1f9451e9 100644
--- a/test/cpp/qps/qpstest.proto
+++ b/test/cpp/qps/qpstest.proto
@@ -30,92 +30,92 @@
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
-syntax = "proto2";
+syntax = "proto3";
package grpc.testing;
enum PayloadType {
// Compressable text format.
- COMPRESSABLE = 1;
+ COMPRESSABLE = 0;
// Uncompressable binary format.
- UNCOMPRESSABLE = 2;
+ UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
- RANDOM = 3;
+ RANDOM = 2;
}
message StatsRequest {
// run number
- optional int32 test_num = 1;
+ int32 test_num = 1;
}
message ServerStats {
// wall clock time
- required double time_elapsed = 1;
+ double time_elapsed = 1;
// user time used by the server process and threads
- required double time_user = 2;
+ double time_user = 2;
// server time used by the server process and all threads
- required double time_system = 3;
+ double time_system = 3;
}
message Payload {
// The type of data in body.
- optional PayloadType type = 1;
+ PayloadType type = 1;
// Primary contents of payload.
- optional bytes body = 2;
+ bytes body = 2;
}
message HistogramData {
repeated uint32 bucket = 1;
- required double min_seen = 2;
- required double max_seen = 3;
- required double sum = 4;
- required double sum_of_squares = 5;
- required double count = 6;
+ double min_seen = 2;
+ double max_seen = 3;
+ double sum = 4;
+ double sum_of_squares = 5;
+ double count = 6;
}
enum ClientType {
- SYNCHRONOUS_CLIENT = 1;
- ASYNC_CLIENT = 2;
+ SYNCHRONOUS_CLIENT = 0;
+ ASYNC_CLIENT = 1;
}
enum ServerType {
- SYNCHRONOUS_SERVER = 1;
- ASYNC_SERVER = 2;
+ SYNCHRONOUS_SERVER = 0;
+ ASYNC_SERVER = 1;
}
enum RpcType {
- UNARY = 1;
- STREAMING = 2;
+ UNARY = 0;
+ STREAMING = 1;
}
enum LoadType {
- CLOSED_LOOP = 1;
- POISSON = 2;
- UNIFORM = 3;
- DETERMINISTIC = 4;
- PARETO = 5;
+ CLOSED_LOOP = 0;
+ POISSON = 1;
+ UNIFORM = 2;
+ DETERMINISTIC = 3;
+ PARETO = 4;
}
message PoissonParams {
- optional double offered_load = 1;
+ double offered_load = 1;
}
message UniformParams {
- optional double interarrival_lo = 1;
- optional double interarrival_hi = 2;
+ double interarrival_lo = 1;
+ double interarrival_hi = 2;
}
message DeterministicParams {
- optional double offered_load = 1;
+ double offered_load = 1;
}
message ParetoParams {
- optional double interarrival_base = 1;
- optional double alpha = 2;
+ double interarrival_base = 1;
+ double alpha = 2;
}
message LoadParams {
@@ -129,17 +129,17 @@ message LoadParams {
message ClientConfig {
repeated string server_targets = 1;
- required ClientType client_type = 2;
- optional bool enable_ssl = 3 [default = false];
- required int32 outstanding_rpcs_per_channel = 4;
- required int32 client_channels = 5;
- required int32 payload_size = 6;
+ ClientType client_type = 2;
+ bool enable_ssl = 3;
+ int32 outstanding_rpcs_per_channel = 4;
+ int32 client_channels = 5;
+ int32 payload_size = 6;
// only for async client:
- optional int32 async_client_threads = 7;
- optional RpcType rpc_type = 8 [default = UNARY];
- optional string host = 9;
- optional LoadType load_type = 10 [default = CLOSED_LOOP];
- optional LoadParams load_params = 11;
+ int32 async_client_threads = 7;
+ RpcType rpc_type = 8;
+ string host = 9;
+ LoadType load_type = 10;
+ LoadParams load_params = 11;
}
// Request current stats
@@ -154,21 +154,21 @@ message ClientArgs {
}
message ClientStats {
- required HistogramData latencies = 1;
- required double time_elapsed = 3;
- required double time_user = 4;
- required double time_system = 5;
+ HistogramData latencies = 1;
+ double time_elapsed = 2;
+ double time_user = 3;
+ double time_system = 4;
}
message ClientStatus {
- optional ClientStats stats = 1;
+ ClientStats stats = 1;
}
message ServerConfig {
- required ServerType server_type = 1;
- optional int32 threads = 2 [default = 1];
- optional bool enable_ssl = 3 [default = false];
- optional string host = 4;
+ ServerType server_type = 1;
+ int32 threads = 2;
+ bool enable_ssl = 3;
+ string host = 4;
}
message ServerArgs {
@@ -179,25 +179,25 @@ message ServerArgs {
}
message ServerStatus {
- optional ServerStats stats = 1;
- required int32 port = 2;
+ ServerStats stats = 1;
+ int32 port = 2;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
- optional PayloadType response_type = 1 [default = COMPRESSABLE];
+ PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
- optional int32 response_size = 2 [default = 0];
+ int32 response_size = 2;
// Optional input payload sent along with the request.
- optional Payload payload = 3;
+ Payload payload = 3;
}
message SimpleResponse {
- optional Payload payload = 1;
+ Payload payload = 1;
}
service TestService {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4b0678bb2c..210aef4fd6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -73,31 +73,35 @@ class AsyncQpsServerTest : public Server {
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
- srv_cq_ = builder.AddCompletionQueue();
+ for (int i = 0; i < config.threads(); i++) {
+ srv_cqs_.emplace_back(std::move(builder.AddCompletionQueue()));
+ }
server_ = builder.BuildAndStart();
using namespace std::placeholders;
- request_unary_ =
- std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_,
- _1, _2, _3, srv_cq_.get(), srv_cq_.get(), _4);
- request_streaming_ =
- std::bind(&TestService::AsyncService::RequestStreamingCall,
- &async_service_, _1, _2, srv_cq_.get(), srv_cq_.get(), _3);
- for (int i = 0; i < 100; i++) {
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- request_unary_, ProcessRPC));
- contexts_.push_front(
- new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- request_streaming_, ProcessRPC));
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < config.threads(); j++) {
+ auto request_unary = std::bind(
+ &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
+ _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
+ auto request_streaming = std::bind(
+ &TestService::AsyncService::RequestStreamingCall, &async_service_,
+ _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
+ contexts_.push_front(
+ new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
+ request_unary, ProcessRPC));
+ contexts_.push_front(
+ new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
+ request_streaming, ProcessRPC));
+ }
}
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cq_->Next(&got_tag, &ok)) {
+ while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
@@ -125,11 +129,13 @@ class AsyncQpsServerTest : public Server {
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
- srv_cq_->Shutdown();
- bool ok;
- void *got_tag;
- while (srv_cq_->Next(&got_tag, &ok))
- ;
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
+ bool ok;
+ void *got_tag;
+ while ((*cq)->Next(&got_tag, &ok))
+ ;
+ }
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@@ -306,15 +312,8 @@ class AsyncQpsServerTest : public Server {
}
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
- std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
+ std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
TestService::AsyncService async_service_;
- std::function<void(ServerContext *, SimpleRequest *,
- grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
- request_unary_;
- std::function<void(
- ServerContext *,
- grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
- request_streaming_;
std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index 9a7a722378..7cc96b2e06 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
-INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h
+INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/compression.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 7f6fe6837e..1aae2f35e9 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
-INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h src/core/httpcli/format_request.h src/core/httpcli/httpcli.h src/core/httpcli/httpcli_security_connector.h src/core/httpcli/parser.h src/core/security/auth_filters.h src/core/security/base64.h src/core/security/credentials.h src/core/security/json_token.h src/core/security/secure_endpoint.h src/core/security/secure_transport_setup.h src/core/security/security_connector.h src/core/security/security_context.h src/core/tsi/fake_transport_security.h src/core/tsi/ssl_transport_security.h src/core/tsi/transport_security.h src/core/tsi/transport_security_interface.h src/core/census/grpc_context.h src/core/channel/channel_args.h src/core/channel/channel_stack.h src/core/channel/child_channel.h src/core/channel/client_channel.h src/core/channel/client_setup.h src/core/channel/connected_channel.h src/core/channel/http_client_filter.h src/core/channel/http_server_filter.h src/core/channel/noop_filter.h src/core/compression/algorithm.h src/core/compression/message_compress.h src/core/debug/trace.h src/core/iomgr/alarm.h src/core/iomgr/alarm_heap.h src/core/iomgr/alarm_internal.h src/core/iomgr/endpoint.h src/core/iomgr/endpoint_pair.h src/core/iomgr/fd_posix.h src/core/iomgr/iocp_windows.h src/core/iomgr/iomgr.h src/core/iomgr/iomgr_internal.h src/core/iomgr/iomgr_posix.h src/core/iomgr/pollset.h src/core/iomgr/pollset_kick_posix.h src/core/iomgr/pollset_posix.h src/core/iomgr/pollset_set_posix.h src/core/iomgr/pollset_set_windows.h src/core/iomgr/pollset_windows.h src/core/iomgr/resolve_address.h src/core/iomgr/sockaddr.h src/core/iomgr/sockaddr_posix.h src/core/iomgr/sockaddr_utils.h src/core/iomgr/sockaddr_win32.h src/core/iomgr/socket_utils_posix.h src/core/iomgr/socket_windows.h src/core/iomgr/tcp_client.h src/core/iomgr/tcp_posix.h src/core/iomgr/tcp_server.h src/core/iomgr/tcp_windows.h src/core/iomgr/time_averaged_stats.h src/core/iomgr/wakeup_fd_pipe.h src/core/iomgr/wakeup_fd_posix.h src/core/json/json.h src/core/json/json_common.h src/core/json/json_reader.h src/core/json/json_writer.h src/core/profiling/timers.h src/core/profiling/timers_preciseclock.h src/core/surface/byte_buffer_queue.h src/core/surface/call.h src/core/surface/channel.h src/core/surface/client.h src/core/surface/completion_queue.h src/core/surface/event_string.h src/core/surface/init.h src/core/surface/server.h src/core/surface/surface_trace.h src/core/transport/chttp2/alpn.h src/core/transport/chttp2/bin_encoder.h src/core/transport/chttp2/frame.h src/core/transport/chttp2/frame_data.h src/core/transport/chttp2/frame_goaway.h src/core/transport/chttp2/frame_ping.h src/core/transport/chttp2/frame_rst_stream.h src/core/transport/chttp2/frame_settings.h src/core/transport/chttp2/frame_window_update.h src/core/transport/chttp2/hpack_parser.h src/core/transport/chttp2/hpack_table.h src/core/transport/chttp2/http2_errors.h src/core/transport/chttp2/huffsyms.h src/core/transport/chttp2/status_conversion.h src/core/transport/chttp2/stream_encoder.h src/core/transport/chttp2/stream_map.h src/core/transport/chttp2/timeout_encoding.h src/core/transport/chttp2/varint.h src/core/transport/chttp2_transport.h src/core/transport/metadata.h src/core/transport/stream_op.h src/core/transport/transport.h src/core/transport/transport_impl.h src/core/census/context.h src/core/httpcli/format_request.c src/core/httpcli/httpcli.c src/core/httpcli/httpcli_security_connector.c src/core/httpcli/parser.c src/core/security/base64.c src/core/security/client_auth_filter.c src/core/security/credentials.c src/core/security/credentials_metadata.c src/core/security/credentials_posix.c src/core/security/credentials_win32.c src/core/security/google_default_credentials.c src/core/security/json_token.c src/core/security/secure_endpoint.c src/core/security/secure_transport_setup.c src/core/security/security_connector.c src/core/security/security_context.c src/core/security/server_auth_filter.c src/core/security/server_secure_chttp2.c src/core/surface/init_secure.c src/core/surface/secure_channel_create.c src/core/tsi/fake_transport_security.c src/core/tsi/ssl_transport_security.c src/core/tsi/transport_security.c src/core/census/grpc_context.c src/core/channel/channel_args.c src/core/channel/channel_stack.c src/core/channel/child_channel.c src/core/channel/client_channel.c src/core/channel/client_setup.c src/core/channel/connected_channel.c src/core/channel/http_client_filter.c src/core/channel/http_server_filter.c src/core/channel/noop_filter.c src/core/compression/algorithm.c src/core/compression/message_compress.c src/core/debug/trace.c src/core/iomgr/alarm.c src/core/iomgr/alarm_heap.c src/core/iomgr/endpoint.c src/core/iomgr/endpoint_pair_posix.c src/core/iomgr/endpoint_pair_windows.c src/core/iomgr/fd_posix.c src/core/iomgr/iocp_windows.c src/core/iomgr/iomgr.c src/core/iomgr/iomgr_posix.c src/core/iomgr/iomgr_windows.c src/core/iomgr/pollset_kick_posix.c src/core/iomgr/pollset_multipoller_with_epoll.c src/core/iomgr/pollset_multipoller_with_poll_posix.c src/core/iomgr/pollset_posix.c src/core/iomgr/pollset_set_posix.c src/core/iomgr/pollset_set_windows.c src/core/iomgr/pollset_windows.c src/core/iomgr/resolve_address_posix.c src/core/iomgr/resolve_address_windows.c src/core/iomgr/sockaddr_utils.c src/core/iomgr/socket_utils_common_posix.c src/core/iomgr/socket_utils_linux.c src/core/iomgr/socket_utils_posix.c src/core/iomgr/socket_windows.c src/core/iomgr/tcp_client_posix.c src/core/iomgr/tcp_client_windows.c src/core/iomgr/tcp_posix.c src/core/iomgr/tcp_server_posix.c src/core/iomgr/tcp_server_windows.c src/core/iomgr/tcp_windows.c src/core/iomgr/time_averaged_stats.c src/core/iomgr/wakeup_fd_eventfd.c src/core/iomgr/wakeup_fd_nospecial.c src/core/iomgr/wakeup_fd_pipe.c src/core/iomgr/wakeup_fd_posix.c src/core/json/json.c src/core/json/json_reader.c src/core/json/json_string.c src/core/json/json_writer.c src/core/profiling/basic_timers.c src/core/profiling/stap_timers.c src/core/surface/byte_buffer.c src/core/surface/byte_buffer_queue.c src/core/surface/byte_buffer_reader.c src/core/surface/call.c src/core/surface/call_details.c src/core/surface/call_log_batch.c src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/client.c src/core/surface/completion_queue.c src/core/surface/event_string.c src/core/surface/init.c src/core/surface/lame_client.c src/core/surface/metadata_array.c src/core/surface/server.c src/core/surface/server_chttp2.c src/core/surface/server_create.c src/core/surface/surface_trace.c src/core/transport/chttp2/alpn.c src/core/transport/chttp2/bin_encoder.c src/core/transport/chttp2/frame_data.c src/core/transport/chttp2/frame_goaway.c src/core/transport/chttp2/frame_ping.c src/core/transport/chttp2/frame_rst_stream.c src/core/transport/chttp2/frame_settings.c src/core/transport/chttp2/frame_window_update.c src/core/transport/chttp2/hpack_parser.c src/core/transport/chttp2/hpack_table.c src/core/transport/chttp2/huffsyms.c src/core/transport/chttp2/status_conversion.c src/core/transport/chttp2/stream_encoder.c src/core/transport/chttp2/stream_map.c src/core/transport/chttp2/timeout_encoding.c src/core/transport/chttp2/varint.c src/core/transport/chttp2_transport.c src/core/transport/metadata.c src/core/transport/stream_op.c src/core/transport/transport.c src/core/transport/transport_op_string.c src/core/census/context.c src/core/census/initialize.c include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h src/core/support/env.h src/core/support/file.h src/core/support/murmur_hash.h src/core/support/string.h src/core/support/string_win32.h src/core/support/thd_internal.h src/core/support/alloc.c src/core/support/cancellable.c src/core/support/cmdline.c src/core/support/cpu_iphone.c src/core/support/cpu_linux.c src/core/support/cpu_posix.c src/core/support/cpu_windows.c src/core/support/env_linux.c src/core/support/env_posix.c src/core/support/env_win32.c src/core/support/file.c src/core/support/file_posix.c src/core/support/file_win32.c src/core/support/histogram.c src/core/support/host_port.c src/core/support/log.c src/core/support/log_android.c src/core/support/log_linux.c src/core/support/log_posix.c src/core/support/log_win32.c src/core/support/murmur_hash.c src/core/support/slice.c src/core/support/slice_buffer.c src/core/support/string.c src/core/support/string_posix.c src/core/support/string_win32.c src/core/support/subprocess_posix.c src/core/support/sync.c src/core/support/sync_posix.c src/core/support/sync_win32.c src/core/support/thd.c src/core/support/thd_posix.c src/core/support/thd_win32.c src/core/support/time.c src/core/support/time_posix.c src/core/support/time_win32.c src/core/support/tls_pthread.c
+INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/compression.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h src/core/httpcli/format_request.h src/core/httpcli/httpcli.h src/core/httpcli/httpcli_security_connector.h src/core/httpcli/parser.h src/core/security/auth_filters.h src/core/security/base64.h src/core/security/credentials.h src/core/security/json_token.h src/core/security/secure_endpoint.h src/core/security/secure_transport_setup.h src/core/security/security_connector.h src/core/security/security_context.h src/core/tsi/fake_transport_security.h src/core/tsi/ssl_transport_security.h src/core/tsi/transport_security.h src/core/tsi/transport_security_interface.h src/core/census/grpc_context.h src/core/channel/channel_args.h src/core/channel/channel_stack.h src/core/channel/child_channel.h src/core/channel/client_channel.h src/core/channel/client_setup.h src/core/channel/connected_channel.h src/core/channel/http_client_filter.h src/core/channel/http_server_filter.h src/core/channel/noop_filter.h src/core/compression/message_compress.h src/core/debug/trace.h src/core/iomgr/alarm.h src/core/iomgr/alarm_heap.h src/core/iomgr/alarm_internal.h src/core/iomgr/endpoint.h src/core/iomgr/endpoint_pair.h src/core/iomgr/fd_posix.h src/core/iomgr/iocp_windows.h src/core/iomgr/iomgr.h src/core/iomgr/iomgr_internal.h src/core/iomgr/iomgr_posix.h src/core/iomgr/pollset.h src/core/iomgr/pollset_kick_posix.h src/core/iomgr/pollset_posix.h src/core/iomgr/pollset_set_posix.h src/core/iomgr/pollset_set_windows.h src/core/iomgr/pollset_windows.h src/core/iomgr/resolve_address.h src/core/iomgr/sockaddr.h src/core/iomgr/sockaddr_posix.h src/core/iomgr/sockaddr_utils.h src/core/iomgr/sockaddr_win32.h src/core/iomgr/socket_utils_posix.h src/core/iomgr/socket_windows.h src/core/iomgr/tcp_client.h src/core/iomgr/tcp_posix.h src/core/iomgr/tcp_server.h src/core/iomgr/tcp_windows.h src/core/iomgr/time_averaged_stats.h src/core/iomgr/wakeup_fd_pipe.h src/core/iomgr/wakeup_fd_posix.h src/core/json/json.h src/core/json/json_common.h src/core/json/json_reader.h src/core/json/json_writer.h src/core/profiling/timers.h src/core/profiling/timers_preciseclock.h src/core/surface/byte_buffer_queue.h src/core/surface/call.h src/core/surface/channel.h src/core/surface/client.h src/core/surface/completion_queue.h src/core/surface/event_string.h src/core/surface/init.h src/core/surface/server.h src/core/surface/surface_trace.h src/core/transport/chttp2/alpn.h src/core/transport/chttp2/bin_encoder.h src/core/transport/chttp2/frame.h src/core/transport/chttp2/frame_data.h src/core/transport/chttp2/frame_goaway.h src/core/transport/chttp2/frame_ping.h src/core/transport/chttp2/frame_rst_stream.h src/core/transport/chttp2/frame_settings.h src/core/transport/chttp2/frame_window_update.h src/core/transport/chttp2/hpack_parser.h src/core/transport/chttp2/hpack_table.h src/core/transport/chttp2/http2_errors.h src/core/transport/chttp2/huffsyms.h src/core/transport/chttp2/status_conversion.h src/core/transport/chttp2/stream_encoder.h src/core/transport/chttp2/stream_map.h src/core/transport/chttp2/timeout_encoding.h src/core/transport/chttp2/varint.h src/core/transport/chttp2_transport.h src/core/transport/metadata.h src/core/transport/stream_op.h src/core/transport/transport.h src/core/transport/transport_impl.h src/core/census/context.h src/core/httpcli/format_request.c src/core/httpcli/httpcli.c src/core/httpcli/httpcli_security_connector.c src/core/httpcli/parser.c src/core/security/base64.c src/core/security/client_auth_filter.c src/core/security/credentials.c src/core/security/credentials_metadata.c src/core/security/credentials_posix.c src/core/security/credentials_win32.c src/core/security/google_default_credentials.c src/core/security/json_token.c src/core/security/secure_endpoint.c src/core/security/secure_transport_setup.c src/core/security/security_connector.c src/core/security/security_context.c src/core/security/server_auth_filter.c src/core/security/server_secure_chttp2.c src/core/surface/init_secure.c src/core/surface/secure_channel_create.c src/core/tsi/fake_transport_security.c src/core/tsi/ssl_transport_security.c src/core/tsi/transport_security.c src/core/census/grpc_context.c src/core/channel/channel_args.c src/core/channel/channel_stack.c src/core/channel/child_channel.c src/core/channel/client_channel.c src/core/channel/client_setup.c src/core/channel/connected_channel.c src/core/channel/http_client_filter.c src/core/channel/http_server_filter.c src/core/channel/noop_filter.c src/core/compression/algorithm.c src/core/compression/message_compress.c src/core/debug/trace.c src/core/iomgr/alarm.c src/core/iomgr/alarm_heap.c src/core/iomgr/endpoint.c src/core/iomgr/endpoint_pair_posix.c src/core/iomgr/endpoint_pair_windows.c src/core/iomgr/fd_posix.c src/core/iomgr/iocp_windows.c src/core/iomgr/iomgr.c src/core/iomgr/iomgr_posix.c src/core/iomgr/iomgr_windows.c src/core/iomgr/pollset_kick_posix.c src/core/iomgr/pollset_multipoller_with_epoll.c src/core/iomgr/pollset_multipoller_with_poll_posix.c src/core/iomgr/pollset_posix.c src/core/iomgr/pollset_set_posix.c src/core/iomgr/pollset_set_windows.c src/core/iomgr/pollset_windows.c src/core/iomgr/resolve_address_posix.c src/core/iomgr/resolve_address_windows.c src/core/iomgr/sockaddr_utils.c src/core/iomgr/socket_utils_common_posix.c src/core/iomgr/socket_utils_linux.c src/core/iomgr/socket_utils_posix.c src/core/iomgr/socket_windows.c src/core/iomgr/tcp_client_posix.c src/core/iomgr/tcp_client_windows.c src/core/iomgr/tcp_posix.c src/core/iomgr/tcp_server_posix.c src/core/iomgr/tcp_server_windows.c src/core/iomgr/tcp_windows.c src/core/iomgr/time_averaged_stats.c src/core/iomgr/wakeup_fd_eventfd.c src/core/iomgr/wakeup_fd_nospecial.c src/core/iomgr/wakeup_fd_pipe.c src/core/iomgr/wakeup_fd_posix.c src/core/json/json.c src/core/json/json_reader.c src/core/json/json_string.c src/core/json/json_writer.c src/core/profiling/basic_timers.c src/core/profiling/stap_timers.c src/core/surface/byte_buffer.c src/core/surface/byte_buffer_queue.c src/core/surface/byte_buffer_reader.c src/core/surface/call.c src/core/surface/call_details.c src/core/surface/call_log_batch.c src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/client.c src/core/surface/completion_queue.c src/core/surface/event_string.c src/core/surface/init.c src/core/surface/lame_client.c src/core/surface/metadata_array.c src/core/surface/server.c src/core/surface/server_chttp2.c src/core/surface/server_create.c src/core/surface/surface_trace.c src/core/transport/chttp2/alpn.c src/core/transport/chttp2/bin_encoder.c src/core/transport/chttp2/frame_data.c src/core/transport/chttp2/frame_goaway.c src/core/transport/chttp2/frame_ping.c src/core/transport/chttp2/frame_rst_stream.c src/core/transport/chttp2/frame_settings.c src/core/transport/chttp2/frame_window_update.c src/core/transport/chttp2/hpack_parser.c src/core/transport/chttp2/hpack_table.c src/core/transport/chttp2/huffsyms.c src/core/transport/chttp2/status_conversion.c src/core/transport/chttp2/stream_encoder.c src/core/transport/chttp2/stream_map.c src/core/transport/chttp2/timeout_encoding.c src/core/transport/chttp2/varint.c src/core/transport/chttp2_transport.c src/core/transport/metadata.c src/core/transport/stream_op.c src/core/transport/transport.c src/core/transport/transport_op_string.c src/core/census/context.c src/core/census/initialize.c include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h src/core/support/env.h src/core/support/file.h src/core/support/murmur_hash.h src/core/support/string.h src/core/support/string_win32.h src/core/support/thd_internal.h src/core/support/alloc.c src/core/support/cancellable.c src/core/support/cmdline.c src/core/support/cpu_iphone.c src/core/support/cpu_linux.c src/core/support/cpu_posix.c src/core/support/cpu_windows.c src/core/support/env_linux.c src/core/support/env_posix.c src/core/support/env_win32.c src/core/support/file.c src/core/support/file_posix.c src/core/support/file_win32.c src/core/support/histogram.c src/core/support/host_port.c src/core/support/log.c src/core/support/log_android.c src/core/support/log_linux.c src/core/support/log_posix.c src/core/support/log_win32.c src/core/support/murmur_hash.c src/core/support/slice.c src/core/support/slice_buffer.c src/core/support/string.c src/core/support/string_posix.c src/core/support/string_win32.c src/core/support/subprocess_posix.c src/core/support/sync.c src/core/support/sync_posix.c src/core/support/sync_win32.c src/core/support/thd.c src/core/support/thd_posix.c src/core/support/thd_win32.c src/core/support/time.c src/core/support/time_posix.c src/core/support/time_win32.c src/core/support/tls_pthread.c
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh
index d0f09e4d8b..53db6af0ea 100755
--- a/tools/run_tests/build_python.sh
+++ b/tools/run_tests/build_python.sh
@@ -38,5 +38,5 @@ rm -rf python2.7_virtual_environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install -r src/python/requirements.txt
-CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
+CFLAGS="-I$root/include -std=c89" LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
pip install src/python/interop
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index 985b7a7f16..058a30d1ce 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -223,6 +223,7 @@ class Jobset(object):
self._travis = travis
self._cache = cache
self._stop_on_failure = stop_on_failure
+ self._hashes = {}
def start(self, spec):
"""Start a job. Return True on success, False on failure."""
@@ -231,11 +232,15 @@ class Jobset(object):
self.reap()
if self.cancelled(): return False
if spec.hash_targets:
- bin_hash = hashlib.sha1()
- for fn in spec.hash_targets:
- with open(which(fn)) as f:
- bin_hash.update(f.read())
- bin_hash = bin_hash.hexdigest()
+ if spec.identity() in self._hashes:
+ bin_hash = self._hashes[spec.identity()]
+ else:
+ bin_hash = hashlib.sha1()
+ for fn in spec.hash_targets:
+ with open(which(fn)) as f:
+ bin_hash.update(f.read())
+ bin_hash = bin_hash.hexdigest()
+ self._hashes[spec.identity()] = bin_hash
should_run = self._cache.should_run(spec.identity(), bin_hash)
else:
bin_hash = None
@@ -266,6 +271,7 @@ class Jobset(object):
for job in self._running:
job.kill()
dead.add(job)
+ break
for job in dead:
self._completed += 1
self._running.remove(job)
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 65024795e4..77b9f4b5f8 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -50,6 +50,9 @@ ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(ROOT)
+_FORCE_ENVIRON_FOR_WRAPPERS = {}
+
+
# SimpleConfig: just compile with CONFIG=config, and run the binary to test
class SimpleConfig(object):
@@ -146,7 +149,7 @@ class NodeLanguage(object):
def test_specs(self, config, travis):
return [config.job_spec(['tools/run_tests/run_node.sh'], None,
- environ={'GRPC_TRACE': 'surface,batch'})]
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def make_targets(self):
return ['static_c', 'shared_c']
@@ -165,7 +168,7 @@ class PhpLanguage(object):
def test_specs(self, config, travis):
return [config.job_spec(['src/php/bin/run_tests.sh'], None,
- environ={'GRPC_TRACE': 'surface,batch'})]
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def make_targets(self):
return ['static_c', 'shared_c']
@@ -190,13 +193,13 @@ class PythonLanguage(object):
modules = [config.job_spec(['tools/run_tests/run_python.sh', '-m',
test['module']],
None,
- environ={'GRPC_TRACE': 'surface,batch'},
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS,
shortname=test['module'])
for test in self._tests if 'module' in test]
files = [config.job_spec(['tools/run_tests/run_python.sh',
test['file']],
None,
- environ={'GRPC_TRACE': 'surface,batch'},
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS,
shortname=test['file'])
for test in self._tests if 'file' in test]
return files + modules
@@ -218,7 +221,7 @@ class RubyLanguage(object):
def test_specs(self, config, travis):
return [config.job_spec(['tools/run_tests/run_ruby.sh'], None,
- environ={'GRPC_TRACE': 'surface,batch'})]
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
def make_targets(self):
return ['run_dep_checks']
@@ -251,7 +254,7 @@ class CSharpLanguage(object):
cmd = 'tools/run_tests/run_csharp.sh'
return [config.job_spec([cmd, assembly],
None, shortname=assembly,
- environ={'GRPC_TRACE': 'surface,batch'})
+ environ=_FORCE_ENVIRON_FOR_WRAPPERS)
for assembly in assemblies ]
def make_targets(self):
@@ -313,7 +316,7 @@ _CONFIGS = {
'dbg': SimpleConfig('dbg'),
'opt': SimpleConfig('opt'),
'tsan': SimpleConfig('tsan', environ={
- 'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt'}),
+ 'TSAN_OPTIONS': 'suppressions=tools/tsan_suppressions.txt:halt_on_error=1'}),
'msan': SimpleConfig('msan'),
'ubsan': SimpleConfig('ubsan'),
'asan': SimpleConfig('asan', environ={
@@ -402,6 +405,9 @@ run_configs = set(_CONFIGS[cfg]
for x in args.config))
build_configs = set(cfg.build_config for cfg in run_configs)
+if args.travis:
+ _FORCE_ENVIRON_FOR_WRAPPERS = {'GRPC_TRACE': 'surface,batch'}
+
make_targets = []
languages = set(_LANGUAGES[l]
for l in itertools.chain.from_iterable(
@@ -452,6 +458,7 @@ class TestCache(object):
def __init__(self, use_cache_results):
self._last_successful_run = {}
self._use_cache_results = use_cache_results
+ self._last_save = time.time()
def should_run(self, cmdline, bin_hash):
if cmdline not in self._last_successful_run:
@@ -464,7 +471,8 @@ class TestCache(object):
def finished(self, cmdline, bin_hash):
self._last_successful_run[cmdline] = bin_hash
- self.save()
+ if time.time() - self._last_save > 1:
+ self.save()
def dump(self):
return [{'cmdline': k, 'hash': v}
@@ -476,6 +484,7 @@ class TestCache(object):
def save(self):
with open('.run_tests_cache', 'w') as f:
f.write(json.dumps(self.dump()))
+ self._last_save = time.time()
def maybe_load(self):
if os.path.exists('.run_tests_cache'):
@@ -518,6 +527,8 @@ def _build_and_run(check_cancelled, newline_on_success, travis, cache):
for antagonist in antagonists:
antagonist.kill()
+ if cache: cache.save()
+
return 0
diff --git a/vsprojects/grpc/grpc.vcxproj b/vsprojects/grpc/grpc.vcxproj
index d82a4c4e1a..10be8f5992 100644
--- a/vsprojects/grpc/grpc.vcxproj
+++ b/vsprojects/grpc/grpc.vcxproj
@@ -150,6 +150,7 @@
<ClInclude Include="..\..\include\grpc\grpc_security.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h" />
+ <ClInclude Include="..\..\include\grpc\compression.h" />
<ClInclude Include="..\..\include\grpc\grpc.h" />
<ClInclude Include="..\..\include\grpc\status.h" />
<ClInclude Include="..\..\include\grpc\census.h" />
@@ -181,7 +182,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
- <ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
<ClInclude Include="..\..\src\core\debug\trace.h" />
<ClInclude Include="..\..\src\core\iomgr\alarm.h" />
diff --git a/vsprojects/grpc/grpc.vcxproj.filters b/vsprojects/grpc/grpc.vcxproj.filters
index 32d8231da9..4af61e51eb 100644
--- a/vsprojects/grpc/grpc.vcxproj.filters
+++ b/vsprojects/grpc/grpc.vcxproj.filters
@@ -366,6 +366,9 @@
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h">
<Filter>include\grpc</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\grpc\compression.h">
+ <Filter>include\grpc</Filter>
+ </ClInclude>
<ClInclude Include="..\..\include\grpc\grpc.h">
<Filter>include\grpc</Filter>
</ClInclude>
@@ -455,9 +458,6 @@
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\compression\algorithm.h">
- <Filter>src\core\compression</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\compression\message_compress.h">
<Filter>src\core\compression</Filter>
</ClInclude>
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
index bd525ec768..bd8ae4670a 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
@@ -148,6 +148,7 @@
<ItemGroup>
<ClInclude Include="..\..\include\grpc\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h" />
+ <ClInclude Include="..\..\include\grpc\compression.h" />
<ClInclude Include="..\..\include\grpc\grpc.h" />
<ClInclude Include="..\..\include\grpc\status.h" />
<ClInclude Include="..\..\include\grpc\census.h" />
@@ -163,7 +164,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
- <ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
<ClInclude Include="..\..\src\core\debug\trace.h" />
<ClInclude Include="..\..\src\core\iomgr\alarm.h" />
diff --git a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 39a85cf05b..dde460c83f 100644
--- a/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -297,6 +297,9 @@
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h">
<Filter>include\grpc</Filter>
</ClInclude>
+ <ClInclude Include="..\..\include\grpc\compression.h">
+ <Filter>include\grpc</Filter>
+ </ClInclude>
<ClInclude Include="..\..\include\grpc\grpc.h">
<Filter>include\grpc</Filter>
</ClInclude>
@@ -338,9 +341,6 @@
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
- <ClInclude Include="..\..\src\core\compression\algorithm.h">
- <Filter>src\core\compression</Filter>
- </ClInclude>
<ClInclude Include="..\..\src\core\compression\message_compress.h">
<Filter>src\core\compression</Filter>
</ClInclude>