aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/health/health_check_client.h
blob: 7f77348f185ef5812991c9671246ed63b96c17c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H

#include <grpc/support/port_platform.h>

#include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"

namespace grpc_core {

class HealthCheckClient
    : public InternallyRefCountedWithTracing<HealthCheckClient> {
 public:
  HealthCheckClient(const char* service_name,
                    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
                    grpc_pollset_set* interested_parties,
                    RefCountedPtr<channelz::SubchannelNode> channelz_node);

  ~HealthCheckClient();

  // When the health state changes from *state, sets *state to the new
  // value and schedules closure.
  // Only one closure can be outstanding at a time.
  void NotifyOnHealthChange(grpc_connectivity_state* state,
                            grpc_closure* closure);

  void Orphan() override;

 private:
  // Contains a call to the backend and all the data related to the call.
  class CallState : public InternallyRefCountedWithTracing<CallState> {
   public:
    CallState(RefCountedPtr<HealthCheckClient> health_check_client,
              grpc_pollset_set* interested_parties_);
    ~CallState();

    void Orphan() override;

    void StartCall();

   private:
    void Cancel();

    void StartBatch(grpc_transport_stream_op_batch* batch);
    static void StartBatchInCallCombiner(void* arg, grpc_error* error);

    static void CallEndedRetry(void* arg, grpc_error* error);
    void CallEnded(bool retry);

    static void OnComplete(void* arg, grpc_error* error);
    static void RecvInitialMetadataReady(void* arg, grpc_error* error);
    static void RecvMessageReady(void* arg, grpc_error* error);
    static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
    static void StartCancel(void* arg, grpc_error* error);
    static void OnCancelComplete(void* arg, grpc_error* error);

    static void OnByteStreamNext(void* arg, grpc_error* error);
    void ContinueReadingRecvMessage();
    grpc_error* PullSliceFromRecvMessage();
    void DoneReadingRecvMessage(grpc_error* error);

    RefCountedPtr<HealthCheckClient> health_check_client_;
    grpc_polling_entity pollent_;

    gpr_arena* arena_;
    grpc_call_combiner call_combiner_;
    grpc_call_context_element context_[GRPC_CONTEXT_COUNT];

    // The streaming call to the backend. Always non-NULL.
    grpc_subchannel_call* call_;

    grpc_transport_stream_op_batch_payload payload_;
    grpc_transport_stream_op_batch batch_;
    grpc_transport_stream_op_batch recv_message_batch_;
    grpc_transport_stream_op_batch recv_trailing_metadata_batch_;

    grpc_closure on_complete_;

    // send_initial_metadata
    grpc_metadata_batch send_initial_metadata_;
    grpc_linked_mdelem path_metadata_storage_;

    // send_message
    ManualConstructor<SliceBufferByteStream> send_message_;

    // send_trailing_metadata
    grpc_metadata_batch send_trailing_metadata_;

    // recv_initial_metadata
    grpc_metadata_batch recv_initial_metadata_;
    grpc_closure recv_initial_metadata_ready_;

    // recv_message
    OrphanablePtr<ByteStream> recv_message_;
    grpc_closure recv_message_ready_;
    grpc_slice_buffer recv_message_buffer_;
    gpr_atm seen_response_;

    // recv_trailing_metadata
    grpc_metadata_batch recv_trailing_metadata_;
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
  };

  void StartCall();
  void StartCallLocked();  // Requires holding mu_.

  void StartRetryTimer();
  static void OnRetryTimer(void* arg, grpc_error* error);

  void SetHealthStatus(grpc_connectivity_state state, grpc_error* error);
  void SetHealthStatusLocked(grpc_connectivity_state state,
                             grpc_error* error);  // Requires holding mu_.

  const char* service_name_;  // Do not own.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  grpc_pollset_set* interested_parties_;  // Do not own.
  RefCountedPtr<channelz::SubchannelNode> channelz_node_;

  gpr_mu mu_;
  grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
  grpc_error* error_ = GRPC_ERROR_NONE;
  grpc_connectivity_state* notify_state_ = nullptr;
  grpc_closure* on_health_changed_ = nullptr;
  bool shutting_down_ = false;

  // The data associated with the current health check call.  It holds a ref
  // to this HealthCheckClient object.
  OrphanablePtr<CallState> call_state_;

  // Call retry state.
  BackOff retry_backoff_;
  grpc_timer retry_timer_;
  grpc_closure retry_timer_callback_;
  bool retry_timer_callback_pending_ = false;
};

}  // namespace grpc_core

#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */