aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/request_routing.h
blob: 0c671229c8e889c92b35c796d2f9aa986746c012 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"

namespace grpc_core {

// Routes calls using a Resolver and a LoadBalancingPolicy, both of which it
// holds as OrphanablePtr members, and tracks channel connectivity state in
// state_tracker_.
//
// NOTE(review): methods with the "Locked" suffix are presumably expected to
// be invoked while holding the combiner passed at construction time --
// confirm against the implementation.
class RequestRouter {
 public:
  // Holds the caller-supplied inputs and the internal state for a single
  // call handed to RequestRouter::RouteCallLocked().
  class Request {
   public:
    // Synchronous callback that applies the service config to a call.
    // Returns false if the call should be failed.
    using ApplyServiceConfigCallback = bool (*)(void* user_data);

    // All pointer arguments are supplied by the caller.
    //   owning_call / call_combiner / pollent: call stack, call combiner
    //     and polling entity associated with the call.
    //   send_initial_metadata / send_initial_metadata_flags: the call's
    //     initial metadata batch and its flags.
    //   apply_service_config / apply_service_config_user_data: callback
    //     (see ApplyServiceConfigCallback) and the opaque argument passed
    //     to it.
    //   on_route_done: closure supplied by the caller; presumably run once
    //     routing for this call has finished -- confirm in the .cc file.
    // NOTE(review): the header does not state ownership or lifetime
    // requirements for these pointers; verify against the implementation.
    Request(grpc_call_stack* owning_call, grpc_call_combiner* call_combiner,
            grpc_polling_entity* pollent,
            grpc_metadata_batch* send_initial_metadata,
            uint32_t* send_initial_metadata_flags,
            ApplyServiceConfigCallback apply_service_config,
            void* apply_service_config_user_data, grpc_closure* on_route_done);

    ~Request();

    // Returns a mutable pointer to this request's LB pick state.
    // TODO(roth): It seems a bit ugly to expose this member in a
    // non-const way.  Find a better API to avoid this.
    LoadBalancingPolicy::PickState* pick() { return &pick_; }

   private:
    // RequestRouter drives the request and accesses its private state.
    friend class RequestRouter;

    // Helper classes defined in the implementation file.
    class ResolverResultWaiter;
    class AsyncPickCanceller;

    void ProcessServiceConfigAndStartLbPickLocked();
    void StartLbPickLocked();
    // Closure-style callback (void* arg, grpc_error* error).
    static void LbPickDoneLocked(void* arg, grpc_error* error);

    // Paired helpers; pollent_added_to_interested_parties_ records whether
    // the add has happened.
    void MaybeAddCallToInterestedPartiesLocked();
    void MaybeRemoveCallFromInterestedPartiesLocked();

    // Populated by caller.
    grpc_call_stack* owning_call_;
    grpc_call_combiner* call_combiner_;
    grpc_polling_entity* pollent_;
    ApplyServiceConfigCallback apply_service_config_;
    void* apply_service_config_user_data_;
    grpc_closure* on_route_done_;
    LoadBalancingPolicy::PickState pick_;

    // Internal state.
    RequestRouter* request_router_ = nullptr;
    bool pollent_added_to_interested_parties_ = false;
    grpc_closure on_pick_done_;
    AsyncPickCanceller* pick_canceller_ = nullptr;
  };

  // Synchronous callback invoked with the caller's user_data and a set of
  // channel args.  lb_policy_name and lb_policy_config are pointer-to-pointer
  // parameters, presumably outputs through which the callback reports the
  // LB policy name and LB policy config to use -- confirm against the
  // callback's implementation.
  // Returns true if the service config has changed since the last result.
  using ProcessResolverResultCallback = bool (*)(
      void* user_data, const grpc_channel_args& args,
      const char** lb_policy_name, grpc_json** lb_policy_config);

  // owning_stack, combiner, client_channel_factory, interested_parties and
  // tracer are stored in the corresponding members ("passed in from caller
  // at construction time").  process_resolver_result is invoked with
  // process_resolver_result_user_data as its first argument.
  // NOTE(review): target_uri and args are presumably used to create the
  // resolver, and *error presumably reports a construction failure --
  // confirm in the .cc file.
  RequestRouter(grpc_channel_stack* owning_stack, grpc_combiner* combiner,
                grpc_client_channel_factory* client_channel_factory,
                grpc_pollset_set* interested_parties, TraceFlag* tracer,
                ProcessResolverResultCallback process_resolver_result,
                void* process_resolver_result_user_data, const char* target_uri,
                const grpc_channel_args* args, grpc_error** error);

  ~RequestRouter();

  // Stores the channelz node pointer; it is nullptr until this is called.
  void set_channelz_node(channelz::ClientChannelNode* channelz_node) {
    channelz_node_ = channelz_node;
  }

  // Routes the call described by request.
  void RouteCallLocked(Request* request);

  // TODO(roth): Add methods to cancel picks.

  void ShutdownLocked(grpc_error* error);

  void ExitIdleLocked();
  void ResetConnectionBackoffLocked();

  grpc_connectivity_state GetConnectivityState();
  void NotifyOnConnectivityStateChange(grpc_connectivity_state* state,
                                       grpc_closure* closure);

  // Returns the raw pointer held by lb_policy_ without transferring
  // ownership.
  LoadBalancingPolicy* lb_policy() const { return lb_policy_.get(); }

 private:
  // Collection of C strings gathered for channel tracing.
  using TraceStringVector = grpc_core::InlinedVector<char*, 3>;

  // Helper classes defined in the implementation file.
  class ReresolutionRequestHandler;
  class LbConnectivityWatcher;

  void StartResolvingLocked();
  void OnResolverShutdownLocked(grpc_error* error);
  void CreateNewLbPolicyLocked(const char* lb_policy_name, grpc_json* lb_config,
                               grpc_connectivity_state* connectivity_state,
                               grpc_error** connectivity_error,
                               TraceStringVector* trace_strings);
  void MaybeAddTraceMessagesForAddressChangesLocked(
      TraceStringVector* trace_strings);
  void ConcatenateAndAddChannelTraceLocked(
      TraceStringVector* trace_strings) const;
  // Closure-style callback (void* arg, grpc_error* error).
  static void OnResolverResultChangedLocked(void* arg, grpc_error* error);

  void SetConnectivityStateLocked(grpc_connectivity_state state,
                                  grpc_error* error, const char* reason);

  // Passed in from caller at construction time.
  grpc_channel_stack* owning_stack_;
  grpc_combiner* combiner_;
  grpc_client_channel_factory* client_channel_factory_;
  grpc_pollset_set* interested_parties_;
  TraceFlag* tracer_;

  // Set via set_channelz_node(); nullptr until then.
  channelz::ClientChannelNode* channelz_node_ = nullptr;

  // Resolver and associated state.
  OrphanablePtr<Resolver> resolver_;
  ProcessResolverResultCallback process_resolver_result_;
  void* process_resolver_result_user_data_;
  bool started_resolving_ = false;
  grpc_channel_args* resolver_result_ = nullptr;
  bool previous_resolution_contained_addresses_ = false;
  grpc_closure_list waiting_for_resolver_result_closures_;
  grpc_closure on_resolver_result_changed_;

  // LB policy and associated state.
  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
  bool exit_idle_when_lb_policy_arrives_ = false;

  grpc_connectivity_state_tracker state_tracker_;
};

}  // namespace grpc_core

#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_REQUEST_ROUTING_H */