aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-09 22:57:45 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-09 22:57:45 -0700
commit7f79750bb585c2952b8cf29e323799feb4f82f62 (patch)
treed5887658de04191f890d9f343ab5cf18d5f2bb88 /src
parent97f1454fc5ccdfc57e2f1296a52f1f83037cfd78 (diff)
parent36092008c694ff701f5a6f6edeebd37efbe06012 (diff)
Merge pull request #12 from jtattermusch/you-complete-me-csharp
C# shutdown take 2
Diffstat (limited to 'src')
-rw-r--r--src/core/compression/algorithm.c2
-rw-r--r--src/core/compression/algorithm.h49
-rw-r--r--src/core/compression/message_compress.h2
-rw-r--r--src/core/surface/byte_buffer.c32
-rw-r--r--src/core/surface/byte_buffer_reader.c56
-rw-r--r--src/core/surface/call.c18
-rw-r--r--src/core/surface/channel.c34
-rw-r--r--src/core/surface/channel.h10
-rw-r--r--src/core/transport/chttp2_transport.c66
-rw-r--r--src/core/transport/metadata.c2
-rw-r--r--src/cpp/proto/proto_utils.cc4
-rw-r--r--src/cpp/util/byte_buffer.cc2
-rw-r--r--src/cpp/util/time.cc15
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs52
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs64
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs8
-rw-r--r--src/csharp/Grpc.Core/Channel.cs36
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj4
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs31
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs43
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs (renamed from src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs)44
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs102
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs60
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs89
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/Enums.cs40
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs2
-rw-r--r--src/csharp/Grpc.Core/Server.cs53
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs2
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs2
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c114
-rw-r--r--src/node/ext/byte_buffer.cc2
-rw-r--r--src/node/package.json2
-rw-r--r--src/node/src/common.js4
-rw-r--r--src/node/test/common_test.js90
-rw-r--r--src/node/test/test_messages.proto38
-rw-r--r--src/objective-c/GRPCClient/private/NSData+GRPC.m4
-rw-r--r--src/php/ext/grpc/byte_buffer.c2
-rw-r--r--src/python/requirements.txt2
-rw-r--r--src/python/src/grpc/_adapter/_c/utility.c2
-rw-r--r--src/python/src/setup.py4
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c2
51 files changed, 875 insertions, 433 deletions
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index ca07002ff9..36ead843d2 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/compression/algorithm.h"
+#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm) {
diff --git a/src/core/compression/algorithm.h b/src/core/compression/algorithm.h
deleted file mode 100644
index 9dd9f57b56..0000000000
--- a/src/core/compression/algorithm.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
-#define GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
-
-/* The various compression algorithms supported by GRPC */
-typedef enum {
- GRPC_COMPRESS_NONE = 0,
- GRPC_COMPRESS_DEFLATE,
- GRPC_COMPRESS_GZIP,
- /* TODO(ctiller): snappy */
- GRPC_COMPRESS_ALGORITHMS_COUNT
-} grpc_compression_algorithm;
-
-const char *grpc_compression_algorithm_name(
- grpc_compression_algorithm algorithm);
-
-#endif /* GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H */
diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h
index e8aef1a713..aba701a6ee 100644
--- a/src/core/compression/message_compress.h
+++ b/src/core/compression/message_compress.h
@@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
#define GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
-#include "src/core/compression/algorithm.h"
+#include <grpc/compression.h>
#include <grpc/support/slice_buffer.h>
/* compress 'input' to 'output' using 'algorithm'.
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 12244f6644..4817e00454 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -35,25 +35,31 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
+grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+ size_t nslices) {
+ return grpc_raw_compressed_byte_buffer_create(slices, nslices,
+ GRPC_COMPRESS_NONE);
+}
+
+grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
+ gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) {
size_t i;
grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
-
- bb->type = GRPC_BB_SLICE_BUFFER;
- gpr_slice_buffer_init(&bb->data.slice_buffer);
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = compression;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
for (i = 0; i < nslices; i++) {
gpr_slice_ref(slices[i]);
- gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]);
}
-
return bb;
}
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
- bb->data.slice_buffer.count);
+ case GRPC_BB_RAW:
+ return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
+ bb->data.raw.slice_buffer.count);
}
gpr_log(GPR_INFO, "should never get here");
abort();
@@ -63,8 +69,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
if (!bb) return;
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- gpr_slice_buffer_destroy(&bb->data.slice_buffer);
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer);
break;
}
free(bb);
@@ -72,8 +78,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
switch (bb->type) {
- case GRPC_BB_SLICE_BUFFER:
- return bb->data.slice_buffer.length;
+ case GRPC_BB_RAW:
+ return bb->data.raw.slice_buffer.length;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 41ad700274..86829a686f 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -33,41 +33,73 @@
#include <grpc/byte_buffer_reader.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/byte_buffer.h>
+#include "src/core/compression/message_compress.h"
+
+static int is_compressed(grpc_byte_buffer *buffer) {
+ switch (buffer->type) {
+ case GRPC_BB_RAW:
+ if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
+ return 0 /* GPR_FALSE */;
+ }
+ break;
+ }
+ return 1 /* GPR_TRUE */;
+}
+
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) {
- reader->buffer = buffer;
- switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
+ gpr_slice_buffer decompressed_slices_buffer;
+ reader->buffer_in = buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ gpr_slice_buffer_init(&decompressed_slices_buffer);
+ if (is_compressed(reader->buffer_in)) {
+ grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+ &reader->buffer_in->data.raw.slice_buffer,
+ &decompressed_slices_buffer);
+ reader->buffer_out = grpc_raw_byte_buffer_create(
+ decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
+ gpr_slice_buffer_destroy(&decompressed_slices_buffer);
+ } else { /* not compressed, use the input buffer as output */
+ reader->buffer_out = reader->buffer_in;
+ }
reader->current.index = 0;
+ break;
}
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
- /* no-op: the user is responsible for memory deallocation.
- * Other cleanup operations would go here if needed. */
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW:
+ /* keeping the same if-else structure as in the init function */
+ if (is_compressed(reader->buffer_in)) {
+ grpc_byte_buffer_destroy(reader->buffer_out);
+ }
+ break;
+ }
}
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice) {
- grpc_byte_buffer *buffer = reader->buffer;
- gpr_slice_buffer *slice_buffer;
- switch (buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- slice_buffer = &buffer->data.slice_buffer;
+ switch (reader->buffer_in->type) {
+ case GRPC_BB_RAW: {
+ gpr_slice_buffer *slice_buffer;
+ slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
reader->current.index += 1;
return 1;
- } else {
- return 0;
}
break;
+ }
}
return 0;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 74df5745b5..cead5e08dc 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -636,7 +636,7 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
/* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
+ grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
call->incoming_message.slices, call->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@@ -759,7 +759,7 @@ static void call_on_done_recv(void *pc, int success) {
unlock(call);
GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0);
- GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
+ GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
static int prepare_application_metadata(grpc_call *call, size_t count,
@@ -806,9 +806,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
size_t i;
switch (byte_buffer->type) {
- case GRPC_BB_SLICE_BUFFER:
- for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
- gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
+ case GRPC_BB_RAW:
+ for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
+ gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
gpr_slice_ref(slice);
grpc_sopb_add_slice(sopb, slice);
}
@@ -820,7 +820,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
grpc_metadata_batch mdb;
size_t i;
- char status_str[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(op->send_ops == NULL);
switch (call->write_state) {
@@ -865,13 +864,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
- gpr_ltoa(data.send_status.code, status_str);
grpc_metadata_batch_add_tail(
&mdb, &call->status_link,
- grpc_mdelem_from_metadata_strings(
- call->metadata_context,
- grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
- grpc_mdstr_from_string(call->metadata_context, status_str)));
+ grpc_channel_get_reffed_status_elem(call->channel,
+ data.send_status.code));
if (data.send_status.details) {
grpc_metadata_batch_add_tail(
&mdb, &call->details_link,
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 947011c613..9175ad0572 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -37,12 +37,20 @@
#include <string.h>
#include "src/core/iomgr/iomgr.h"
+#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
+ * Avoids needing to take a metadata context lock for sending status
+ * if the status code is <= NUM_CACHED_STATUS_ELEMS.
+ * Sized to allow the most commonly used codes to fit in
+ * (OK, Cancelled, Unknown). */
+#define NUM_CACHED_STATUS_ELEMS 3
+
typedef struct registered_call {
grpc_mdelem *path;
grpc_mdelem *authority;
@@ -54,10 +62,13 @@ struct grpc_channel {
gpr_refcount refs;
gpr_uint32 max_message_length;
grpc_mdctx *metadata_context;
+ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
+ /** mdelem for grpc-status: 0 thru grpc-status: 2 */
+ grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS];
gpr_mu registered_call_mu;
registered_call *registered_calls;
@@ -88,6 +99,13 @@ grpc_channel *grpc_channel_create_from_filters(
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ char buf[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, buf);
+ channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings(
+ mdctx, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(mdctx, buf));
+ }
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
@@ -175,7 +193,11 @@ void grpc_channel_internal_ref(grpc_channel *channel) {
static void destroy_channel(void *p, int ok) {
grpc_channel *channel = p;
+ size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
+ grpc_mdelem_unref(channel->grpc_status_elem[i]);
+ }
grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
@@ -235,6 +257,18 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
+ if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
+ return grpc_mdelem_ref(channel->grpc_status_elem[i]);
+ } else {
+ char tmp[GPR_LTOA_MIN_BUFSIZE];
+ gpr_ltoa(i, tmp);
+ return grpc_mdelem_from_metadata_strings(
+ channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string),
+ grpc_mdstr_from_string(channel->metadata_context, tmp));
+ }
+}
+
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 388be35711..6d1ed87900 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -40,8 +40,18 @@ grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
+/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
+
+/** Get a (borrowed) pointer to the channel wide metadata context */
grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
+
+/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of
+ status_code.
+
+ The returned elem is owned by the caller. */
+grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
+ int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 630504565b..bd259f7ae3 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -230,7 +230,10 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
gpr_uint8 writing;
- gpr_uint8 calling_back;
+ /** are we calling back (via cb) with a channel-level event */
+ gpr_uint8 calling_back_channel;
+ /** are we calling back any grpc_transport_op completion events */
+ gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
error_state error_state;
@@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
static int prepare_callbacks(transport *t);
-static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
+static void run_callbacks(transport *t);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t);
@@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
}
gpr_mu_lock(&t->mu);
- t->calling_back = 1;
+ t->calling_back_channel = 1;
ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
@@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
lock(t);
t->cb = sr.callbacks;
t->cb_user_data = sr.user_data;
- t->calling_back = 0;
+ t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
@@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
- while (t->calling_back || t->writing) {
+ while (t->calling_back_channel || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
@@ -830,28 +833,29 @@ static void unlock(transport *t) {
finish_reads(t);
/* gather any callbacks that need to be made */
- if (!t->calling_back) {
- t->calling_back = perform_callbacks = prepare_callbacks(t);
- if (cb) {
- if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
- call_closed = 1;
- t->calling_back = 1;
- t->cb = NULL; /* no more callbacks */
- t->error_state = ERROR_STATE_NOTIFIED;
- }
- if (t->num_pending_goaways) {
- goaways = t->pending_goaways;
- num_goaways = t->num_pending_goaways;
- t->pending_goaways = NULL;
- t->num_pending_goaways = 0;
- t->cap_pending_goaways = 0;
- t->calling_back = 1;
- }
- }
+ if (!t->calling_back_ops) {
+ t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
+ if (perform_callbacks) ref_transport(t);
}
- if (perform_callbacks || call_closed || num_goaways) {
- ref_transport(t);
+ if (!t->calling_back_channel && cb) {
+ if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
+ call_closed = 1;
+ t->calling_back_channel = 1;
+ t->cb = NULL; /* no more callbacks */
+ t->error_state = ERROR_STATE_NOTIFIED;
+ }
+ if (t->num_pending_goaways) {
+ goaways = t->pending_goaways;
+ num_goaways = t->num_pending_goaways;
+ t->pending_goaways = NULL;
+ t->num_pending_goaways = 0;
+ t->cap_pending_goaways = 0;
+ t->calling_back_channel = 1;
+ }
+ if (call_closed || num_goaways) {
+ ref_transport(t);
+ }
}
/* finally unlock */
@@ -865,7 +869,11 @@ static void unlock(transport *t) {
}
if (perform_callbacks) {
- run_callbacks(t, cb);
+ run_callbacks(t);
+ lock(t);
+ t->calling_back_ops = 0;
+ unlock(t);
+ unref_transport(t);
}
if (call_closed) {
@@ -878,9 +886,9 @@ static void unlock(transport *t) {
perform_write(t, ep);
}
- if (perform_callbacks || call_closed || num_goaways) {
+ if (call_closed || num_goaways) {
lock(t);
- t->calling_back = 0;
+ t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
unref_transport(t);
@@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) {
return t->executing_callbacks.count > 0;
}
-static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
+static void run_callbacks(transport *t) {
size_t i;
for (i = 0; i < t->executing_callbacks.count; i++) {
op_closure c = t->executing_callbacks.callbacks[i];
diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c
index c80d67823f..e75b449e12 100644
--- a/src/core/transport/metadata.c
+++ b/src/core/transport/metadata.c
@@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) {
if (ctx->refs == 0) {
/* uncomment if you're having trouble diagnosing an mdelem leak to make
things clearer (slows down destruction a lot, however) */
- gc_mdtab(ctx);
+ /* gc_mdtab(ctx); */
if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) {
discard_metadata(ctx);
}
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 7a7e73bba4..f4cf5cf17a 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -49,8 +49,8 @@ class GrpcBufferWriter GRPC_FINAL
explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
- *bp = grpc_byte_buffer_create(NULL, 0);
- slice_buffer_ = &(*bp)->data.slice_buffer;
+ *bp = grpc_raw_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 45eaa2fe5b..a78e4226d2 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -42,7 +42,7 @@ ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
for (size_t i = 0; i < nslices; i++) {
c_slices[i] = slices[i].slice_;
}
- buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
+ buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices);
}
void ByteBuffer::Clear() {
diff --git a/src/cpp/util/time.cc b/src/cpp/util/time.cc
index 1fef2a56de..fd94d00b32 100644
--- a/src/cpp/util/time.cc
+++ b/src/cpp/util/time.cc
@@ -42,6 +42,7 @@ using std::chrono::duration_cast;
using std::chrono::nanoseconds;
using std::chrono::seconds;
using std::chrono::system_clock;
+using std::chrono::high_resolution_clock;
namespace grpc {
@@ -59,6 +60,20 @@ void Timepoint2Timespec(const system_clock::time_point& from,
to->tv_nsec = nsecs.count();
}
+void TimepointHR2Timespec(const high_resolution_clock::time_point& from,
+ gpr_timespec* to) {
+ high_resolution_clock::duration deadline = from.time_since_epoch();
+ seconds secs = duration_cast<seconds>(deadline);
+ if (from == high_resolution_clock::time_point::max() ||
+ secs.count() >= gpr_inf_future.tv_sec || secs.count() < 0) {
+ *to = gpr_inf_future;
+ return;
+ }
+ nanoseconds nsecs = duration_cast<nanoseconds>(deadline - secs);
+ to->tv_sec = secs.count();
+ to->tv_nsec = nsecs.count();
+}
+
system_clock::time_point Timespec2Timepoint(gpr_timespec t) {
if (gpr_time_cmp(t, gpr_inf_future) == 0) {
return system_clock::time_point::max();
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index b69b933aba..82ded5cc7a 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -86,7 +86,7 @@ namespace Grpc.Core.Tests
server.AddServiceDefinition(ServiceDefinition);
int port = server.AddListeningPort(Host, Server.PickUnusedPort);
server.Start();
- channel = new Channel(Host + ":" + port);
+ channel = new Channel(Host, port);
}
[TearDown]
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 62cb443272..029653967b 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -3,7 +3,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>10.0.0</ProductVersion>
+ <ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid>
<OutputType>Library</OutputType>
@@ -46,6 +46,8 @@
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
+ <Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
+ <Compile Include="Internal\CompletionQueueEventTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
new file mode 100644
index 0000000000..188c6406a2
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
@@ -0,0 +1,52 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class CompletionQueueEventTest
+ {
+ [Test]
+ public void CreateAndDestroy()
+ {
+ Assert.AreEqual(CompletionQueueEvent.NativeSize, Marshal.SizeOf(typeof(CompletionQueueEvent)));
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
new file mode 100644
index 0000000000..a2ee183272
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
@@ -0,0 +1,64 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class CompletionQueueSafeHandleTest
+ {
+ [Test]
+ public void CreateAndDestroy()
+ {
+ var cq = CompletionQueueSafeHandle.Create();
+ cq.Dispose();
+ }
+
+ [Test]
+ public void CreateAndShutdown()
+ {
+ var cq = CompletionQueueSafeHandle.Create();
+ cq.Shutdown();
+ var ev = cq.Next();
+ cq.Dispose();
+ Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type);
+ Assert.AreNotEqual(IntPtr.Zero, ev.success);
+ Assert.AreEqual(IntPtr.Zero, ev.tag);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 26f87660df..8b3c910251 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -48,7 +48,7 @@ namespace Grpc.Core.Tests
int counter;
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
@@ -88,7 +88,7 @@ namespace Grpc.Core.Tests
[Test]
public void NativeCallbackBenchmark()
{
- CompletionCallbackDelegate handler = Handler;
+ OpCompletionDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@@ -114,7 +114,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
- grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
+ grpcsharp_test_callback(new OpCompletionDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}
@@ -134,7 +134,7 @@ namespace Grpc.Core.Tests
});
}
- private void Handler(bool success, IntPtr ptr)
+ private void Handler(bool success)
{
counter++;
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index b47d810672..44b610f65b 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -45,9 +45,13 @@ namespace Grpc.Core
readonly string target;
/// <summary>
- /// Creates a channel.
+ /// Creates a channel that connects to a specific host.
+ /// Port will default to 80 for an unsecure channel and to 443 a secure channel.
/// </summary>
- public Channel(string target, Credentials credentials = null, ChannelArgs channelArgs = null)
+ /// <param name="host">The DNS name of IP address of the host.</param>
+ /// <param name="credentials">Optional credentials to create a secure channel.</param>
+ /// <param name="channelArgs">Optional channel arguments.</param>
+ public Channel(string host, Credentials credentials = null, ChannelArgs channelArgs = null)
{
using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs))
{
@@ -55,23 +59,27 @@ namespace Grpc.Core
{
using (CredentialsSafeHandle nativeCredentials = credentials.ToNativeCredentials())
{
- this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs);
}
}
else
{
- this.handle = ChannelSafeHandle.Create(target, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs);
}
}
- this.target = GetOverridenTarget(target, channelArgs);
+ this.target = GetOverridenTarget(host, channelArgs);
}
- public string Target
+ /// <summary>
+ /// Creates a channel that connects to a specific host and port.
+ /// </summary>
+ /// <param name="host">DNS name or IP address</param>
+ /// <param name="port">the port</param>
+ /// <param name="credentials">Optional credentials to create a secure channel.</param>
+ /// <param name="channelArgs">Optional channel arguments.</param>
+ public Channel(string host, int port, Credentials credentials = null, ChannelArgs channelArgs = null) :
+ this(string.Format("{0}:{1}", host, port), credentials, channelArgs)
{
- get
- {
- return this.target;
- }
}
public void Dispose()
@@ -80,6 +88,14 @@ namespace Grpc.Core
GC.SuppressFinalize(this);
}
+ internal string Target
+ {
+ get
+ {
+ return target;
+ }
+ }
+
internal ChannelSafeHandle Handle
{
get
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index fe2d446a35..5c7b9a8bb6 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -73,7 +73,6 @@
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
- <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
@@ -101,6 +100,9 @@
<Compile Include="Internal\AtomicCounter.cs" />
<Compile Include="Internal\DebugStats.cs" />
<Compile Include="ServerCallContext.cs" />
+ <Compile Include="Internal\CompletionQueueEvent.cs" />
+ <Compile Include="Internal\CompletionRegistry.cs" />
+ <Compile Include="Internal\BatchContextSafeHandle.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 2e9e5a2ef6..30ff289714 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -54,6 +54,7 @@ namespace Grpc.Core
static volatile GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
+ readonly CompletionRegistry completionRegistry;
bool isClosed;
/// <summary>
@@ -105,6 +106,19 @@ namespace Grpc.Core
}
}
+ internal static CompletionRegistry CompletionRegistry
+ {
+ get
+ {
+ var inst = instance;
+ if (inst == null)
+ {
+ throw new InvalidOperationException("GRPC environment not initialized");
+ }
+ return inst.completionRegistry;
+ }
+ }
+
/// <summary>
/// Creates gRPC environment.
/// </summary>
@@ -112,6 +126,7 @@ namespace Grpc.Core
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
+ completionRegistry = new CompletionRegistry();
threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
@@ -139,14 +154,24 @@ namespace Grpc.Core
{
var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
if (remainingClientCalls != 0)
- {
- Console.WriteLine("Warning: Detected {0} client calls that weren't disposed properly.", remainingClientCalls);
+ {
+ DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
- Console.WriteLine("Warning: Detected {0} server calls that weren't disposed properly.", remainingServerCalls);
+ DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
+ }
+ var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
+ if (pendingBatchCompletions != 0)
+ {
+ DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
+
+ private static void DebugWarning(string message)
+ {
+ throw new Exception("Shutdown check: " + message);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 9bb918d53d..d350f45da6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,9 +47,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
- readonly CompletionCallbackDelegate unaryResponseHandler;
- readonly CompletionCallbackDelegate finishedHandler;
-
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,8 +57,6 @@ namespace Grpc.Core.Internal
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
- this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
- this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
@@ -96,7 +91,21 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
+ using (var ctx = BatchContextSafeHandle.Create())
+ {
+ call.StartUnary(payload, ctx, metadataArray);
+ var ev = cq.Pluck(ctx.Handle);
+
+ bool success = (ev.success != 0);
+ try
+ {
+ HandleUnaryResponse(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
try
@@ -129,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartUnary(payload, unaryResponseHandler, metadataArray);
+ call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
}
@@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartClientStreaming(unaryResponseHandler, metadataArray);
+ call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@@ -175,7 +184,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartServerStreaming(payload, finishedHandler, metadataArray);
+ call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
}
}
@@ -194,7 +203,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
- call.StartDuplexStreaming(finishedHandler, metadataArray);
+ call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@@ -229,7 +238,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendCloseFromClient(halfclosedHandler);
+ call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@@ -274,7 +283,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
lock (myLock)
{
@@ -307,7 +316,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b4f4edb17a..64713c8c52 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -51,13 +51,8 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly CompletionCallbackDelegate sendFinishedHandler;
- protected readonly CompletionCallbackDelegate readFinishedHandler;
- protected readonly CompletionCallbackDelegate halfclosedHandler;
-
protected readonly object myLock = new object();
- protected GCHandle gchandle;
protected CallSafeHandle call;
protected bool disposed;
@@ -77,10 +72,6 @@ namespace Grpc.Core.Internal
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
-
- this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
- this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
- this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
@@ -121,9 +112,6 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- // Make sure this object and the delegated held by it will not be garbage collected
- // before we release this handle.
- gchandle = GCHandle.Alloc(this);
this.call = call;
}
}
@@ -141,7 +129,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, sendFinishedHandler);
+ call.StartSendMessage(payload, HandleSendFinished);
sendCompletionDelegate = completionDelegate;
}
}
@@ -157,7 +145,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
- call.StartReceiveMessage(readFinishedHandler);
+ call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
}
}
@@ -197,7 +185,6 @@ namespace Grpc.Core.Internal
{
call.Dispose();
}
- gchandle.Free();
disposed = true;
}
@@ -282,29 +269,9 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
- /// prevent propagating exceptions accross managed/unmanaged boundary.
- /// </summary>
- protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
- {
- return new CompletionCallbackDelegate((success, batchContextPtr) =>
- {
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
- handler(success, ctx);
- }
- catch (Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
- });
- }
-
- /// <summary>
/// Handles send completion.
/// </summary>
- private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -328,7 +295,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
- private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -353,7 +320,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
- private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx)
+ protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
{
var payload = ctx.GetReceivedMessage();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 1f0335e4e6..db1b86937f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -47,12 +47,10 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
- readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
- this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
@@ -72,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
- call.StartServerSide(finishedServersideHandler);
+ call.StartServerSide(HandleFinishedServerside);
return finishedServersideTcs.Task;
}
}
@@ -107,7 +105,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendStatusFromServer(status, halfclosedHandler);
+ call.StartSendStatusFromServer(status, HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
}
@@ -121,7 +119,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles the server side close completion.
/// </summary>
- private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx)
+ private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
{
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index b562abaa7a..861cbbe4c6 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -41,32 +41,50 @@ namespace Grpc.Core.Internal
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
+ static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
+ static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
+ static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
- static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
+ static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
+ static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
- public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_batch_context_destroy(IntPtr ctx);
+
+ private BatchContextSafeHandle()
+ {
+ }
+
+ public static BatchContextSafeHandle Create()
{
- SetHandle(handle);
+ return grpcsharp_batch_context_create();
+ }
+
+ public IntPtr Handle
+ {
+ get
+ {
+ return handle;
+ }
}
public Status GetReceivedStatus()
@@ -102,5 +120,11 @@ namespace Grpc.Core.Internal
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
+
+ protected override bool ReleaseHandle()
+ {
+ grpcsharp_batch_context_destroy(handle);
+ return true;
+ }
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 491b8414ec..ef92b44402 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -37,8 +37,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr);
-
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
@@ -57,49 +55,40 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
-
- [DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
- byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
+ static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
- [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
@@ -113,64 +102,84 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
- public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
{
- grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ .CheckOk();
}
- public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
- public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
- AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
+ public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
- public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
+ public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
+ public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
- public void StartReceiveMessage(CompletionCallbackDelegate callback)
+ public void StartReceiveMessage(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_recv_message(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_recv_message(this, ctx).CheckOk();
}
- public void StartServerSide(CompletionCallbackDelegate callback)
+ public void StartServerSide(BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_call_start_serverside(this, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void Cancel()
{
- AssertCallOk(grpcsharp_call_cancel(this));
+ grpcsharp_call_cancel(this).CheckOk();
}
public void CancelWithStatus(Status status)
{
- AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
+ grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk();
}
protected override bool ReleaseHandle()
@@ -179,11 +188,6 @@ namespace Grpc.Core.Internal
return true;
}
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
-
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
new file mode 100644
index 0000000000..3f517514a3
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -0,0 +1,60 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// grpc_event from grpc/grpc.h
+ /// </summary>
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct CompletionQueueEvent
+ {
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern int grpcsharp_sizeof_grpc_event();
+
+ public GRPCCompletionType type;
+ public int success;
+ public IntPtr tag;
+
+ internal static int NativeSize
+ {
+ get
+ {
+ return grpcsharp_sizeof_grpc_event();
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 600d1fc87c..f64f3d4175 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -46,7 +46,10 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq);
+ static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_destroy(IntPtr cq);
@@ -60,9 +63,14 @@ namespace Grpc.Core.Internal
return grpcsharp_completion_queue_create();
}
- public GRPCCompletionType NextWithCallback()
+ public CompletionQueueEvent Next()
+ {
+ return grpcsharp_completion_queue_next(this);
+ }
+
+ public CompletionQueueEvent Pluck(IntPtr tag)
{
- return grpcsharp_completion_queue_next_with_callback(this);
+ return grpcsharp_completion_queue_pluck(this, tag);
}
public void Shutdown()
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
new file mode 100644
index 0000000000..80f006ae50
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -0,0 +1,89 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ internal delegate void OpCompletionDelegate(bool success);
+
+ internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
+
+ internal class CompletionRegistry
+ {
+ readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
+
+ public void Register(IntPtr key, OpCompletionDelegate callback)
+ {
+ DebugStats.PendingBatchCompletions.Increment();
+ Preconditions.CheckState(dict.TryAdd(key, callback));
+ }
+
+ public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
+ Register(ctx.Handle, opCallback);
+ }
+
+ public OpCompletionDelegate Extract(IntPtr key)
+ {
+ OpCompletionDelegate value;
+ Preconditions.CheckState(dict.TryRemove(key, out value));
+ DebugStats.PendingBatchCompletions.Decrement();
+ return value;
+ }
+
+ private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
+ {
+ try
+ {
+ callback(success, ctx);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ finally
+ {
+ if (ctx != null)
+ {
+ ctx.Dispose();
+ }
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 476914f751..ef9d9afe11 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -41,5 +41,7 @@ namespace Grpc.Core.Internal
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
+
+ public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs
index 2b4f6cae0c..af11b5b9f3 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/Enums.cs
@@ -33,35 +33,47 @@
using System;
using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// from grpc/grpc.h
+ /// grpc_call_error from grpc/grpc.h
/// </summary>
internal enum GRPCCallError
{
/* everything went ok */
- GRPC_CALL_OK = 0,
+ OK = 0,
/* something failed, we don't know what */
- GRPC_CALL_ERROR,
+ Error,
/* this method is not available on the server */
- GRPC_CALL_ERROR_NOT_ON_SERVER,
+ NotOnServer,
/* this method is not available on the client */
- GRPC_CALL_ERROR_NOT_ON_CLIENT,
+ NotOnClient,
/* this method must be called before server_accept */
- GRPC_CALL_ERROR_ALREADY_ACCEPTED,
+ AlreadyAccepted,
/* this method must be called before invoke */
- GRPC_CALL_ERROR_ALREADY_INVOKED,
+ AlreadyInvoked,
/* this method must be called after invoke */
- GRPC_CALL_ERROR_NOT_INVOKED,
+ NotInvoked,
/* this call is already finished
(writes_done or write_status has already been called) */
- GRPC_CALL_ERROR_ALREADY_FINISHED,
+ AlreadyFinished,
/* there is already an outstanding read/write operation on the call */
- GRPC_CALL_ERROR_TOO_MANY_OPERATIONS,
+ TooManyOperations,
/* the flags value was illegal for this call */
- GRPC_CALL_ERROR_INVALID_FLAGS
+ InvalidFlags
+ }
+
+ internal static class CallErrorExtensions
+ {
+ /// <summary>
+ /// Checks the call API invocation's result is OK.
+ /// </summary>
+ public static void CheckOk(this GRPCCallError callError)
+ {
+ Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ }
}
/// <summary>
@@ -70,12 +82,12 @@ namespace Grpc.Core.Internal
internal enum GRPCCompletionType
{
/* Shutting down */
- GRPC_QUEUE_SHUTDOWN,
+ Shutdown,
/* No event before timeout */
- GRPC_QUEUE_TIMEOUT,
+ Timeout,
/* operation completion */
- GRPC_OP_COMPLETE
+ OpComplete
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index f4224668f1..89b44a4e2b 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -112,12 +112,26 @@ namespace Grpc.Core.Internal
/// </summary>
private void RunHandlerLoop()
{
- GRPCCompletionType completionType;
+ CompletionQueueEvent ev;
do
{
- completionType = cq.NextWithCallback();
+ ev = cq.Next();
+ if (ev.type == GRPCCompletionType.OpComplete)
+ {
+ bool success = (ev.success != 0);
+ IntPtr tag = ev.tag;
+ try
+ {
+ var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
+ callback(success);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured while invoking completion delegate: " + e);
+ }
+ }
}
- while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
+ while (ev.type != GRPCCompletionType.Shutdown);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index c44ee87bad..eda9afcb87 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -45,9 +45,6 @@ namespace Grpc.Core.Internal
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
- static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
-
- [DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
[DllImport("grpc_csharp_ext.dll")]
@@ -60,12 +57,15 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
+ static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
private ServerSafeHandle()
@@ -91,15 +91,19 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_start(this);
}
-
- public void ShutdownAndNotify(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+
+ public void ShutdownAndNotify(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- grpcsharp_server_shutdown_and_notify_callback(this, cq, callback);
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_shutdown_and_notify_callback(this, cq, ctx);
}
- public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+ public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
- AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
+ var ctx = BatchContextSafeHandle.Create();
+ GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_server_request_call(this, cq, ctx).CheckOk();
}
protected override bool ReleaseHandle()
@@ -113,10 +117,5 @@ namespace Grpc.Core.Internal
{
grpcsharp_server_cancel_all_calls(this);
}
-
- private static void AssertCallOk(GRPCCallError callError)
- {
- Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 94d48c2c49..775af27db9 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -51,7 +51,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
- // TODO: revisit this.
+
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 0f4d77eaf3..3352fc93a1 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -52,11 +52,6 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
- // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
- // native callbacks are in the completion queue.
- readonly CompletionCallbackDelegate serverShutdownHandler;
- readonly CompletionCallbackDelegate newServerRpcHandler;
-
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@@ -69,8 +64,6 @@ namespace Grpc.Core
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
- this.newServerRpcHandler = HandleNewServerRpc;
- this.serverShutdownHandler = HandleServerShutdown;
}
/// <summary>
@@ -108,7 +101,7 @@ namespace Grpc.Core
/// </summary>
/// <returns>The port on which server will be listening.</returns>
/// <param name="host">the host</param>
- /// <param name="port">the port. If zero, , an unused port is chosen automatically.</param>
+ /// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
public int AddListeningPort(string host, int port, ServerCredentials credentials)
{
Preconditions.CheckNotNull(credentials);
@@ -144,7 +137,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), serverShutdownHandler);
+ handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
await shutdownTcs.Task;
handle.Dispose();
}
@@ -173,7 +166,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), serverShutdownHandler);
+ handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
@@ -208,7 +201,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
- handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
+ handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
}
}
}
@@ -236,44 +229,28 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, IntPtr batchContextPtr)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
- try
- {
- var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
-
- // TODO: handle error
+ // TODO: handle error
- CallSafeHandle call = ctx.GetServerRpcNewCall();
- string method = ctx.GetServerRpcNewMethod();
+ CallSafeHandle call = ctx.GetServerRpcNewCall();
+ string method = ctx.GetServerRpcNewMethod();
- // after server shutdown, the callback returns with null call
- if (!call.IsInvalid)
- {
- Task.Run(async () => await InvokeCallHandler(call, method));
- }
-
- AllowOneRpc();
- }
- catch (Exception e)
+ // after server shutdown, the callback returns with null call
+ if (!call.IsInvalid)
{
- Console.WriteLine("Caught exception in a native handler: " + e);
+ Task.Run(async () => await InvokeCallHandler(call, method));
}
+
+ AllowOneRpc();
}
/// <summary>
/// Handles native callback.
/// </summary>
- private void HandleServerShutdown(bool success, IntPtr batchContextPtr)
+ private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
{
- try
- {
- shutdownTcs.SetResult(null);
- }
- catch (Exception e)
- {
- Console.WriteLine("Caught exception in a native handler: " + e);
- }
+ shutdownTcs.SetResult(null);
}
private static CompletionQueueSafeHandle GetCompletionQueue()
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index 85d9cdc7a6..360fe928dd 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -41,7 +41,7 @@ namespace math
{
GrpcEnvironment.Initialize();
- using (Channel channel = new Channel("127.0.0.1:23456"))
+ using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
MathExamples.DivExample(stub);
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 5aa6f4162d..aadd49f795 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -60,7 +60,7 @@ namespace math.Tests
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
server.Start();
- channel = new Channel(host + ":" + port);
+ channel = new Channel(host, port);
// TODO(jtattermusch): get rid of the custom header here once we have dedicated tests
// for header support.
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index dfaf18cae1..66171fae57 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -104,8 +104,6 @@ namespace Grpc.IntegrationTesting
{
GrpcEnvironment.Initialize();
- string addr = string.Format("{0}:{1}", options.serverHost, options.serverPort);
-
Credentials credentials = null;
if (options.useTls)
{
@@ -119,7 +117,7 @@ namespace Grpc.IntegrationTesting
.AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build();
}
- using (Channel channel = new Channel(addr, credentials, channelArgs))
+ using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelArgs))
{
var stubConfig = StubConfiguration.Default;
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index ddbfc61a4e..f756dfbc40 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -65,7 +65,7 @@ namespace Grpc.IntegrationTesting
var channelArgs = ChannelArgs.CreateBuilder()
.AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build();
- channel = new Channel(host + ":" + port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
+ channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
client = TestService.NewStub(channel);
}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index dc1bbe36f0..c2a0b729d4 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -60,7 +60,7 @@
grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
gpr_slice slice = gpr_slice_from_copied_buffer(buffer, len);
- grpc_byte_buffer *bb = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *bb = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return bb;
}
@@ -91,13 +91,9 @@ typedef struct gprcsharp_batch_context {
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
-
- /* callback will be called upon completion */
- callback_funcptr callback;
-
} grpcsharp_batch_context;
-grpcsharp_batch_context *grpcsharp_batch_context_create() {
+GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
@@ -192,7 +188,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
src->metadata = NULL;
}
-void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
+GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
@@ -306,25 +302,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
grpc_completion_queue_destroy(cq);
}
-GPR_EXPORT grpc_completion_type GPR_CALLTYPE
-grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
- grpc_event ev;
- grpcsharp_batch_context *batch_context;
- grpc_completion_type t;
-
- ev = grpc_completion_queue_next(cq, gpr_inf_future);
- t = ev.type;
- if (t == GRPC_OP_COMPLETE && ev.tag) {
- /* NEW API handler */
- batch_context = (grpcsharp_batch_context *)ev.tag;
- batch_context->callback((gpr_int32) ev.success, batch_context);
- grpcsharp_batch_context_destroy(batch_context);
- }
+GPR_EXPORT grpc_event GPR_CALLTYPE
+grpcsharp_completion_queue_next(grpc_completion_queue *cq) {
+ return grpc_completion_queue_next(cq, gpr_inf_future);
+}
- /* return completion type to allow some handling for events that have no
- * tag - such as GRPC_QUEUE_SHUTDOWN
- */
- return t;
+GPR_EXPORT grpc_event GPR_CALLTYPE
+grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
+ return grpc_completion_queue_pluck(cq, tag, gpr_inf_future);
}
/* Channel */
@@ -413,14 +398,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
+grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[6];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -454,34 +436,12 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
-/* Synchronous unary call */
-GPR_EXPORT void GPR_CALLTYPE
-grpcsharp_call_blocking_unary(grpc_call *call,
- grpc_completion_queue *dedicated_cq,
- callback_funcptr callback,
- const char *send_buffer, size_t send_buffer_len,
- grpc_metadata_array *initial_metadata) {
- GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
- send_buffer_len,
- initial_metadata) == GRPC_CALL_OK);
-
- /* TODO: we would like to use pluck, but we don't know the tag */
- GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
- GRPC_OP_COMPLETE);
- grpc_completion_queue_shutdown(dedicated_cq);
- GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
- GRPC_QUEUE_SHUTDOWN);
-}
-
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[4];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -510,13 +470,10 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
- grpc_call *call, callback_funcptr callback, const char *send_buffer,
+ grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[5];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -549,13 +506,10 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -581,13 +535,10 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
+grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
@@ -597,12 +548,9 @@ grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
- callback_funcptr callback) {
+ grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
@@ -610,14 +558,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call,
- callback_funcptr callback,
+ grpcsharp_batch_context *ctx,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -629,25 +574,18 @@ grpcsharp_call_send_status_from_server(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
+grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
-grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
+grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
-
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
@@ -681,9 +619,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) {
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_shutdown_and_notify_callback(grpc_server *server,
grpc_completion_queue *cq,
- callback_funcptr callback) {
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
+ grpcsharp_batch_context *ctx) {
grpc_server_shutdown_and_notify(server, cq, ctx);
}
@@ -697,10 +633,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
- callback_funcptr callback) {
- grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
- ctx->callback = callback;
-
+ grpcsharp_batch_context *ctx) {
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
@@ -797,3 +730,8 @@ grpcsharp_test_callback(callback_funcptr callback) {
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; }
+
+/* For testing */
+GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) {
+ return sizeof(grpc_event);
+}
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index 2c84099069..7eff11c2b3 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -57,7 +57,7 @@ grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) {
char *data = ::node::Buffer::Data(buffer);
gpr_slice slice = gpr_slice_malloc(length);
memcpy(GPR_SLICE_START_PTR(slice), data, length);
- grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1));
+ grpc_byte_buffer *byte_buffer(grpc_raw_byte_buffer_create(&slice, 1));
gpr_slice_unref(slice);
return byte_buffer;
}
diff --git a/src/node/package.json b/src/node/package.json
index 3ea2c065e7..7d4a493af4 100644
--- a/src/node/package.json
+++ b/src/node/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc",
- "version": "0.9.0",
+ "version": "0.9.1",
"author": "Google Inc.",
"description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/",
diff --git a/src/node/src/common.js b/src/node/src/common.js
index 7b543353eb..feaa859a4f 100644
--- a/src/node/src/common.js
+++ b/src/node/src/common.js
@@ -47,7 +47,9 @@ function deserializeCls(cls) {
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
- return cls.decode(arg_buf).toRaw();
+ // Convert to a native object with binary fields as Buffers (first argument)
+ // and longs as strings (second argument)
+ return cls.decode(arg_buf).toRaw(false, true);
};
}
diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js
new file mode 100644
index 0000000000..08ba429ed7
--- /dev/null
+++ b/src/node/test/common_test.js
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var assert = require('assert');
+
+var common = require('../src/common.js');
+
+var ProtoBuf = require('protobufjs');
+
+var messages_proto = ProtoBuf.loadProtoFile(
+ __dirname + '/test_messages.proto').build();
+
+describe('Proto message serialize and deserialize', function() {
+ var longSerialize = common.serializeCls(messages_proto.LongValues);
+ var longDeserialize = common.deserializeCls(messages_proto.LongValues);
+ var pos_value = '314159265358979';
+ var neg_value = '-27182818284590';
+ it('should preserve positive int64 values', function() {
+ var serialized = longSerialize({int_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).int_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative int64 values', function() {
+ var serialized = longSerialize({int_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).int_64.toString(),
+ neg_value);
+ });
+ it('should preserve uint64 values', function() {
+ var serialized = longSerialize({uint_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).uint_64.toString(),
+ pos_value);
+ });
+ it('should preserve positive sint64 values', function() {
+ var serialized = longSerialize({sint_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative sint64 values', function() {
+ var serialized = longSerialize({sint_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
+ neg_value);
+ });
+ it('should preserve fixed64 values', function() {
+ var serialized = longSerialize({fixed_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).fixed_64.toString(),
+ pos_value);
+ });
+ it('should preserve positive sfixed64 values', function() {
+ var serialized = longSerialize({sfixed_64: pos_value});
+ assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
+ pos_value);
+ });
+ it('should preserve negative sfixed64 values', function() {
+ var serialized = longSerialize({sfixed_64: neg_value});
+ assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
+ neg_value);
+ });
+});
diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto
new file mode 100644
index 0000000000..685e9482bd
--- /dev/null
+++ b/src/node/test/test_messages.proto
@@ -0,0 +1,38 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+message LongValues {
+ int64 int_64 = 1;
+ uint64 uint_64 = 2;
+ sint64 sint_64 = 3;
+ fixed64 fixed_64 = 4;
+ sfixed64 sfixed_64 = 5;
+} \ No newline at end of file
diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m
index 3a7f76887b..e6a6c3c605 100644
--- a/src/objective-c/GRPCClient/private/NSData+GRPC.m
+++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m
@@ -55,7 +55,7 @@ static void CopyByteBufferToCharArray(grpc_byte_buffer *buffer, char *array) {
static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(array, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
@@ -85,7 +85,7 @@ static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
// The following implementation is thus not optimal, sometimes requiring two
// copies (one by self.bytes and another by gpr_slice_from_copied_buffer).
// If it turns out to be an issue, we can use enumerateByteRangesUsingblock:
- // to create an array of gpr_slice objects to pass to grpc_byte_buffer_create.
+ // to create an array of gpr_slice objects to pass to grpc_raw_byte_buffer_create.
// That would make it do exactly one copy, always.
return CopyCharArrayToNewByteBuffer((const char *)self.bytes, (size_t)self.length);
}
diff --git a/src/php/ext/grpc/byte_buffer.c b/src/php/ext/grpc/byte_buffer.c
index bb9d3f5337..8be0a20607 100644
--- a/src/php/ext/grpc/byte_buffer.c
+++ b/src/php/ext/grpc/byte_buffer.c
@@ -51,7 +51,7 @@
grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
diff --git a/src/python/requirements.txt b/src/python/requirements.txt
index d32d436d3c..43395df03b 100644
--- a/src/python/requirements.txt
+++ b/src/python/requirements.txt
@@ -1,3 +1,3 @@
enum34==1.0.4
futures==2.2.0
-protobuf==3.0.0a2
+protobuf==3.0.0a3
diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c
index ced34a6816..6722b53f84 100644
--- a/src/python/src/grpc/_adapter/_c/utility.c
+++ b/src/python/src/grpc/_adapter/_c/utility.c
@@ -179,7 +179,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
PyString_AsStringAndSize(
PyTuple_GET_ITEM(op, MESSAGE_INDEX), &message, &message_size);
message_slice = gpr_slice_from_copied_buffer(message, message_size);
- c_op.data.send_message = grpc_byte_buffer_create(&message_slice, 1);
+ c_op.data.send_message = grpc_raw_byte_buffer_create(&message_slice, 1);
gpr_slice_unref(message_slice);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index dc655a70f9..5398b09936 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -86,13 +86,13 @@ _PACKAGE_DIRECTORIES = {
setuptools.setup(
name='grpcio',
- version='0.9.0a0',
+ version='0.9.0a1',
ext_modules=[_EXTENSION_MODULE],
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=[
'enum34==1.0.4',
'futures==2.2.0',
- 'protobuf==3.0.0a2'
+ 'protobuf==3.0.0a3'
]
)
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index 1cc22f4aff..0aa34c844e 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -42,7 +42,7 @@
grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
- grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}