aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/message_size_filter.c14
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/surface/byte_buffer.c5
-rw-r--r--src/core/lib/surface/call.c15
5 files changed, 38 insertions, 14 deletions
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 02fc68fc3a..f067a3a51c 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, chand->max_recv_size);
- gpr_slice message = gpr_slice_from_copied_string(message_string);
+ grpc_error* new_error = grpc_error_set_int(
+ GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_INVALID_ARGUMENT);
+ if (error == GRPC_ERROR_NONE) {
+ error = new_error;
+ } else {
+ error = grpc_error_add_child(error, new_error);
+ GRPC_ERROR_UNREF(new_error);
+ }
gpr_free(message_string);
- grpc_call_element_send_close_with_message(
- exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL);
}
-// Start transport op.
+// Start transport stream op.
static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 7e2fd7a3bd..00ace8a7a9 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -40,6 +40,10 @@
#include <grpc/status.h>
#include <grpc/support/time.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/// Opaque representation of an error.
/// Errors are refcounted objects that represent the result of an operation.
/// Ownership laws:
@@ -204,4 +208,8 @@ bool grpc_log_if_error(const char *what, grpc_error *error, const char *file,
#define GRPC_LOG_IF_ERROR(what, error) \
grpc_log_if_error((what), (error), __FILE__, __LINE__)
+#ifdef __cplusplus
+}
+#endif
+
#endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 48032412a2..edf7b133e9 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -38,7 +38,6 @@
#include <grpc/support/port_platform.h>
-#ifdef GRPC_NEED_UDP
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/udp_server.h"
@@ -171,6 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
@@ -197,6 +198,12 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. */
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
}
gpr_mu_unlock(&s->mu);
@@ -439,4 +446,3 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
#endif
-#endif
diff --git a/src/core/lib/surface/byte_buffer.c b/src/core/lib/surface/byte_buffer.c
index a093a37af3..054a6e6c58 100644
--- a/src/core/lib/surface/byte_buffer.c
+++ b/src/core/lib/surface/byte_buffer.c
@@ -72,8 +72,9 @@ grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_RAW:
- return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
- bb->data.raw.slice_buffer.count);
+ return grpc_raw_compressed_byte_buffer_create(
+ bb->data.raw.slice_buffer.slices, bb->data.raw.slice_buffer.count,
+ bb->data.raw.compression);
}
GPR_UNREACHABLE_CODE(return NULL);
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 5690bcab1e..b0f66f4f61 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- bool success) {
+static void process_data_after_md(grpc_exec_ctx *exec_ctx,
+ batch_control *bctl) {
grpc_call *call = bctl->call;
if (call->receiving_stream == NULL) {
*call->receiving_buffer = NULL;
@@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
bctl);
continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
}
}
@@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
-
+ if (error != GRPC_ERROR_NONE) {
+ grpc_status_code status;
+ const char *msg;
+ grpc_error_get_status(error, &status, &msg);
+ close_with_status(exec_ctx, call, status, msg);
+ }
gpr_mu_lock(&bctl->call->mu);
if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
call->receiving_stream == NULL) {
gpr_mu_unlock(&bctl->call->mu);
- process_data_after_md(exec_ctx, bctlp, error);
+ process_data_after_md(exec_ctx, bctlp);
} else {
call->saved_receiving_stream_ready_bctlp = bctlp;
gpr_mu_unlock(&bctl->call->mu);