diff options
Diffstat (limited to 'include/grpc++/impl/codegen/server_interface.h')
-rw-r--r-- | include/grpc++/impl/codegen/server_interface.h | 41 |
1 files changed, 34 insertions, 7 deletions
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index cc9bf0a108..3bcf4c87e7 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -177,22 +177,49 @@ class ServerInterface : public internal::CallHook { ServerCompletionQueue* notification_cq, void* tag, Message* request) : RegisteredAsyncRequest(server, context, stream, call_cq, tag), + registered_method_(registered_method), + server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + notification_cq_(notification_cq), + tag_(tag), request_(request) { IssueRequest(registered_method, &payload_, notification_cq); } bool FinalizeResult(void** tag, bool* status) override { - bool serialization_status = - *status && payload_ && - SerializationTraits<Message>::Deserialize(payload_, request_).ok(); - bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status); - *status = serialization_status && *status; - return ret; + if (*status) { + if (payload_ == nullptr || + !SerializationTraits<Message>::Deserialize(payload_, request_) + .ok()) { + // If deserialization fails, we cancel the call and instantiate + // a new instance of ourselves to request another call. We then + // return false, which prevents the call from being returned to + // the application. + g_core_codegen_interface->grpc_call_cancel_with_status( + call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr); + g_core_codegen_interface->grpc_call_unref(call_); + new PayloadAsyncRequest(registered_method_, server_, context_, + stream_, call_cq_, notification_cq_, tag_, + request_); + delete this; + return false; + } + } + return RegisteredAsyncRequest::FinalizeResult(tag, status); } private: - grpc_byte_buffer* payload_; + void* const registered_method_; + ServerInterface* const server_; + ServerContext* const context_; + internal::ServerAsyncStreamingInterface* const stream_; + CompletionQueue* const call_cq_; + ServerCompletionQueue* const notification_cq_; + void* const tag_; Message* const request_; + grpc_byte_buffer* payload_; }; class GenericAsyncRequest : public BaseAsyncRequest { |