aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c36
-rw-r--r--src/core/surface/channel.c4
-rw-r--r--src/core/transport/chttp2_transport.c5
-rw-r--r--src/core/transport/stream_op.c14
-rw-r--r--src/core/transport/stream_op.h6
-rw-r--r--src/core/transport/transport.c16
-rw-r--r--src/core/transport/transport_op_string.c115
7 files changed, 78 insertions, 118 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 7fcf6e2b04..18be81308d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -283,6 +283,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
+ grpc_sopb_init(&call->send_ops);
+ grpc_sopb_init(&call->recv_ops);
/* one ref is dropped in response to destroy, the other in
stream_closed */
gpr_ref_init(&call->internal_refcount, 2);
@@ -330,6 +332,8 @@ static void destroy_call(void *call, int ignored_success) {
destroy_legacy_state(c->legacy_state);
}
grpc_bbq_destroy(&c->incoming_queue);
+ grpc_sopb_destroy(&c->send_ops);
+ grpc_sopb_destroy(&c->recv_ops);
gpr_free(c);
}
@@ -1091,41 +1095,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_mdctx_unlock(mdctx);
}
-#if 0
-void grpc_call_read_closed(grpc_call_element *elem) {
- set_read_state(CALL_FROM_TOP_ELEM(elem), READ_STATE_READ_CLOSED);
-}
-
-void grpc_call_stream_closed(grpc_call_element *elem) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- set_read_state(call, READ_STATE_STREAM_CLOSED);
- grpc_call_internal_unref(call, 0);
-}
-
-void grpc_call_recv_message(grpc_call_element *elem,
- grpc_byte_buffer *byte_buffer) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
- finish_read_ops(call);
- unlock(call);
-}
-
-void grpc_call_recv_synthetic_status(grpc_call_element *elem,
- grpc_status_code status,
- const char *message) {
- grpc_call *call = CALL_FROM_TOP_ELEM(elem);
- lock(call);
- set_status_code(call, STATUS_FROM_CORE, status);
- set_status_details(call, STATUS_FROM_CORE,
- grpc_mdstr_from_string(call->metadata_context, message));
- unlock(call);
-}
-
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
-#endif
/*
* BATCH API IMPLEMENTATION
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index f1d71afaf2..de2f354c78 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -242,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) {
return channel->grpc_message_string;
}
+
+gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) {
+ return channel->max_message_length;
+} \ No newline at end of file
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index fed3088789..9c2af560c1 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1503,8 +1503,6 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW;
}
-static void free_md(void *p, grpc_op_error result) { gpr_free(p); }
-
static void add_metadata_batch(transport *t, stream *s) {
grpc_metadata_batch b;
size_t i;
@@ -1522,8 +1520,7 @@ static void add_metadata_batch(transport *t, stream *s) {
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
- grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md,
- s->incoming_metadata);
+ /* TODO(ctiller): don't leak incoming_metadata */
/* reset */
s->incoming_deadline = gpr_inf_future;
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index 882c078d51..ea22b0e1c8 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -81,9 +81,6 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
case GRPC_OP_METADATA:
grpc_metadata_batch_destroy(&ops[i].data.metadata);
break;
- case GRPC_OP_FLOW_CTL_CB:
- ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
- break;
case GRPC_NO_OP:
case GRPC_OP_BEGIN_MESSAGE:
break;
@@ -119,6 +116,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
+ GPR_ASSERT(sopb->nops <= sopb->capacity);
if (sopb->nops == sopb->capacity) {
expandto(sopb, GROW(sopb->capacity));
}
@@ -158,16 +156,6 @@ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
assert_contained_metadata_ok(sopb->ops, sopb->nops);
}
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg) {
- grpc_stream_op *op = add(sopb);
- op->type = GRPC_OP_FLOW_CTL_CB;
- op->data.flow_ctl_cb.cb = cb;
- op->data.flow_ctl_cb.arg = arg;
- assert_contained_metadata_ok(sopb->ops, sopb->nops);
-}
-
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops) {
size_t orig_nops = sopb->nops;
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index c3901bf608..f5de64d583 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -154,12 +154,10 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
-/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
-void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
- void (*cb)(void *arg, grpc_op_error error),
- void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_STREAM_OP_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index ef0020dc58..35195348e7 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -56,14 +56,9 @@ int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream,
return transport->vtable->init_stream(transport, stream, server_data);
}
-void grpc_transport_send_batch(grpc_transport *transport, grpc_stream *stream,
- grpc_stream_op *ops, size_t nops, int is_last) {
- transport->vtable->send_batch(transport, stream, ops, nops, is_last);
-}
-
-void grpc_transport_set_allow_window_updates(grpc_transport *transport,
- grpc_stream *stream, int allow) {
- transport->vtable->set_allow_window_updates(transport, stream, allow);
+void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
+ grpc_transport_op *op) {
+ transport->vtable->perform_op(transport, stream, op);
}
void grpc_transport_add_to_pollset(grpc_transport *transport,
@@ -76,11 +71,6 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
-void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
- grpc_status_code status) {
- transport->vtable->abort_stream(transport, stream, status);
-}
-
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data) {
transport->vtable->ping(transport, cb, user_data);
diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c
index 5f7e1be268..e886690234 100644
--- a/src/core/transport/transport_op_string.c
+++ b/src/core/transport/transport_op_string.c
@@ -66,65 +66,76 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
}
}
-char *grpc_call_op_string(grpc_call_op *op) {
+char *grpc_sopb_string(grpc_stream_op_buffer *sopb) {
+ char *out;
+ char *tmp;
+ size_t i;
+ gpr_strvec b;
+ gpr_strvec_init(&b);
+
+ for (i = 0; i < sopb->nops; i++) {
+ grpc_stream_op *op = &sopb->ops[i];
+ if (i) gpr_strvec_add(&b, gpr_strdup(", "));
+ switch (op->type) {
+ case GRPC_NO_OP:
+ gpr_strvec_add(&b, gpr_strdup("NO_OP"));
+ break;
+ case GRPC_OP_BEGIN_MESSAGE:
+ gpr_asprintf(&tmp, "BEGIN_MESSAGE:%d", op->data.begin_message.length);
+ gpr_strvec_add(&b, tmp);
+ break;
+ case GRPC_OP_SLICE:
+ gpr_asprintf(&tmp, "SLICE:%d", GPR_SLICE_LENGTH(op->data.slice));
+ break;
+ case GRPC_OP_METADATA:
+ put_metadata_list(&b, op->data.metadata);
+ break;
+ }
+ }
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
+
+ return out;
+}
+
+char *grpc_transport_op_string(grpc_transport_op *op) {
char *tmp;
char *out;
+ int first = 1;
gpr_strvec b;
gpr_strvec_init(&b);
- switch (op->dir) {
- case GRPC_CALL_DOWN:
- gpr_strvec_add(&b, gpr_strdup(">"));
- break;
- case GRPC_CALL_UP:
- gpr_strvec_add(&b, gpr_strdup("<"));
- break;
+ if (op->send_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("SEND"));
+ if (op->is_last_send) {
+ gpr_strvec_add(&b, gpr_strdup("_LAST"));
+ }
+ gpr_strvec_add(&b, gpr_strdup("["));
+ gpr_strvec_add(&b, grpc_sopb_string(op->send_ops));
+ gpr_strvec_add(&b, gpr_strdup("]"));
}
- switch (op->type) {
- case GRPC_SEND_METADATA:
- gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
- put_metadata_list(&b, op->data.metadata);
- break;
- case GRPC_SEND_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
- break;
- case GRPC_SEND_PREFORMATTED_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
- break;
- case GRPC_SEND_FINISH:
- gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
- break;
- case GRPC_REQUEST_DATA:
- gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
- break;
- case GRPC_RECV_METADATA:
- gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
- put_metadata_list(&b, op->data.metadata);
- break;
- case GRPC_RECV_MESSAGE:
- gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
- break;
- case GRPC_RECV_HALF_CLOSE:
- gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
- break;
- case GRPC_RECV_FINISH:
- gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
- break;
- case GRPC_RECV_SYNTHETIC_STATUS:
- gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
- op->data.synthetic_status.status,
- op->data.synthetic_status.message);
- gpr_strvec_add(&b, tmp);
- break;
- case GRPC_CANCEL_OP:
- gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
- break;
+
+ if (op->recv_ops) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("RECV"));
}
- gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
- gpr_strvec_add(&b, tmp);
+
if (op->bind_pollset) {
- gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_strvec_add(&b, gpr_strdup("BIND"));
+ }
+
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
+ first = 0;
+ gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
+ gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, NULL);
@@ -134,8 +145,8 @@ char *grpc_call_op_string(grpc_call_op *op) {
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
- grpc_call_element *elem, grpc_call_op *op) {
- char *str = grpc_call_op_string(op);
+ grpc_call_element *elem, grpc_transport_op *op) {
+ char *str = grpc_transport_op_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}