GRPC C++  0.11.0.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
server.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_SERVER_H
35 #define GRPCXX_SERVER_H
36 
37 #include <list>
38 #include <memory>
39 
41 #include <grpc++/impl/call.h>
43 #include <grpc++/impl/sync.h>
45 #include <grpc++/support/config.h>
46 #include <grpc++/support/status.h>
47 
48 struct grpc_server;
49 
50 namespace grpc {
51 
52 class AsynchronousService;
53 class GenericServerContext;
54 class AsyncGenericService;
55 class RpcService;
56 class RpcServiceMethod;
57 class ServerAsyncStreamingInterface;
58 class ThreadPoolInterface;
59 
63 class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
64  public:
65  ~Server();
66 
72  template <class T>
73  void Shutdown(const T& deadline) {
74  ShutdownInternal(TimePoint<T>(deadline).raw_time());
75  }
76 
78  void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
79 
84  void Wait();
85 
86  private:
87  friend class AsyncGenericService;
88  friend class AsynchronousService;
89  friend class ServerBuilder;
90 
91  class SyncRequest;
92  class AsyncRequest;
93  class ShutdownRequest;
94 
101  Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
102  int max_message_size);
103 
106  bool RegisterService(const grpc::string* host, RpcService* service);
107 
110  bool RegisterAsyncService(const grpc::string* host,
111  AsynchronousService* service);
112 
115  void RegisterAsyncGenericService(AsyncGenericService* service);
116 
128  int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
129 
138  bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
139 
140  void HandleQueueClosed();
141 
143  void RunRpc();
144 
146  void ScheduleCallback();
147 
148  void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
149 
150  void ShutdownInternal(gpr_timespec deadline);
151 
152  class BaseAsyncRequest : public CompletionQueueTag {
153  public:
154  BaseAsyncRequest(Server* server, ServerContext* context,
156  CompletionQueue* call_cq, void* tag,
157  bool delete_on_finalize);
158  virtual ~BaseAsyncRequest();
159 
160  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
161 
162  protected:
163  Server* const server_;
164  ServerContext* const context_;
165  ServerAsyncStreamingInterface* const stream_;
166  CompletionQueue* const call_cq_;
167  void* const tag_;
168  const bool delete_on_finalize_;
169  grpc_call* call_;
170  grpc_metadata_array initial_metadata_array_;
171  };
172 
173  class RegisteredAsyncRequest : public BaseAsyncRequest {
174  public:
175  RegisteredAsyncRequest(Server* server, ServerContext* context,
177  CompletionQueue* call_cq, void* tag);
178 
179  // uses BaseAsyncRequest::FinalizeResult
180 
181  protected:
182  void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
183  ServerCompletionQueue* notification_cq);
184  };
185 
186  class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
187  public:
188  NoPayloadAsyncRequest(void* registered_method, Server* server,
189  ServerContext* context,
190  ServerAsyncStreamingInterface* stream,
191  CompletionQueue* call_cq,
192  ServerCompletionQueue* notification_cq, void* tag)
193  : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
194  IssueRequest(registered_method, nullptr, notification_cq);
195  }
196 
197  // uses RegisteredAsyncRequest::FinalizeResult
198  };
199 
200  template <class Message>
201  class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
202  public:
203  PayloadAsyncRequest(void* registered_method, Server* server,
204  ServerContext* context,
205  ServerAsyncStreamingInterface* stream,
206  CompletionQueue* call_cq,
207  ServerCompletionQueue* notification_cq, void* tag,
208  Message* request)
209  : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
210  request_(request) {
211  IssueRequest(registered_method, &payload_, notification_cq);
212  }
213 
214  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
215  bool serialization_status =
216  *status && payload_ &&
217  SerializationTraits<Message>::Deserialize(payload_, request_,
218  server_->max_message_size_)
219  .ok();
220  bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
221  *status = serialization_status && *status;
222  return ret;
223  }
224 
225  private:
226  grpc_byte_buffer* payload_;
227  Message* const request_;
228  };
229 
230  class GenericAsyncRequest : public BaseAsyncRequest {
231  public:
232  GenericAsyncRequest(Server* server, GenericServerContext* context,
233  ServerAsyncStreamingInterface* stream,
234  CompletionQueue* call_cq,
235  ServerCompletionQueue* notification_cq, void* tag,
236  bool delete_on_finalize);
237 
238  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
239 
240  private:
241  grpc_call_details call_details_;
242  };
243 
244  class UnimplementedAsyncRequestContext;
245  class UnimplementedAsyncRequest;
246  class UnimplementedAsyncResponse;
247 
248  template <class Message>
249  void RequestAsyncCall(void* registered_method, ServerContext* context,
250  ServerAsyncStreamingInterface* stream,
251  CompletionQueue* call_cq,
252  ServerCompletionQueue* notification_cq, void* tag,
253  Message* message) {
254  new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
255  call_cq, notification_cq, tag, message);
256  }
257 
258  void RequestAsyncCall(void* registered_method, ServerContext* context,
259  ServerAsyncStreamingInterface* stream,
260  CompletionQueue* call_cq,
261  ServerCompletionQueue* notification_cq, void* tag) {
262  new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
263  notification_cq, tag);
264  }
265 
266  void RequestAsyncGenericCall(GenericServerContext* context,
267  ServerAsyncStreamingInterface* stream,
268  CompletionQueue* call_cq,
269  ServerCompletionQueue* notification_cq,
270  void* tag) {
271  new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
272  tag, true);
273  }
274 
275  const int max_message_size_;
276 
277  // Completion queue.
278  CompletionQueue cq_;
279 
280  // Sever status
281  grpc::mutex mu_;
282  bool started_;
283  bool shutdown_;
284  // The number of threads which are running callbacks.
285  int num_running_cb_;
286  grpc::condition_variable callback_cv_;
287 
288  std::list<SyncRequest>* sync_methods_;
289  std::unique_ptr<RpcServiceMethod> unknown_method_;
290  bool has_generic_service_;
291 
292  // Pointer to the c grpc server.
293  grpc_server* const server_;
294 
295  ThreadPoolInterface* thread_pool_;
296  // Whether the thread pool is created and owned by the server.
297  bool thread_pool_owned_;
298 };
299 
300 } // namespace grpc
301 
302 #endif // GRPCXX_SERVER_H
An interface allowing implementors to process and filter event tags.
Definition: completion_queue.h:192
void Shutdown()
Shutdown the server, waiting for all rpc processing to finish.
Definition: server.h:78
void * tag_
Definition: channel.cc:118
std::string string
Definition: config.h:112
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call.h:478
Definition: server.cc:100
Definition: service_type.h:68
void Shutdown(const T &deadline)
Shutdown the server, blocking until all rpc processing finishes.
Definition: server.h:73
Definition: sync_no_cxx11.h:45
Definition: service_type.h:57
#define GRPC_FINAL
Definition: config.h:71
Definition: async_generic_service.h:59
Definition: server.cc:108
Definition: time.h:53
Definition: thread_pool_interface.h:42
~Server()
Definition: server.cc:282
Definition: sync_no_cxx11.h:87
Definition: grpc_library.h:41
Definition: rpc_service_method.h:248
Definition: server_credentials.h:49
Models a gRPC server.
Definition: server.h:63
Definition: call.h:560
void Wait()
Block waiting for all work to complete.
Definition: server.cc:407
Definition: server_context.h:89
A thin wrapper around grpc_completion_queue (see src/core/surface/completion_queue.h).
Definition: completion_queue.h:81
A specific type of completion queue used by servers for processing notifications.
Definition: completion_queue.h:204
#define GRPC_OVERRIDE
Definition: config.h:77
Definition: call.h:553
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:55
::google::protobuf::Message Message
Definition: config_protobuf.h:60