diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-06-09 17:47:56 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-06-09 17:47:56 -0700 |
commit | 9bfedd67e06c4bd4fa1855aabaf169461a3d6a54 (patch) | |
tree | e48bf5a1c3539393508c02b2b6e41052b4532411 /src/core | |
parent | 97f1454fc5ccdfc57e2f1296a52f1f83037cfd78 (diff) | |
parent | a561ea66aeb460c6f33cfabff396d5ef2b748791 (diff) |
Merge remote-tracking branch 'upstream/master' into you-complete-me-csharp
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/compression/algorithm.c | 2 | ||||
-rw-r--r-- | src/core/compression/algorithm.h | 49 | ||||
-rw-r--r-- | src/core/compression/message_compress.h | 2 | ||||
-rw-r--r-- | src/core/surface/byte_buffer.c | 32 | ||||
-rw-r--r-- | src/core/surface/byte_buffer_reader.c | 56 | ||||
-rw-r--r-- | src/core/surface/call.c | 18 | ||||
-rw-r--r-- | src/core/surface/channel.c | 34 | ||||
-rw-r--r-- | src/core/surface/channel.h | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 66 | ||||
-rw-r--r-- | src/core/transport/metadata.c | 2 |
10 files changed, 154 insertions, 117 deletions
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index ca07002ff9..36ead843d2 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -31,7 +31,7 @@ * */ -#include "src/core/compression/algorithm.h" +#include <grpc/compression.h> const char *grpc_compression_algorithm_name( grpc_compression_algorithm algorithm) { diff --git a/src/core/compression/algorithm.h b/src/core/compression/algorithm.h deleted file mode 100644 index 9dd9f57b56..0000000000 --- a/src/core/compression/algorithm.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H -#define GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H - -/* The various compression algorithms supported by GRPC */ -typedef enum { - GRPC_COMPRESS_NONE = 0, - GRPC_COMPRESS_DEFLATE, - GRPC_COMPRESS_GZIP, - /* TODO(ctiller): snappy */ - GRPC_COMPRESS_ALGORITHMS_COUNT -} grpc_compression_algorithm; - -const char *grpc_compression_algorithm_name( - grpc_compression_algorithm algorithm); - -#endif /* GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H */ diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h index e8aef1a713..aba701a6ee 100644 --- a/src/core/compression/message_compress.h +++ b/src/core/compression/message_compress.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H #define GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H -#include "src/core/compression/algorithm.h" +#include <grpc/compression.h> #include <grpc/support/slice_buffer.h> /* compress 'input' to 'output' using 'algorithm'. diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index 12244f6644..4817e00454 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -35,25 +35,31 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> -grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) { +grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, + size_t nslices) { + return grpc_raw_compressed_byte_buffer_create(slices, nslices, + GRPC_COMPRESS_NONE); +} + +grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( + gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) { size_t i; grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer)); - - bb->type = GRPC_BB_SLICE_BUFFER; - gpr_slice_buffer_init(&bb->data.slice_buffer); + bb->type = GRPC_BB_RAW; + bb->data.raw.compression = compression; + gpr_slice_buffer_init(&bb->data.raw.slice_buffer); for (i = 0; i < nslices; i++) { gpr_slice_ref(slices[i]); - gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]); + gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]); } - return bb; } grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - return grpc_byte_buffer_create(bb->data.slice_buffer.slices, - bb->data.slice_buffer.count); + case GRPC_BB_RAW: + return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices, + bb->data.raw.slice_buffer.count); } gpr_log(GPR_INFO, "should never get here"); abort(); @@ -63,8 +69,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { if (!bb) return; switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - gpr_slice_buffer_destroy(&bb->data.slice_buffer); + case GRPC_BB_RAW: + gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer); break; } free(bb); @@ -72,8 +78,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) { size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) { switch (bb->type) { - case GRPC_BB_SLICE_BUFFER: - return bb->data.slice_buffer.length; + case GRPC_BB_RAW: + return bb->data.raw.slice_buffer.length; } gpr_log(GPR_ERROR, "should never reach here"); abort(); diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index 41ad700274..86829a686f 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -33,41 +33,73 @@ #include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include <grpc/byte_buffer.h> +#include "src/core/compression/message_compress.h" + +static int is_compressed(grpc_byte_buffer *buffer) { + switch (buffer->type) { + case GRPC_BB_RAW: + if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) { + return 0 /* GPR_FALSE */; + } + break; + } + return 1 /* GPR_TRUE */; +} + void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer) { - reader->buffer = buffer; - switch (buffer->type) { - case GRPC_BB_SLICE_BUFFER: + gpr_slice_buffer decompressed_slices_buffer; + reader->buffer_in = buffer; + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: + gpr_slice_buffer_init(&decompressed_slices_buffer); + if (is_compressed(reader->buffer_in)) { + grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer); + reader->buffer_out = grpc_raw_byte_buffer_create( + decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + gpr_slice_buffer_destroy(&decompressed_slices_buffer); + } else { /* not compressed, use the input buffer as output */ + reader->buffer_out = reader->buffer_in; + } reader->current.index = 0; + break; } } void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) { - /* no-op: the user is responsible for memory deallocation. - * Other cleanup operations would go here if needed. */ + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: + /* keeping the same if-else structure as in the init function */ + if (is_compressed(reader->buffer_in)) { + grpc_byte_buffer_destroy(reader->buffer_out); + } + break; + } } int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, gpr_slice *slice) { - grpc_byte_buffer *buffer = reader->buffer; - gpr_slice_buffer *slice_buffer; - switch (buffer->type) { - case GRPC_BB_SLICE_BUFFER: - slice_buffer = &buffer->data.slice_buffer; + switch (reader->buffer_in->type) { + case GRPC_BB_RAW: { + gpr_slice_buffer *slice_buffer; + slice_buffer = &reader->buffer_out->data.raw.slice_buffer; if (reader->current.index < slice_buffer->count) { *slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]); reader->current.index += 1; return 1; - } else { - return 0; } break; + } } return 0; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 74df5745b5..cead5e08dc 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -636,7 +636,7 @@ static void call_on_done_send(void *pc, int success) { static void finish_message(grpc_call *call) { /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create( + grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( call->incoming_message.slices, call->incoming_message.count); gpr_slice_buffer_reset_and_unref(&call->incoming_message); @@ -759,7 +759,7 @@ static void call_on_done_recv(void *pc, int success) { unlock(call); GRPC_CALL_INTERNAL_UNREF(call, "receiving", 0); - GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); + GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0); } static int prepare_application_metadata(grpc_call *call, size_t count, @@ -806,9 +806,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, size_t i; switch (byte_buffer->type) { - case GRPC_BB_SLICE_BUFFER: - for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) { - gpr_slice slice = byte_buffer->data.slice_buffer.slices[i]; + case GRPC_BB_RAW: + for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) { + gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i]; gpr_slice_ref(slice); grpc_sopb_add_slice(sopb, slice); } @@ -820,7 +820,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { grpc_ioreq_data data; grpc_metadata_batch mdb; size_t i; - char status_str[GPR_LTOA_MIN_BUFSIZE]; GPR_ASSERT(op->send_ops == NULL); switch (call->write_state) { @@ -865,13 +864,10 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; - gpr_ltoa(data.send_status.code, status_str); grpc_metadata_batch_add_tail( &mdb, &call->status_link, - grpc_mdelem_from_metadata_strings( - call->metadata_context, - grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, status_str))); + grpc_channel_get_reffed_status_elem(call->channel, + data.send_status.code)); if (data.send_status.details) { grpc_metadata_batch_add_tail( &mdb, &call->details_link, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 947011c613..9175ad0572 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -37,12 +37,20 @@ #include <string.h> #include "src/core/iomgr/iomgr.h" +#include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/client.h" #include "src/core/surface/init.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> +/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. + * Avoids needing to take a metadata context lock for sending status + * if the status code is <= NUM_CACHED_STATUS_ELEMS. + * Sized to allow the most commonly used codes to fit in + * (OK, Cancelled, Unknown). */ +#define NUM_CACHED_STATUS_ELEMS 3 + typedef struct registered_call { grpc_mdelem *path; grpc_mdelem *authority; @@ -54,10 +62,13 @@ struct grpc_channel { gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdctx *metadata_context; + /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + /** mdelem for grpc-status: 0 thru grpc-status: 2 */ + grpc_mdelem *grpc_status_elem[NUM_CACHED_STATUS_ELEMS]; gpr_mu registered_call_mu; registered_call *registered_calls; @@ -88,6 +99,13 @@ grpc_channel *grpc_channel_create_from_filters( channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + char buf[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, buf); + channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_ref(channel->grpc_status_string), + grpc_mdstr_from_string(mdctx, buf)); + } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, @@ -175,7 +193,11 @@ void grpc_channel_internal_ref(grpc_channel *channel) { static void destroy_channel(void *p, int ok) { grpc_channel *channel = p; + size_t i; grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel)); + for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { + grpc_mdelem_unref(channel->grpc_status_elem[i]); + } grpc_mdstr_unref(channel->grpc_status_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); @@ -235,6 +257,18 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } +grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { + if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { + return grpc_mdelem_ref(channel->grpc_status_elem[i]); + } else { + char tmp[GPR_LTOA_MIN_BUFSIZE]; + gpr_ltoa(i, tmp); + return grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->grpc_status_string), + grpc_mdstr_from_string(channel->metadata_context, tmp)); + } +} + grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 388be35711..6d1ed87900 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -40,8 +40,18 @@ grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t count, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); +/** Get a (borrowed) pointer to this channels underlying channel stack */ grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel); + +/** Get a (borrowed) pointer to the channel wide metadata context */ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); + +/** Get a grpc_mdelem of grpc-status: X where X is the numeric value of + status_code. + + The returned elem is owned by the caller. */ +grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, + int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 630504565b..bd259f7ae3 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -230,7 +230,10 @@ struct transport { /* basic state management - what are we doing at the moment? */ gpr_uint8 reading; gpr_uint8 writing; - gpr_uint8 calling_back; + /** are we calling back (via cb) with a channel-level event */ + gpr_uint8 calling_back_channel; + /** are we calling back any grpc_transport_op completion events */ + gpr_uint8 calling_back_ops; gpr_uint8 destroying; gpr_uint8 closed; error_state error_state; @@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); static int prepare_callbacks(transport *t); -static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); +static void run_callbacks(transport *t); static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb); static int prepare_write(transport *t); @@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, } gpr_mu_lock(&t->mu); - t->calling_back = 1; + t->calling_back_channel = 1; ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); @@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, lock(t); t->cb = sr.callbacks; t->cb_user_data = sr.user_data; - t->calling_back = 0; + t->calling_back_channel = 0; if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); @@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) { We need to be not writing as cancellation finalization may produce some callbacks that NEED to be made to close out some streams when t->writing becomes 0. */ - while (t->calling_back || t->writing) { + while (t->calling_back_channel || t->writing) { gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future); } drop_connection(t); @@ -830,28 +833,29 @@ static void unlock(transport *t) { finish_reads(t); /* gather any callbacks that need to be made */ - if (!t->calling_back) { - t->calling_back = perform_callbacks = prepare_callbacks(t); - if (cb) { - if (t->error_state == ERROR_STATE_SEEN && !t->writing) { - call_closed = 1; - t->calling_back = 1; - t->cb = NULL; /* no more callbacks */ - t->error_state = ERROR_STATE_NOTIFIED; - } - if (t->num_pending_goaways) { - goaways = t->pending_goaways; - num_goaways = t->num_pending_goaways; - t->pending_goaways = NULL; - t->num_pending_goaways = 0; - t->cap_pending_goaways = 0; - t->calling_back = 1; - } - } + if (!t->calling_back_ops) { + t->calling_back_ops = perform_callbacks = prepare_callbacks(t); + if (perform_callbacks) ref_transport(t); } - if (perform_callbacks || call_closed || num_goaways) { - ref_transport(t); + if (!t->calling_back_channel && cb) { + if (t->error_state == ERROR_STATE_SEEN && !t->writing) { + call_closed = 1; + t->calling_back_channel = 1; + t->cb = NULL; /* no more callbacks */ + t->error_state = ERROR_STATE_NOTIFIED; + } + if (t->num_pending_goaways) { + goaways = t->pending_goaways; + num_goaways = t->num_pending_goaways; + t->pending_goaways = NULL; + t->num_pending_goaways = 0; + t->cap_pending_goaways = 0; + t->calling_back_channel = 1; + } + if (call_closed || num_goaways) { + ref_transport(t); + } } /* finally unlock */ @@ -865,7 +869,11 @@ static void unlock(transport *t) { } if (perform_callbacks) { - run_callbacks(t, cb); + run_callbacks(t); + lock(t); + t->calling_back_ops = 0; + unlock(t); + unref_transport(t); } if (call_closed) { @@ -878,9 +886,9 @@ static void unlock(transport *t) { perform_write(t, ep); } - if (perform_callbacks || call_closed || num_goaways) { + if (call_closed || num_goaways) { lock(t); - t->calling_back = 0; + t->calling_back_channel = 0; if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); unref_transport(t); @@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) { return t->executing_callbacks.count > 0; } -static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { +static void run_callbacks(transport *t) { size_t i; for (i = 0; i < t->executing_callbacks.count; i++) { op_closure c = t->executing_callbacks.callbacks[i]; diff --git a/src/core/transport/metadata.c b/src/core/transport/metadata.c index c80d67823f..e75b449e12 100644 --- a/src/core/transport/metadata.c +++ b/src/core/transport/metadata.c @@ -120,7 +120,7 @@ static void unlock(grpc_mdctx *ctx) { if (ctx->refs == 0) { /* uncomment if you're having trouble diagnosing an mdelem leak to make things clearer (slows down destruction a lot, however) */ - gc_mdtab(ctx); + /* gc_mdtab(ctx); */ if (ctx->mdtab_count && ctx->mdtab_count == ctx->mdtab_free) { discard_metadata(ctx); } |