diff options
author | 2017-04-12 15:16:35 -0700 | |
---|---|---|
committer | 2017-04-12 15:16:35 -0700 | |
commit | b3d308b9a11496722754f4dc23ebb82a9f0e63d2 (patch) | |
tree | d5c4d27375d08a91fdb71b6e84a2bb4558e28b52 /src/core/lib | |
parent | c20fa90c6094599540e3a7ef16a5aa381580be33 (diff) | |
parent | 1b76bda4a61a0ed65d5a5de7a6f3363a47871e50 (diff) |
Merge github.com:grpc/grpc into cpparena
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/compress_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/channel/http_client_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 44 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.c | 32 | ||||
-rw-r--r-- | src/core/lib/transport/byte_stream.h | 21 |
7 files changed, 102 insertions, 30 deletions
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 4625cba0d2..764524b24d 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -221,6 +221,13 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); @@ -233,8 +240,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { + exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, + &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { finish_send_message(exec_ctx, elem); diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 4e47c5c658..151fb9885d 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -220,8 +220,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; uint8_t *wrptr = calld->payload_bytes; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, - &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { + exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, + &calld->got_slice)) { + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice); memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), GRPC_SLICE_LENGTH(calld->incoming_slice)); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); @@ -237,6 +240,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { grpc_call_element *elem = elemp; call_data *calld = elem->call_data; calld->send_message_blocked = false; + if (GRPC_ERROR_NONE != + grpc_byte_stream_pull(exec_ctx, + calld->send_op->payload->send_message.send_message, + &calld->incoming_slice)) { + /* Should never reach here */ + abort(); + } grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); if (calld->send_length == calld->slices.length) { /* Pass down the original send_message op that was blocked.*/ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 57726c8476..c12fabb37c 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -218,14 +218,14 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) { const grpc_integer_options options = { - GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX}; + GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX}; chand->max_send_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } if (strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) { const grpc_integer_options options = { - GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX}; + GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX}; chand->max_recv_size = grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 102d1aa290..6b468764fc 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -69,8 +69,9 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; + r->port = svc[i][1]; retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, - r->host, svc[i][1], r->hints); + r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { // The callback will not be called gpr_free(req); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6a8ae03a21..ef159a4ca4 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1197,6 +1197,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1208,11 +1209,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, finish_batch_step(exec_ctx, bctl); return; } - if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, - &call->receiving_slice, remaining, + if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + &call->receiving_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + call->receiving_message = 0; + finish_batch_step(exec_ctx, bctl); + return; + } } else { return; } @@ -1223,12 +1235,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; + grpc_byte_stream *bs = call->receiving_stream; + bool release_error = false; if (error == GRPC_ERROR_NONE) { - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); - continue_receiving_slices(exec_ctx, bctl); - } else { + grpc_slice slice; + error = grpc_byte_stream_pull(exec_ctx, bs, &slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + slice); + continue_receiving_slices(exec_ctx, bctl); + } else { + /* Error returned by grpc_byte_stream_pull needs to be released manually + */ + release_error = true; + } + } + + if (error != GRPC_ERROR_NONE) { if (grpc_trace_operation_failures) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } @@ -1236,7 +1260,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; + call->receiving_message = 0; finish_batch_step(exec_ctx, bctl); + if (release_error) { + GRPC_ERROR_UNREF(error); + } } } diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c index 4d4206189e..5800c70ef4 100644 --- a/src/core/lib/transport/byte_stream.c +++ b/src/core/lib/transport/byte_stream.c @@ -40,10 +40,15 @@ #include "src/core/lib/slice/slice_internal.h" int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete) { - return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, - on_complete); + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete) { + return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); +} + +grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { + return byte_stream->pull(exec_ctx, byte_stream, slice); } void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -53,16 +58,24 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, /* slice_buffer_stream */ -static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete) { +static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { + grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; + GPR_ASSERT(stream->cursor < stream->backing_buffer->count); + return true; +} + +static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice) { grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; GPR_ASSERT(stream->cursor < stream->backing_buffer->count); *slice = grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); stream->cursor++; - return 1; + return GRPC_ERROR_NONE; } static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, @@ -75,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, stream->base.length = (uint32_t)slice_buffer->length; stream->base.flags = flags; stream->base.next = slice_buffer_stream_next; + stream->base.pull = slice_buffer_stream_pull; stream->base.destroy = slice_buffer_stream_destroy; stream->backing_buffer = slice_buffer; stream->cursor = 0; diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 1fdd5b4d77..381c65fb04 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -49,9 +49,10 @@ typedef struct grpc_byte_stream grpc_byte_stream; struct grpc_byte_stream { uint32_t length; uint32_t flags; - int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, - grpc_slice *slice, size_t max_size_hint, - grpc_closure *on_complete); + bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + size_t max_size_hint, grpc_closure *on_complete); + grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, + grpc_slice *slice); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); }; @@ -61,12 +62,20 @@ struct grpc_byte_stream { * * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. + */ +int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, size_t max_size_hint, + grpc_closure *on_complete); + +/* returns the next slice in the byte stream when it is ready (indicated by + * either grpc_byte_stream_next returning 1 or on_complete passed to + * grpc_byte_stream_next is called). * * once a slice is returned into *slice, it is owned by the caller. */ -int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, grpc_slice *slice, - size_t max_size_hint, grpc_closure *on_complete); +grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_slice *slice); void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); |