diff options
Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r-- | src/core/ext/transport/inproc/inproc_transport.cc | 287 |
1 files changed, 144 insertions, 143 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1551f5e988..a7a6db8bc2 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -50,20 +50,20 @@ typedef struct { typedef struct inproc_transport { grpc_transport base; - shared_mu *mu; + shared_mu* mu; gpr_refcount refs; bool is_client; grpc_connectivity_state_tracker connectivity; - void (*accept_stream_cb)(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_transport *transport, const void *server_data); - void *accept_stream_data; + void (*accept_stream_cb)(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_transport* transport, const void* server_data); + void* accept_stream_data; bool is_closed; - struct inproc_transport *other_side; - struct inproc_stream *stream_list; + struct inproc_transport* other_side; + struct inproc_stream* stream_list; } inproc_transport; typedef struct inproc_stream { - inproc_transport *t; + inproc_transport* t; grpc_metadata_batch to_read_initial_md; uint32_t to_read_initial_md_flags; bool to_read_initial_md_filled; @@ -80,21 +80,21 @@ typedef struct inproc_stream { grpc_millis write_buffer_deadline; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; - grpc_error *write_buffer_cancel_error; + grpc_error* write_buffer_cancel_error; - struct inproc_stream *other_side; + struct inproc_stream* other_side; bool other_side_closed; // won't talk anymore bool write_buffer_other_side_closed; // on hold - grpc_stream_refcount *refs; - grpc_closure *closure_at_destroy; + grpc_stream_refcount* refs; + grpc_closure* closure_at_destroy; - gpr_arena *arena; + gpr_arena* arena; - grpc_transport_stream_op_batch *send_message_op; - grpc_transport_stream_op_batch *send_trailing_md_op; - grpc_transport_stream_op_batch *recv_initial_md_op; - grpc_transport_stream_op_batch *recv_message_op; - grpc_transport_stream_op_batch *recv_trailing_md_op; + grpc_transport_stream_op_batch* send_message_op; + grpc_transport_stream_op_batch* send_trailing_md_op; + grpc_transport_stream_op_batch* recv_initial_md_op; + grpc_transport_stream_op_batch* recv_message_op; + grpc_transport_stream_op_batch* recv_trailing_md_op; grpc_slice_buffer recv_message; grpc_slice_buffer_stream recv_stream; @@ -107,29 +107,29 @@ typedef struct inproc_stream { bool closed; - grpc_error *cancel_self_error; - grpc_error *cancel_other_error; + grpc_error* cancel_self_error; + grpc_error* cancel_other_error; grpc_millis deadline; bool listed; - struct inproc_stream *stream_list_prev; - struct inproc_stream *stream_list_next; + struct inproc_stream* stream_list_prev; + struct inproc_stream* stream_list_next; } inproc_stream; static grpc_closure do_nothing_closure; -static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, - grpc_error *error); -static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, + grpc_error* error); +static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error); -static void ref_transport(inproc_transport *t) { +static void ref_transport(inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); gpr_ref(&t->refs); } -static void really_destroy_transport(grpc_exec_ctx *exec_ctx, - inproc_transport *t) { +static void really_destroy_transport(grpc_exec_ctx* exec_ctx, + inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t); grpc_connectivity_state_destroy(exec_ctx, &t->connectivity); if (gpr_unref(&t->mu->refs)) { @@ -138,7 +138,7 @@ static void really_destroy_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } -static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) { +static void unref_transport(grpc_exec_ctx* exec_ctx, inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "unref_transport %p", t); if (gpr_unref(&t->refs)) { really_destroy_transport(exec_ctx, t); @@ -153,18 +153,18 @@ static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) { #define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs) #endif -static void ref_stream(inproc_stream *s, const char *reason) { +static void ref_stream(inproc_stream* s, const char* reason) { INPROC_LOG(GPR_DEBUG, "ref_stream %p %s", s, reason); STREAM_REF(s->refs, reason); } -static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, - const char *reason) { +static void unref_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s, + const char* reason) { INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason); STREAM_UNREF(exec_ctx, s->refs, reason); } -static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { +static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); @@ -182,12 +182,12 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } } -static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, +static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client, bool is_initial) { - for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; + for (grpc_linked_mdelem* md = md_batch->list.head; md != NULL; md = md->next) { - char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); - char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); + char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); + char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL", is_client ? "CLI" : "SVR", key, value); gpr_free(key); @@ -195,10 +195,10 @@ static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, } } -static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s, - const grpc_metadata_batch *metadata, - uint32_t flags, grpc_metadata_batch *out_md, - uint32_t *outflags, bool *markfilled) { +static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s, + const grpc_metadata_batch* metadata, + uint32_t flags, grpc_metadata_batch* out_md, + uint32_t* outflags, bool* markfilled) { if (GRPC_TRACER_ON(grpc_inproc_trace)) { log_metadata(metadata, s->t->is_client, outflags != NULL); } @@ -209,11 +209,11 @@ static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (markfilled != NULL) { *markfilled = true; } - grpc_error *error = GRPC_ERROR_NONE; - for (grpc_linked_mdelem *elem = metadata->list.head; + grpc_error* error = GRPC_ERROR_NONE; + for (grpc_linked_mdelem* elem = metadata->list.head; (elem != NULL) && (error == GRPC_ERROR_NONE); elem = elem->next) { - grpc_linked_mdelem *nelem = - (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*nelem)); + grpc_linked_mdelem* nelem = + (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*nelem)); nelem->md = grpc_mdelem_from_slices( exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)), grpc_slice_intern(GRPC_MDVALUE(elem->md))); @@ -223,12 +223,12 @@ static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s, return error; } -static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data, gpr_arena *arena) { +static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, gpr_arena* arena) { INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data); - inproc_transport *t = (inproc_transport *)gt; - inproc_stream *s = (inproc_stream *)gs; + inproc_transport* t = (inproc_transport*)gt; + inproc_stream* s = (inproc_stream*)gs; s->arena = arena; s->refs = refcount; @@ -277,7 +277,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (!server_data) { ref_transport(t); - inproc_transport *st = t->other_side; + inproc_transport* st = t->other_side; ref_transport(st); s->other_side = NULL; // will get filled in soon // Pass the client-side stream address to the server-side for a ref @@ -286,10 +286,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p", st->accept_stream_cb, st->accept_stream_data); (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base, - (void *)s); + (void*)s); } else { // This is the server-side and is being called through accept_stream_cb - inproc_stream *cs = (inproc_stream *)server_data; + inproc_stream* cs = (inproc_stream*)server_data; s->other_side = cs; // Ref the server-side stream on behalf of the client now ref_stream(s, "inproc_init_stream:srv"); @@ -326,15 +326,15 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, return 0; // return value is not important } -static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) { +static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) { if (!s->closed) { // Release the metadata that we would have written out grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md); grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md); if (s->listed) { - inproc_stream *p = s->stream_list_prev; - inproc_stream *n = s->stream_list_next; + inproc_stream* p = s->stream_list_prev; + inproc_stream* n = s->stream_list_next; if (p != NULL) { p->stream_list_next = n; } else { @@ -352,8 +352,8 @@ static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } // This function means that we are done talking/listening to the other side -static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, - const char *reason) { +static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, + const char* reason) { if (s->other_side != NULL) { // First release the metadata that came from the other side's arena grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md); @@ -371,10 +371,10 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, // this stream_op_batch is only one of the pending operations for this // stream. This is called when one of the pending operations for the stream // is done and about to be NULLed out -static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, - inproc_stream *s, grpc_error *error, - grpc_transport_stream_op_batch *op, - const char *msg) { +static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx, + inproc_stream* s, grpc_error* error, + grpc_transport_stream_op_batch* op, + const char* msg) { int is_sm = (int)(op == s->send_message_op); int is_stm = (int)(op == s->send_trailing_md_op); int is_rim = (int)(op == s->recv_initial_md_op); @@ -387,9 +387,9 @@ static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, } } -static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, - inproc_stream *s, - grpc_error *error) { +static void maybe_schedule_op_closure_locked(grpc_exec_ctx* exec_ctx, + inproc_stream* s, + grpc_error* error) { if (s && s->ops_needed && !s->op_closure_scheduled) { GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); s->op_closure_scheduled = true; @@ -397,8 +397,8 @@ static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, } } -static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, - grpc_error *error) { +static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, + grpc_error* error) { INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata @@ -409,10 +409,10 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_metadata_batch fake_md; grpc_metadata_batch_init(&fake_md); - inproc_stream *other = s->other_side; - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + inproc_stream* other = s->other_side; + grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled : &other->to_read_trailing_md_filled; fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, NULL, destfilled); grpc_metadata_batch_destroy(exec_ctx, &fake_md); @@ -427,20 +427,20 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, } } if (s->recv_initial_md_op) { - grpc_error *err; + grpc_error* err; if (!s->t->is_client) { // If this is a server, provide initial metadata with a path and authority // since it expects that as well as no error yet grpc_metadata_batch fake_md; grpc_metadata_batch_init(&fake_md); - grpc_linked_mdelem *path_md = - (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*path_md)); + grpc_linked_mdelem* path_md = + (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*path_md)); path_md->md = grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value); GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) == GRPC_ERROR_NONE); - grpc_linked_mdelem *auth_md = - (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*auth_md)); + grpc_linked_mdelem* auth_md = + (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*auth_md)); auth_md->md = grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value); GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) == @@ -509,9 +509,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_ERROR_UNREF(error); } -static void message_transfer_locked(grpc_exec_ctx *exec_ctx, - inproc_stream *sender, - inproc_stream *receiver) { +static void message_transfer_locked(grpc_exec_ctx* exec_ctx, + inproc_stream* sender, + inproc_stream* receiver) { size_t remaining = sender->send_message_op->payload->send_message.send_message->length; if (receiver->recv_inited) { @@ -525,7 +525,7 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(grpc_byte_stream_next( exec_ctx, sender->send_message_op->payload->send_message.send_message, SIZE_MAX, &unused)); - grpc_error *error = grpc_byte_stream_pull( + grpc_error* error = grpc_byte_stream_pull( exec_ctx, sender->send_message_op->payload->send_message.send_message, &message_slice); if (error != GRPC_ERROR_NONE) { @@ -558,8 +558,8 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx, sender->send_message_op = NULL; } -static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures @@ -567,17 +567,17 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, // Since this is a closure directly invoked by the combiner, it should not // unref the error parameter explicitly; the combiner will do that implicitly - grpc_error *new_err = GRPC_ERROR_NONE; + grpc_error* new_err = GRPC_ERROR_NONE; bool needs_close = false; INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); - inproc_stream *s = (inproc_stream *)arg; - gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed + inproc_stream* s = (inproc_stream*)arg; + gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed gpr_mu_lock(mu); s->op_closure_scheduled = false; // cancellation takes precedence - inproc_stream *other = s->other_side; + inproc_stream* other = s->other_side; if (s->cancel_self_error != GRPC_ERROR_NONE) { fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); @@ -612,9 +612,9 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, (!s->send_message_op || (s->t->is_client && (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled : &other->to_read_trailing_md_filled; if (*destfilled || s->trailing_md_sent) { // The buffer is already in use; that's an error! @@ -810,8 +810,8 @@ done: GRPC_ERROR_UNREF(new_err); } -static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, - grpc_error *error) { +static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s, + grpc_error* error) { bool ret = false; // was the cancel accepted INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s, grpc_error_string(error)); @@ -826,10 +826,10 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_metadata_batch cancel_md; grpc_metadata_batch_init(&cancel_md); - inproc_stream *other = s->other_side; - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + inproc_stream* other = s->other_side; + grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled : &other->to_read_trailing_md_filled; fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, NULL, destfilled); grpc_metadata_batch_destroy(exec_ctx, &cancel_md); @@ -862,12 +862,12 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, return ret; } -static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_transport_stream_op_batch *op) { +static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_transport_stream_op_batch* op) { INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op); - inproc_stream *s = (inproc_stream *)gs; - gpr_mu *mu = &s->t->mu->mu; // save aside in case s gets closed + inproc_stream* s = (inproc_stream*)gs; + gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed gpr_mu_lock(mu); if (GRPC_TRACER_ON(grpc_inproc_trace)) { @@ -880,8 +880,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->t->is_client, false); } } - grpc_error *error = GRPC_ERROR_NONE; - grpc_closure *on_complete = op->on_complete; + grpc_error* error = GRPC_ERROR_NONE; + grpc_closure* on_complete = op->on_complete; if (on_complete == NULL) { on_complete = &do_nothing_closure; } @@ -907,18 +907,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, bool needs_close = false; - inproc_stream *other = s->other_side; + inproc_stream* other = s->other_side; if (error == GRPC_ERROR_NONE && (op->send_initial_metadata || op->send_trailing_metadata)) { if (s->t->is_closed) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); } if (error == GRPC_ERROR_NONE && op->send_initial_metadata) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_initial_md + grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_initial_md : &other->to_read_initial_md; - uint32_t *destflags = (other == NULL) ? &s->write_buffer_initial_md_flags + uint32_t* destflags = (other == NULL) ? &s->write_buffer_initial_md_flags : &other->to_read_initial_md_flags; - bool *destfilled = (other == NULL) ? &s->write_buffer_initial_md_filled + bool* destfilled = (other == NULL) ? &s->write_buffer_initial_md_filled : &other->to_read_initial_md_filled; if (*destfilled || s->initial_md_sent) { // The buffer is already in use; that's an error! @@ -933,7 +933,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, dest, destflags, destfilled); } if (s->t->is_client) { - grpc_millis *dl = + grpc_millis* dl = (other == NULL) ? &s->write_buffer_deadline : &other->deadline; *dl = GPR_MIN(*dl, op->payload->send_initial_metadata .send_initial_metadata->deadline); @@ -972,8 +972,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // 4. We want to receive a message and there is a message ready // 5. There is trailing metadata, even if nothing specifically wants // that because that can shut down the receive message as well - if ((op->send_message && other && ((other->recv_message_op != NULL) || - (other->recv_trailing_md_op != NULL))) || + if ((op->send_message && other && + ((other->recv_message_op != NULL) || + (other->recv_trailing_md_op != NULL))) || (op->send_trailing_metadata && !op->send_message) || (op->recv_initial_metadata && s->to_read_initial_md_filled) || (op->recv_message && other && (other->send_message_op != NULL)) || @@ -1020,8 +1021,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GRPC_ERROR_UNREF(error); } -static void close_transport_locked(grpc_exec_ctx *exec_ctx, - inproc_transport *t) { +static void close_transport_locked(grpc_exec_ctx* exec_ctx, + inproc_transport* t) { INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed); grpc_connectivity_state_set( exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN, @@ -1041,9 +1042,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, } } -static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_transport_op *op) { - inproc_transport *t = (inproc_transport *)gt; +static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_transport_op* op) { + inproc_transport* t = (inproc_transport*)gt; INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op); gpr_mu_lock(&t->mu->mu); if (op->on_connectivity_state_change) { @@ -1075,17 +1076,17 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_mu_unlock(&t->mu->mu); } -static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, - grpc_closure *then_schedule_closure) { +static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, + grpc_closure* then_schedule_closure) { INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure); - inproc_stream *s = (inproc_stream *)gs; + inproc_stream* s = (inproc_stream*)gs; s->closure_at_destroy = then_schedule_closure; really_destroy_stream(exec_ctx, s); } -static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { - inproc_transport *t = (inproc_transport *)gt; +static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) { + inproc_transport* t = (inproc_transport*)gt; INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t); gpr_mu_lock(&t->mu->mu); close_transport_locked(exec_ctx, t); @@ -1098,24 +1099,24 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { * INTEGRATION GLUE */ -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { +static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset* pollset) { // Nothing to do here } -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { +static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt, + grpc_stream* gs, grpc_pollset_set* pollset_set) { // Nothing to do here } -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { +static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, grpc_transport* t) { return NULL; } /******************************************************************************* * GLOBAL INIT AND DESTROY */ -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} +static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {} void grpc_inproc_transport_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -1146,16 +1147,16 @@ static const grpc_transport_vtable inproc_vtable = { /******************************************************************************* * Main inproc transport functions */ -static void inproc_transports_create(grpc_exec_ctx *exec_ctx, - grpc_transport **server_transport, - const grpc_channel_args *server_args, - grpc_transport **client_transport, - const grpc_channel_args *client_args) { +static void inproc_transports_create(grpc_exec_ctx* exec_ctx, + grpc_transport** server_transport, + const grpc_channel_args* server_args, + grpc_transport** client_transport, + const grpc_channel_args* client_args) { INPROC_LOG(GPR_DEBUG, "inproc_transports_create"); - inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st)); - inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct)); + inproc_transport* st = (inproc_transport*)gpr_zalloc(sizeof(*st)); + inproc_transport* ct = (inproc_transport*)gpr_zalloc(sizeof(*ct)); // Share one lock between both sides since both sides get affected - st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu)); + st->mu = ct->mu = (shared_mu*)gpr_malloc(sizeof(*st->mu)); gpr_mu_init(&st->mu->mu); gpr_ref_init(&st->mu->refs, 2); st->base.vtable = &inproc_vtable; @@ -1174,37 +1175,37 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, ct->other_side = st; st->stream_list = NULL; ct->stream_list = NULL; - *server_transport = (grpc_transport *)st; - *client_transport = (grpc_transport *)ct; + *server_transport = (grpc_transport*)st; + *client_transport = (grpc_transport*)ct; } -grpc_channel *grpc_inproc_channel_create(grpc_server *server, - grpc_channel_args *args, - void *reserved) { +grpc_channel* grpc_inproc_channel_create(grpc_server* server, + grpc_channel_args* args, + void* reserved) { GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, (server, args)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - const grpc_channel_args *server_args = grpc_server_get_channel_args(server); + const grpc_channel_args* server_args = grpc_server_get_channel_args(server); // Add a default authority channel argument for the client grpc_arg default_authority_arg; default_authority_arg.type = GRPC_ARG_STRING; - default_authority_arg.key = (char *)GRPC_ARG_DEFAULT_AUTHORITY; - default_authority_arg.value.string = (char *)"inproc.authority"; - grpc_channel_args *client_args = + default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY; + default_authority_arg.value.string = (char*)"inproc.authority"; + grpc_channel_args* client_args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); - grpc_transport *server_transport; - grpc_transport *client_transport; + grpc_transport* server_transport; + grpc_transport* client_transport; inproc_transports_create(&exec_ctx, &server_transport, server_args, &client_transport, client_args); grpc_server_setup_transport(&exec_ctx, server, server_transport, NULL, server_args); - grpc_channel *channel = + grpc_channel* channel = grpc_channel_create(&exec_ctx, "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport); |