diff options
Diffstat (limited to 'src/csharp/ext/grpc_csharp_ext.c')
-rw-r--r-- | src/csharp/ext/grpc_csharp_ext.c | 165 |
1 files changed, 69 insertions, 96 deletions
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 173e5c8a46..ec125db78b 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -33,10 +33,12 @@ #include "src/core/support/string.h" +#include <grpc/byte_buffer_reader.h> #include <grpc/support/port_platform.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -58,13 +60,11 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { gpr_slice slice = gpr_slice_from_copied_buffer(buffer, len); - grpc_byte_buffer *bb = grpc_byte_buffer_create(&slice, 1); + grpc_byte_buffer *bb = grpc_raw_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); return bb; } -typedef void(GPR_CALLTYPE *callback_funcptr)(gpr_int32 success, void *batch_context); - /* * Helper to maintain lifetime of batch op inputs and store batch op outputs. */ @@ -89,13 +89,9 @@ typedef struct gprcsharp_batch_context { grpc_call_details call_details; grpc_metadata_array request_metadata; } server_rpc_new; - - /* callback will be called upon completion */ - callback_funcptr callback; - } grpcsharp_batch_context; -grpcsharp_batch_context *grpcsharp_batch_context_create() { +GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() { grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context)); memset(ctx, 0, sizeof(grpcsharp_batch_context)); return ctx; @@ -190,7 +186,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array *dest, src->metadata = NULL; } -void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { +GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { if (!ctx) { return; } @@ -236,13 +232,13 @@ GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer( const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) { - grpc_byte_buffer_reader *reader; + grpc_byte_buffer_reader reader; gpr_slice slice; size_t offset = 0; - reader = grpc_byte_buffer_reader_create(ctx->recv_message); + grpc_byte_buffer_reader_init(&reader, ctx->recv_message); - while (grpc_byte_buffer_reader_next(reader, &slice)) { + while (grpc_byte_buffer_reader_next(&reader, &slice)) { size_t len = GPR_SLICE_LENGTH(slice); GPR_ASSERT(offset + len <= buffer_len); memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), @@ -250,7 +246,6 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer( offset += len; gpr_slice_unref(slice); } - grpc_byte_buffer_reader_destroy(reader); } GPR_EXPORT grpc_status_code GPR_CALLTYPE @@ -305,25 +300,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) { grpc_completion_queue_destroy(cq); } -GPR_EXPORT grpc_completion_type GPR_CALLTYPE -grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) { - grpc_event ev; - grpcsharp_batch_context *batch_context; - grpc_completion_type t; - - ev = grpc_completion_queue_next(cq, gpr_inf_future); - t = ev.type; - if (t == GRPC_OP_COMPLETE && ev.tag) { - /* NEW API handler */ - batch_context = (grpcsharp_batch_context *)ev.tag; - batch_context->callback((gpr_int32) ev.success, batch_context); - grpcsharp_batch_context_destroy(batch_context); - } +GPR_EXPORT grpc_event GPR_CALLTYPE +grpcsharp_completion_queue_next(grpc_completion_queue *cq) { + return grpc_completion_queue_next(cq, gpr_inf_future); +} - /* return completion type to allow some handling for events that have no - * tag - such as GRPC_QUEUE_SHUTDOWN - */ - return t; +GPR_EXPORT grpc_event GPR_CALLTYPE +grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) { + return grpc_completion_queue_pluck(cq, tag, gpr_inf_future); } /* Channel */ @@ -369,6 +353,16 @@ grpcsharp_channel_args_set_string(grpc_channel_args *args, size_t index, } GPR_EXPORT void GPR_CALLTYPE +grpcsharp_channel_args_set_integer(grpc_channel_args *args, size_t index, + const char *key, int value) { + GPR_ASSERT(args); + GPR_ASSERT(index < args->num_args); + args->args[index].type = GRPC_ARG_INTEGER; + args->args[index].key = gpr_strdup(key); + args->args[index].value.integer = value; +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_args_destroy(grpc_channel_args *args) { size_t i; if (args) { @@ -412,32 +406,34 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, +grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[6]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; + ops[1].flags = 0; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[3].flags = 0; ops[4].op = GRPC_OP_RECV_MESSAGE; ops[4].data.recv_message = &(ctx->recv_message); + ops[4].flags = 0; ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[5].data.recv_status_on_client.trailing_metadata = @@ -449,50 +445,32 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, &(ctx->recv_status_on_client.status_details); ops[5].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[5].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -/* Synchronous unary call */ -GPR_EXPORT void GPR_CALLTYPE -grpcsharp_call_blocking_unary(grpc_call *call, - grpc_completion_queue *dedicated_cq, - callback_funcptr callback, - const char *send_buffer, size_t send_buffer_len, - grpc_metadata_array *initial_metadata) { - GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, - send_buffer_len, - initial_metadata) == GRPC_CALL_OK); - - /* TODO: we would like to use pluck, but we don't know the tag */ - GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == - GRPC_OP_COMPLETE); - grpc_completion_queue_shutdown(dedicated_cq); - GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == - GRPC_QUEUE_SHUTDOWN); -} - GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, - callback_funcptr callback, + grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[4]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[1].flags = 0; ops[2].op = GRPC_OP_RECV_MESSAGE; ops[2].data.recv_message = &(ctx->recv_message); + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[3].data.recv_status_on_client.trailing_metadata = @@ -504,33 +482,35 @@ grpcsharp_call_start_client_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details); ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[3].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( - grpc_call *call, callback_funcptr callback, const char *send_buffer, + grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[5]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; + ops[1].flags = 0; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[3].flags = 0; ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[4].data.recv_status_on_client.trailing_metadata = @@ -542,28 +522,28 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( &(ctx->recv_status_on_client.status_details); ops[4].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[4].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(grpc_call *call, - callback_funcptr callback, + grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[3]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[1].flags = 0; ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; ops[2].data.recv_status_on_client.trailing_metadata = @@ -575,85 +555,76 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details); ops[2].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[2].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback, +grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len) { /* TODO: don't use magic number */ grpc_op ops[1]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[0].data.send_message = ctx->send_message; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(grpc_call *call, - callback_funcptr callback) { + grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ grpc_op ops[1]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, - callback_funcptr callback, + grpcsharp_batch_context *ctx, grpc_status_code status_code, const char *status_details) { /* TODO: don't use magic number */ grpc_op ops[1]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); ops[0].data.send_status_from_server.trailing_metadata = NULL; ops[0].data.send_status_from_server.trailing_metadata_count = 0; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) { +grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ grpc_op ops[1]; - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_RECV_MESSAGE; ops[0].data.recv_message = &(ctx->recv_message); + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) { +grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ grpc_op ops[2]; - - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].data.send_initial_metadata.count = 0; ops[0].data.send_initial_metadata.metadata = NULL; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops[1].data.recv_close_on_server.cancelled = (&ctx->recv_close_on_server_cancelled); + ops[1].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -680,9 +651,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) { GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_shutdown_and_notify_callback(grpc_server *server, grpc_completion_queue *cq, - callback_funcptr callback) { - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; + grpcsharp_batch_context *ctx) { grpc_server_shutdown_and_notify(server, cq, ctx); } @@ -696,10 +665,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, - callback_funcptr callback) { - grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); - ctx->callback = callback; - + grpcsharp_batch_context *ctx) { return grpc_server_request_call( server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), &(ctx->server_rpc_new.request_metadata), cq, cq, ctx); @@ -788,11 +754,18 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) { gpr_set_log_function(grpcsharp_log_handler); } +typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success); + /* For testing */ GPR_EXPORT void GPR_CALLTYPE -grpcsharp_test_callback(callback_funcptr callback) { - callback(1, NULL); +grpcsharp_test_callback(test_callback_funcptr callback) { + callback(1); } /* For testing */ GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; } + +/* For testing */ +GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) { + return sizeof(grpc_event); +} |