aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc6
-rw-r--r--src/core/ext/transport/inproc/inproc_transport.cc18
3 files changed, 25 insertions, 5 deletions
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
index 8c4025ed13..f3cc16d8cc 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
@@ -30,8 +30,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/security/security_connector/security_connector.h"
#include "src/core/lib/security/transport/lb_targets_info.h"
-#include "src/core/lib/security/transport/security_connector.h"
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
@@ -63,9 +63,7 @@ static grpc_subchannel_args* get_secure_naming_subchannel_args(
// To which address are we connecting? By default, use the server URI.
const grpc_arg* server_uri_arg =
grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
- GPR_ASSERT(server_uri_arg != nullptr);
- GPR_ASSERT(server_uri_arg->type == GRPC_ARG_STRING);
- const char* server_uri_str = server_uri_arg->value.string;
+ const char* server_uri_str = grpc_channel_arg_get_string(server_uri_arg);
GPR_ASSERT(server_uri_str != nullptr);
grpc_uri* server_uri =
grpc_uri_parse(server_uri_str, true /* supress errors */);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index ad8da94cb3..2fc3c4fa41 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1473,6 +1473,7 @@ static void perform_stream_op_locked(void* stream_op,
// streaming call might send another message before getting a
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
+ grpc_byte_stream_destroy(op->payload->send_message.send_message);
grpc_chttp2_complete_closure_step(
t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
@@ -2092,7 +2093,10 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
GRPC_ERROR_REF(error),
"send_trailing_metadata_finished");
- s->fetching_send_message = nullptr;
+ if (s->fetching_send_message != nullptr) {
+ grpc_byte_stream_destroy(s->fetching_send_message);
+ s->fetching_send_message = nullptr;
+ }
grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
GRPC_ERROR_REF(error),
"fetching_send_message_finished");
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 2022eaffe5..e1d4843785 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -480,6 +480,8 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->recv_message_op = nullptr;
}
if (s->send_message_op) {
+ grpc_byte_stream_destroy(
+ s->send_message_op->payload->send_message.send_message);
complete_if_batch_end_locked(
s, error, s->send_message_op,
"fail_helper scheduling send-message-on-complete");
@@ -506,6 +508,14 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
GRPC_ERROR_UNREF(error);
}
+// TODO(vjpai): It should not be necessary to drain the incoming byte
+// stream and create a new one; instead, we should simply pass the byte
+// stream from the sender directly to the receiver as-is.
+//
+// Note that fixing this will also avoid the assumption in this code
+// that the incoming byte stream's next() call will always return
+// synchronously. That assumption is true today but may not always be
+// true in the future.
static void message_transfer_locked(inproc_stream* sender,
inproc_stream* receiver) {
size_t remaining =
@@ -532,6 +542,8 @@ static void message_transfer_locked(inproc_stream* sender,
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(&receiver->recv_message, message_slice);
} while (remaining > 0);
+ grpc_byte_stream_destroy(
+ sender->send_message_op->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
0);
@@ -592,6 +604,8 @@ static void op_state_machine(void* arg, grpc_error* error) {
(s->trailing_md_sent || other->recv_trailing_md_op)) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
+ grpc_byte_stream_destroy(
+ s->send_message_op->payload->send_message.send_message);
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@@ -728,6 +742,8 @@ static void op_state_machine(void* arg, grpc_error* error) {
if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
+ grpc_byte_stream_destroy(
+ s->send_message_op->payload->send_message.send_message);
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@@ -785,6 +801,8 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
+ grpc_byte_stream_destroy(
+ s->send_message_op->payload->send_message.send_message);
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");