aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-11 14:16:36 -0700
committerGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-11 14:16:36 -0700
commit280885eaa5e48912fe623dcbad8ad398d8eb405d (patch)
tree3e2c67ae8dfa1985369f2abd67d4e68640da05e4 /src/core/ext/transport
parentf66a37b63aed77732c8d7d253af89296a23bb72e (diff)
WIP
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c177
1 files changed, 128 insertions, 49 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 1d75603980..8f11ef7379 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -50,16 +50,13 @@
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
-// maximum ops in a batch. There is not much thinking behind this limit, except
-// that it seems to be enough for most use cases.
-#define MAX_PENDING_OPS 100
#define CRONET_LOG(...) \
{ \
if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
}
-// TODO (makdharma): Hook up into the wider tracing mechanism
+/* TODO (makdharma): Hook up into the wider tracing mechanism */
int grpc_cronet_trace = 1;
enum OP_RESULT {
@@ -68,7 +65,7 @@ enum OP_RESULT {
NO_ACTION_POSSIBLE
};
-// Used for printing debug
+/* Used for printing debug */
const char *op_result_string[] = {"ACTION_TAKEN_WITH_CALLBACK",
"ACTION_TAKEN_NO_CALLBACK",
"NO_ACTION_POSSIBLE"};
@@ -129,7 +126,7 @@ static cronet_bidirectional_stream_callback cronet_callbacks = {
on_failed,
on_canceled};
-// Cronet transport object
+/* Cronet transport object */
struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */
cronet_engine *engine;
@@ -146,6 +143,7 @@ struct read_state {
int length_field;
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
char *payload_field;
+ bool read_stream_closed;
/* vars for holding data destined for the application */
struct grpc_slice_buffer_stream sbs;
@@ -177,14 +175,13 @@ struct op_and_state {
grpc_transport_stream_op op;
struct op_state state;
bool done;
- struct stream_obj *s; /* Pointer back to the stream object */
+ struct stream_obj *s; /* Pointer back to the stream object */
+ struct op_and_state *next; /* next op_and_state in the linked list */
};
struct op_storage {
- struct op_and_state pending_ops[MAX_PENDING_OPS];
- int wrptr;
- int rdptr;
int num_pending_ops;
+ struct op_and_state *head;
};
struct stream_obj {
@@ -217,21 +214,53 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas);
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
gpr_mu_lock(&s->mu);
struct op_storage *storage = &s->storage;
- GPR_ASSERT(storage->num_pending_ops < MAX_PENDING_OPS);
+ /* add new op at the beginning of the linked list. The memory is freed
+ in remove_from_storage */
+ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memset(&new_op->state, 0, sizeof(new_op->state));
+ new_op->s = s;
+ new_op->done = false;
+ new_op->next = storage->head;
+ storage->head = new_op;
storage->num_pending_ops++;
- CRONET_LOG(GPR_DEBUG, "adding new op @wrptr=%d. %d in the queue.",
- storage->wrptr, storage->num_pending_ops);
- memcpy(&storage->pending_ops[storage->wrptr].op, op,
- sizeof(grpc_transport_stream_op));
- memset(&storage->pending_ops[storage->wrptr].state, 0,
- sizeof(storage->pending_ops[storage->wrptr].state));
- storage->pending_ops[storage->wrptr].done = false;
- storage->pending_ops[storage->wrptr].s = s;
- storage->wrptr = (storage->wrptr + 1) % MAX_PENDING_OPS;
+ CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
+ storage->num_pending_ops);
gpr_mu_unlock(&s->mu);
}
/*
+ Traverse the linked list and delete op and free memory
+*/
+static void remove_from_storage(struct stream_obj *s,
+ struct op_and_state *oas) {
+ struct op_and_state *curr;
+ if (s->storage.head == NULL || oas == NULL) {
+ return;
+ }
+ if (s->storage.head == oas) {
+ s->storage.head = oas->next;
+ gpr_free(oas);
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ } else {
+ for (curr = s->storage.head; curr != NULL; curr = curr->next) {
+ if (curr->next == oas) {
+ curr->next = oas->next;
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ gpr_free(oas);
+ break;
+ } else if (curr->next == NULL) {
+ CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
+ }
+ }
+ }
+}
+
+/*
Cycle through ops and try to take next action. Break when either
an action with callback is taken, or no action is possible.
This can be executed from the Cronet network thread via cronet callback
@@ -239,18 +268,21 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
*/
static void execute_from_storage(stream_obj *s) {
gpr_mu_lock(&s->mu);
- for (int i = 0; i < s->storage.wrptr;) {
- CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i,
- s->storage.pending_ops[i].done);
- if (s->storage.pending_ops[i].done) {
- i++;
- continue;
+ for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
+ CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
+ GPR_ASSERT(curr->done == 0);
+ enum OP_RESULT result = execute_stream_op(curr);
+ CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
+ op_result_string[result]);
+ /* if this op is done, then remove it and free memory */
+ if (curr->done) {
+ struct op_and_state *next = curr->next;
+ remove_from_storage(s, curr);
+ curr = next;
}
- enum OP_RESULT result = execute_stream_op(&s->storage.pending_ops[i]);
- CRONET_LOG(GPR_DEBUG, "%s = execute_stream_op[%d]",
- op_result_string[result], i);
+ /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
if (result == NO_ACTION_POSSIBLE) {
- i++;
+ curr = curr->next;
} else if (result == ACTION_TAKEN_WITH_CALLBACK) {
break;
}
@@ -268,6 +300,14 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_FAILED] = true;
s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
+ }
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
execute_from_storage(s);
}
@@ -280,6 +320,14 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
cronet_bidirectional_stream_destroy(s->cbs);
s->state.state_callback_received[OP_CANCELED] = true;
s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
+ }
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
execute_from_storage(s);
}
@@ -306,6 +354,7 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
/* Free the memory allocated for headers */
if (s->header_array.headers) {
gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
}
execute_from_storage(s);
}
@@ -358,6 +407,7 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
+ s->state.state_callback_received[OP_RECV_MESSAGE] = true;
if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
@@ -370,7 +420,9 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
} else {
execute_from_storage(s);
}
- s->state.state_callback_received[OP_RECV_MESSAGE] = true;
+ } else {
+ s->state.rs.read_stream_closed = true;
+ execute_from_storage(s);
}
}
@@ -570,38 +622,51 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op,
} else if (op_id == OP_ON_COMPLETE) {
/* already executed (note we're checking op specific state, not stream
state) */
- if (op_state->state_op_done[OP_ON_COMPLETE]) result = false;
+ if (op_state->state_op_done[OP_ON_COMPLETE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
/* Check if every op that was asked for is done. */
else if (curr_op->send_initial_metadata &&
- !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->send_message &&
- !stream_state->state_op_done[OP_SEND_MESSAGE])
+ } else if (curr_op->send_message &&
+ !op_state->state_op_done[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->send_message &&
- !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ } else if (curr_op->send_message &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->send_trailing_metadata &&
- !stream_state->state_op_done[OP_SEND_TRAILING_METADATA])
+ } else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->recv_initial_metadata &&
- !stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ } else if (curr_op->recv_initial_metadata &&
+ !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->recv_message &&
- !stream_state->state_op_done[OP_RECV_MESSAGE])
+ } else if (curr_op->recv_message &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
- else if (curr_op->recv_trailing_metadata) {
+ } else if (curr_op->recv_trailing_metadata) {
/* We aren't done with trailing metadata yet */
- if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
/* We've asked for actual message in an earlier op, and it hasn't been
delivered yet. */
else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
/* If this op is not the one asking for read, (which means some earlier
op has asked), and the read hasn't been delivered. */
if (!curr_op->recv_message &&
- !stream_state->state_op_done[OP_SUCCEEDED])
+ !stream_state->state_callback_received[OP_SUCCEEDED]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
result = false;
+ }
}
}
/* We should see at least one on_write_completed for the trailers that we
@@ -625,6 +690,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
/* This OP is the beginning. Reset various states */
+ memset(&s->header_array, 0, sizeof(s->header_array));
memset(&stream_state->rs, 0, sizeof(stream_state->rs));
memset(&stream_state->ws, 0, sizeof(stream_state->ws));
memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
@@ -637,6 +703,7 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
&cronet_callbacks);
char *url;
+ s->header_array.headers = NULL;
convert_metadata_to_cronet_headers(
stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
&s->header_array.headers, &s->header_array.count);
@@ -696,6 +763,13 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->rs.read_stream_closed == true) {
+ /* No more data will be received */
+ CRONET_LOG(GPR_DEBUG, "read stream closed");
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.length_field_received == false) {
if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
stream_state->rs.remaining_bytes == 0) {
@@ -808,9 +882,12 @@ static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
NULL);
oas->state.state_op_done[OP_ON_COMPLETE] = true;
oas->done = true;
- /* reset any send or receive message state. */
- stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
- stream_state->state_op_done[OP_SEND_MESSAGE] = false;
+ /* reset any send message state, only if this ON_COMPLETE is about a send.
+ */
+ if (stream_op->send_message) {
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ stream_state->state_op_done[OP_SEND_MESSAGE] = false;
+ }
result = ACTION_TAKEN_NO_CALLBACK;
/* If this is the on_complete callback being called for a received message -
make a note */
@@ -831,9 +908,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
const void *server_data) {
stream_obj *s = (stream_obj *)gs;
memset(&s->storage, 0, sizeof(s->storage));
+ s->storage.head = NULL;
memset(&s->state, 0, sizeof(s->state));
s->curr_op = NULL;
s->cbs = NULL;
+ memset(&s->header_array, 0, sizeof(s->header_array));
memset(&s->state.rs, 0, sizeof(s->state.rs));
memset(&s->state.ws, 0, sizeof(s->state.ws));
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));