aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/inproc/inproc_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/inproc/inproc_transport.cc')
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc287
1 files changed, 144 insertions, 143 deletions
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 1551f5e988..a7a6db8bc2 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -50,20 +50,20 @@ typedef struct {
typedef struct inproc_transport {
grpc_transport base;
- shared_mu *mu;
+ shared_mu* mu;
gpr_refcount refs;
bool is_client;
grpc_connectivity_state_tracker connectivity;
- void (*accept_stream_cb)(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_transport *transport, const void *server_data);
- void *accept_stream_data;
+ void (*accept_stream_cb)(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_transport* transport, const void* server_data);
+ void* accept_stream_data;
bool is_closed;
- struct inproc_transport *other_side;
- struct inproc_stream *stream_list;
+ struct inproc_transport* other_side;
+ struct inproc_stream* stream_list;
} inproc_transport;
typedef struct inproc_stream {
- inproc_transport *t;
+ inproc_transport* t;
grpc_metadata_batch to_read_initial_md;
uint32_t to_read_initial_md_flags;
bool to_read_initial_md_filled;
@@ -80,21 +80,21 @@ typedef struct inproc_stream {
grpc_millis write_buffer_deadline;
grpc_metadata_batch write_buffer_trailing_md;
bool write_buffer_trailing_md_filled;
- grpc_error *write_buffer_cancel_error;
+ grpc_error* write_buffer_cancel_error;
- struct inproc_stream *other_side;
+ struct inproc_stream* other_side;
bool other_side_closed; // won't talk anymore
bool write_buffer_other_side_closed; // on hold
- grpc_stream_refcount *refs;
- grpc_closure *closure_at_destroy;
+ grpc_stream_refcount* refs;
+ grpc_closure* closure_at_destroy;
- gpr_arena *arena;
+ gpr_arena* arena;
- grpc_transport_stream_op_batch *send_message_op;
- grpc_transport_stream_op_batch *send_trailing_md_op;
- grpc_transport_stream_op_batch *recv_initial_md_op;
- grpc_transport_stream_op_batch *recv_message_op;
- grpc_transport_stream_op_batch *recv_trailing_md_op;
+ grpc_transport_stream_op_batch* send_message_op;
+ grpc_transport_stream_op_batch* send_trailing_md_op;
+ grpc_transport_stream_op_batch* recv_initial_md_op;
+ grpc_transport_stream_op_batch* recv_message_op;
+ grpc_transport_stream_op_batch* recv_trailing_md_op;
grpc_slice_buffer recv_message;
grpc_slice_buffer_stream recv_stream;
@@ -107,29 +107,29 @@ typedef struct inproc_stream {
bool closed;
- grpc_error *cancel_self_error;
- grpc_error *cancel_other_error;
+ grpc_error* cancel_self_error;
+ grpc_error* cancel_other_error;
grpc_millis deadline;
bool listed;
- struct inproc_stream *stream_list_prev;
- struct inproc_stream *stream_list_next;
+ struct inproc_stream* stream_list_prev;
+ struct inproc_stream* stream_list_next;
} inproc_stream;
static grpc_closure do_nothing_closure;
-static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error);
-static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error);
+static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ grpc_error* error);
+static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error);
-static void ref_transport(inproc_transport *t) {
+static void ref_transport(inproc_transport* t) {
INPROC_LOG(GPR_DEBUG, "ref_transport %p", t);
gpr_ref(&t->refs);
}
-static void really_destroy_transport(grpc_exec_ctx *exec_ctx,
- inproc_transport *t) {
+static void really_destroy_transport(grpc_exec_ctx* exec_ctx,
+ inproc_transport* t) {
INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t);
grpc_connectivity_state_destroy(exec_ctx, &t->connectivity);
if (gpr_unref(&t->mu->refs)) {
@@ -138,7 +138,7 @@ static void really_destroy_transport(grpc_exec_ctx *exec_ctx,
gpr_free(t);
}
-static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) {
+static void unref_transport(grpc_exec_ctx* exec_ctx, inproc_transport* t) {
INPROC_LOG(GPR_DEBUG, "unref_transport %p", t);
if (gpr_unref(&t->refs)) {
really_destroy_transport(exec_ctx, t);
@@ -153,18 +153,18 @@ static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) {
#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs)
#endif
-static void ref_stream(inproc_stream *s, const char *reason) {
+static void ref_stream(inproc_stream* s, const char* reason) {
INPROC_LOG(GPR_DEBUG, "ref_stream %p %s", s, reason);
STREAM_REF(s->refs, reason);
}
-static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- const char *reason) {
+static void unref_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ const char* reason) {
INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason);
STREAM_UNREF(exec_ctx, s->refs, reason);
}
-static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
+static void really_destroy_stream(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s);
GRPC_ERROR_UNREF(s->write_buffer_cancel_error);
@@ -182,12 +182,12 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
}
-static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
+static void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
bool is_initial) {
- for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL;
+ for (grpc_linked_mdelem* md = md_batch->list.head; md != NULL;
md = md->next) {
- char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
- char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
+ char* key = grpc_slice_to_c_string(GRPC_MDKEY(md->md));
+ char* value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md));
gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL",
is_client ? "CLI" : "SVR", key, value);
gpr_free(key);
@@ -195,10 +195,10 @@ static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client,
}
}
-static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- const grpc_metadata_batch *metadata,
- uint32_t flags, grpc_metadata_batch *out_md,
- uint32_t *outflags, bool *markfilled) {
+static grpc_error* fill_in_metadata(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ const grpc_metadata_batch* metadata,
+ uint32_t flags, grpc_metadata_batch* out_md,
+ uint32_t* outflags, bool* markfilled) {
if (GRPC_TRACER_ON(grpc_inproc_trace)) {
log_metadata(metadata, s->t->is_client, outflags != NULL);
}
@@ -209,11 +209,11 @@ static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s,
if (markfilled != NULL) {
*markfilled = true;
}
- grpc_error *error = GRPC_ERROR_NONE;
- for (grpc_linked_mdelem *elem = metadata->list.head;
+ grpc_error* error = GRPC_ERROR_NONE;
+ for (grpc_linked_mdelem* elem = metadata->list.head;
(elem != NULL) && (error == GRPC_ERROR_NONE); elem = elem->next) {
- grpc_linked_mdelem *nelem =
- (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*nelem));
+ grpc_linked_mdelem* nelem =
+ (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*nelem));
nelem->md = grpc_mdelem_from_slices(
exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)),
grpc_slice_intern(GRPC_MDVALUE(elem->md)));
@@ -223,12 +223,12 @@ static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s,
return error;
}
-static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_stream_refcount *refcount,
- const void *server_data, gpr_arena *arena) {
+static int init_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_stream_refcount* refcount,
+ const void* server_data, gpr_arena* arena) {
INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data);
- inproc_transport *t = (inproc_transport *)gt;
- inproc_stream *s = (inproc_stream *)gs;
+ inproc_transport* t = (inproc_transport*)gt;
+ inproc_stream* s = (inproc_stream*)gs;
s->arena = arena;
s->refs = refcount;
@@ -277,7 +277,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if (!server_data) {
ref_transport(t);
- inproc_transport *st = t->other_side;
+ inproc_transport* st = t->other_side;
ref_transport(st);
s->other_side = NULL; // will get filled in soon
// Pass the client-side stream address to the server-side for a ref
@@ -286,10 +286,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p",
st->accept_stream_cb, st->accept_stream_data);
(*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base,
- (void *)s);
+ (void*)s);
} else {
// This is the server-side and is being called through accept_stream_cb
- inproc_stream *cs = (inproc_stream *)server_data;
+ inproc_stream* cs = (inproc_stream*)server_data;
s->other_side = cs;
// Ref the server-side stream on behalf of the client now
ref_stream(s, "inproc_init_stream:srv");
@@ -326,15 +326,15 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
return 0; // return value is not important
}
-static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
+static void close_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s) {
if (!s->closed) {
// Release the metadata that we would have written out
grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md);
grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md);
if (s->listed) {
- inproc_stream *p = s->stream_list_prev;
- inproc_stream *n = s->stream_list_next;
+ inproc_stream* p = s->stream_list_prev;
+ inproc_stream* n = s->stream_list_next;
if (p != NULL) {
p->stream_list_next = n;
} else {
@@ -352,8 +352,8 @@ static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) {
}
// This function means that we are done talking/listening to the other side
-static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- const char *reason) {
+static void close_other_side_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ const char* reason) {
if (s->other_side != NULL) {
// First release the metadata that came from the other side's arena
grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md);
@@ -371,10 +371,10 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
// this stream_op_batch is only one of the pending operations for this
// stream. This is called when one of the pending operations for the stream
// is done and about to be NULLed out
-static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *s, grpc_error *error,
- grpc_transport_stream_op_batch *op,
- const char *msg) {
+static void complete_if_batch_end_locked(grpc_exec_ctx* exec_ctx,
+ inproc_stream* s, grpc_error* error,
+ grpc_transport_stream_op_batch* op,
+ const char* msg) {
int is_sm = (int)(op == s->send_message_op);
int is_stm = (int)(op == s->send_trailing_md_op);
int is_rim = (int)(op == s->recv_initial_md_op);
@@ -387,9 +387,9 @@ static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *s,
- grpc_error *error) {
+static void maybe_schedule_op_closure_locked(grpc_exec_ctx* exec_ctx,
+ inproc_stream* s,
+ grpc_error* error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
@@ -397,8 +397,8 @@ static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error) {
+static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ grpc_error* error) {
INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s);
// If we're failing this side, we need to make sure that
// we also send or have already sent trailing metadata
@@ -409,10 +409,10 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_metadata_batch fake_md;
grpc_metadata_batch_init(&fake_md);
- inproc_stream *other = s->other_side;
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ inproc_stream* other = s->other_side;
+ grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, NULL, destfilled);
grpc_metadata_batch_destroy(exec_ctx, &fake_md);
@@ -427,20 +427,20 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
}
}
if (s->recv_initial_md_op) {
- grpc_error *err;
+ grpc_error* err;
if (!s->t->is_client) {
// If this is a server, provide initial metadata with a path and authority
// since it expects that as well as no error yet
grpc_metadata_batch fake_md;
grpc_metadata_batch_init(&fake_md);
- grpc_linked_mdelem *path_md =
- (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*path_md));
+ grpc_linked_mdelem* path_md =
+ (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*path_md));
path_md->md =
grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value);
GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) ==
GRPC_ERROR_NONE);
- grpc_linked_mdelem *auth_md =
- (grpc_linked_mdelem *)gpr_arena_alloc(s->arena, sizeof(*auth_md));
+ grpc_linked_mdelem* auth_md =
+ (grpc_linked_mdelem*)gpr_arena_alloc(s->arena, sizeof(*auth_md));
auth_md->md =
grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value);
GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) ==
@@ -509,9 +509,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
GRPC_ERROR_UNREF(error);
}
-static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
- inproc_stream *sender,
- inproc_stream *receiver) {
+static void message_transfer_locked(grpc_exec_ctx* exec_ctx,
+ inproc_stream* sender,
+ inproc_stream* receiver) {
size_t remaining =
sender->send_message_op->payload->send_message.send_message->length;
if (receiver->recv_inited) {
@@ -525,7 +525,7 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_byte_stream_next(
exec_ctx, sender->send_message_op->payload->send_message.send_message,
SIZE_MAX, &unused));
- grpc_error *error = grpc_byte_stream_pull(
+ grpc_error* error = grpc_byte_stream_pull(
exec_ctx, sender->send_message_op->payload->send_message.send_message,
&message_slice);
if (error != GRPC_ERROR_NONE) {
@@ -558,8 +558,8 @@ static void message_transfer_locked(grpc_exec_ctx *exec_ctx,
sender->send_message_op = NULL;
}
-static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
+static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
@@ -567,17 +567,17 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
// Since this is a closure directly invoked by the combiner, it should not
// unref the error parameter explicitly; the combiner will do that implicitly
- grpc_error *new_err = GRPC_ERROR_NONE;
+ grpc_error* new_err = GRPC_ERROR_NONE;
bool needs_close = false;
INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg);
- inproc_stream *s = (inproc_stream *)arg;
- gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed
+ inproc_stream* s = (inproc_stream*)arg;
+ gpr_mu* mu = &s->t->mu->mu; // keep aside in case s gets closed
gpr_mu_lock(mu);
s->op_closure_scheduled = false;
// cancellation takes precedence
- inproc_stream *other = s->other_side;
+ inproc_stream* other = s->other_side;
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error));
@@ -612,9 +612,9 @@ static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg,
(!s->send_message_op ||
(s->t->is_client &&
(s->trailing_md_recvd || s->to_read_trailing_md_filled)))) {
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error!
@@ -810,8 +810,8 @@ done:
GRPC_ERROR_UNREF(new_err);
}
-static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
- grpc_error *error) {
+static bool cancel_stream_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
+ grpc_error* error) {
bool ret = false; // was the cancel accepted
INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s,
grpc_error_string(error));
@@ -826,10 +826,10 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
grpc_metadata_batch cancel_md;
grpc_metadata_batch_init(&cancel_md);
- inproc_stream *other = s->other_side;
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md
+ inproc_stream* other = s->other_side;
+ grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
- bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
+ bool* destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, NULL, destfilled);
grpc_metadata_batch_destroy(exec_ctx, &cancel_md);
@@ -862,12 +862,12 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s,
return ret;
}
-static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
- grpc_transport_stream_op_batch *op) {
+static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
+ grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op);
- inproc_stream *s = (inproc_stream *)gs;
- gpr_mu *mu = &s->t->mu->mu; // save aside in case s gets closed
+ inproc_stream* s = (inproc_stream*)gs;
+ gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
gpr_mu_lock(mu);
if (GRPC_TRACER_ON(grpc_inproc_trace)) {
@@ -880,8 +880,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->t->is_client, false);
}
}
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_closure *on_complete = op->on_complete;
+ grpc_error* error = GRPC_ERROR_NONE;
+ grpc_closure* on_complete = op->on_complete;
if (on_complete == NULL) {
on_complete = &do_nothing_closure;
}
@@ -907,18 +907,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
bool needs_close = false;
- inproc_stream *other = s->other_side;
+ inproc_stream* other = s->other_side;
if (error == GRPC_ERROR_NONE &&
(op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
- grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_initial_md
+ grpc_metadata_batch* dest = (other == NULL) ? &s->write_buffer_initial_md
: &other->to_read_initial_md;
- uint32_t *destflags = (other == NULL) ? &s->write_buffer_initial_md_flags
+ uint32_t* destflags = (other == NULL) ? &s->write_buffer_initial_md_flags
: &other->to_read_initial_md_flags;
- bool *destfilled = (other == NULL) ? &s->write_buffer_initial_md_filled
+ bool* destfilled = (other == NULL) ? &s->write_buffer_initial_md_filled
: &other->to_read_initial_md_filled;
if (*destfilled || s->initial_md_sent) {
// The buffer is already in use; that's an error!
@@ -933,7 +933,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
dest, destflags, destfilled);
}
if (s->t->is_client) {
- grpc_millis *dl =
+ grpc_millis* dl =
(other == NULL) ? &s->write_buffer_deadline : &other->deadline;
*dl = GPR_MIN(*dl, op->payload->send_initial_metadata
.send_initial_metadata->deadline);
@@ -972,8 +972,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
// 4. We want to receive a message and there is a message ready
// 5. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the receive message as well
- if ((op->send_message && other && ((other->recv_message_op != NULL) ||
- (other->recv_trailing_md_op != NULL))) ||
+ if ((op->send_message && other &&
+ ((other->recv_message_op != NULL) ||
+ (other->recv_trailing_md_op != NULL))) ||
(op->send_trailing_metadata && !op->send_message) ||
(op->recv_initial_metadata && s->to_read_initial_md_filled) ||
(op->recv_message && other && (other->send_message_op != NULL)) ||
@@ -1020,8 +1021,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GRPC_ERROR_UNREF(error);
}
-static void close_transport_locked(grpc_exec_ctx *exec_ctx,
- inproc_transport *t) {
+static void close_transport_locked(grpc_exec_ctx* exec_ctx,
+ inproc_transport* t) {
INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed);
grpc_connectivity_state_set(
exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN,
@@ -1041,9 +1042,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
}
}
-static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {
- inproc_transport *t = (inproc_transport *)gt;
+static void perform_transport_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_transport_op* op) {
+ inproc_transport* t = (inproc_transport*)gt;
INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op);
gpr_mu_lock(&t->mu->mu);
if (op->on_connectivity_state_change) {
@@ -1075,17 +1076,17 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_mu_unlock(&t->mu->mu);
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs,
- grpc_closure *then_schedule_closure) {
+static void destroy_stream(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs,
+ grpc_closure* then_schedule_closure) {
INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure);
- inproc_stream *s = (inproc_stream *)gs;
+ inproc_stream* s = (inproc_stream*)gs;
s->closure_at_destroy = then_schedule_closure;
really_destroy_stream(exec_ctx, s);
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
- inproc_transport *t = (inproc_transport *)gt;
+static void destroy_transport(grpc_exec_ctx* exec_ctx, grpc_transport* gt) {
+ inproc_transport* t = (inproc_transport*)gt;
INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t);
gpr_mu_lock(&t->mu->mu);
close_transport_locked(exec_ctx, t);
@@ -1098,24 +1099,24 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
* INTEGRATION GLUE
*/
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {
+static void set_pollset(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_pollset* pollset) {
// Nothing to do here
}
-static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset_set *pollset_set) {
+static void set_pollset_set(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
+ grpc_stream* gs, grpc_pollset_set* pollset_set) {
// Nothing to do here
}
-static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
+static grpc_endpoint* get_endpoint(grpc_exec_ctx* exec_ctx, grpc_transport* t) {
return NULL;
}
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void do_nothing(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {}
void grpc_inproc_transport_init(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -1146,16 +1147,16 @@ static const grpc_transport_vtable inproc_vtable = {
/*******************************************************************************
* Main inproc transport functions
*/
-static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
- grpc_transport **server_transport,
- const grpc_channel_args *server_args,
- grpc_transport **client_transport,
- const grpc_channel_args *client_args) {
+static void inproc_transports_create(grpc_exec_ctx* exec_ctx,
+ grpc_transport** server_transport,
+ const grpc_channel_args* server_args,
+ grpc_transport** client_transport,
+ const grpc_channel_args* client_args) {
INPROC_LOG(GPR_DEBUG, "inproc_transports_create");
- inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st));
- inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct));
+ inproc_transport* st = (inproc_transport*)gpr_zalloc(sizeof(*st));
+ inproc_transport* ct = (inproc_transport*)gpr_zalloc(sizeof(*ct));
// Share one lock between both sides since both sides get affected
- st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu));
+ st->mu = ct->mu = (shared_mu*)gpr_malloc(sizeof(*st->mu));
gpr_mu_init(&st->mu->mu);
gpr_ref_init(&st->mu->refs, 2);
st->base.vtable = &inproc_vtable;
@@ -1174,37 +1175,37 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
ct->other_side = st;
st->stream_list = NULL;
ct->stream_list = NULL;
- *server_transport = (grpc_transport *)st;
- *client_transport = (grpc_transport *)ct;
+ *server_transport = (grpc_transport*)st;
+ *client_transport = (grpc_transport*)ct;
}
-grpc_channel *grpc_inproc_channel_create(grpc_server *server,
- grpc_channel_args *args,
- void *reserved) {
+grpc_channel* grpc_inproc_channel_create(grpc_server* server,
+ grpc_channel_args* args,
+ void* reserved) {
GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
(server, args));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
+ const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
// Add a default authority channel argument for the client
grpc_arg default_authority_arg;
default_authority_arg.type = GRPC_ARG_STRING;
- default_authority_arg.key = (char *)GRPC_ARG_DEFAULT_AUTHORITY;
- default_authority_arg.value.string = (char *)"inproc.authority";
- grpc_channel_args *client_args =
+ default_authority_arg.key = (char*)GRPC_ARG_DEFAULT_AUTHORITY;
+ default_authority_arg.value.string = (char*)"inproc.authority";
+ grpc_channel_args* client_args =
grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
- grpc_transport *server_transport;
- grpc_transport *client_transport;
+ grpc_transport* server_transport;
+ grpc_transport* client_transport;
inproc_transports_create(&exec_ctx, &server_transport, server_args,
&client_transport, client_args);
grpc_server_setup_transport(&exec_ctx, server, server_transport, NULL,
server_args);
- grpc_channel *channel =
+ grpc_channel* channel =
grpc_channel_create(&exec_ctx, "inproc", client_args,
GRPC_CLIENT_DIRECT_CHANNEL, client_transport);