GRPC C++  0.10.0.0
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
server.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_SERVER_H
35 #define GRPCXX_SERVER_H
36 
37 #include <list>
38 #include <memory>
39 
41 #include <grpc++/config.h>
42 #include <grpc++/impl/call.h>
44 #include <grpc++/impl/sync.h>
45 #include <grpc++/status.h>
46 
47 struct grpc_server;
48 
49 namespace grpc {
50 
51 class AsynchronousService;
52 class GenericServerContext;
53 class AsyncGenericService;
54 class RpcService;
55 class RpcServiceMethod;
56 class ServerAsyncStreamingInterface;
57 class ServerCredentials;
58 class ThreadPoolInterface;
59 
60 // Currently it only supports handling rpcs in a single thread.
61 class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
62  public:
63  ~Server();
64 
65  // Shutdown the server, block until all rpc processing finishes.
66  void Shutdown();
67 
68  // Block waiting for all work to complete (the server must either
69  // be shutting down or some other thread must call Shutdown for this
70  // function to ever return)
71  void Wait();
72 
73  private:
74  friend class AsyncGenericService;
75  friend class AsynchronousService;
76  friend class ServerBuilder;
77 
78  class SyncRequest;
79  class AsyncRequest;
80  class ShutdownRequest;
81 
82  // ServerBuilder use only
83  Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
84  int max_message_size);
85  // Register a service. This call does not take ownership of the service.
86  // The service must exist for the lifetime of the Server instance.
87  bool RegisterService(const grpc::string *host, RpcService* service);
88  bool RegisterAsyncService(const grpc::string *host, AsynchronousService* service);
89  void RegisterAsyncGenericService(AsyncGenericService* service);
90  // Add a listening port. Can be called multiple times.
91  int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
92  // Start the server.
93  bool Start();
94 
95  void HandleQueueClosed();
96  void RunRpc();
97  void ScheduleCallback();
98 
99  void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
100 
101  class BaseAsyncRequest : public CompletionQueueTag {
102  public:
103  BaseAsyncRequest(Server* server, ServerContext* context,
105  CompletionQueue* call_cq, void* tag);
106  virtual ~BaseAsyncRequest();
107 
108  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
109 
110  protected:
111  Server* const server_;
112  ServerContext* const context_;
113  ServerAsyncStreamingInterface* const stream_;
114  CompletionQueue* const call_cq_;
115  void* const tag_;
116  grpc_call* call_;
117  grpc_metadata_array initial_metadata_array_;
118  };
119 
120  class RegisteredAsyncRequest : public BaseAsyncRequest {
121  public:
122  RegisteredAsyncRequest(Server* server, ServerContext* context,
124  CompletionQueue* call_cq, void* tag);
125 
126  // uses BaseAsyncRequest::FinalizeResult
127 
128  protected:
129  void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
130  ServerCompletionQueue* notification_cq);
131  };
132 
133  class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
134  public:
135  NoPayloadAsyncRequest(void* registered_method, Server* server,
136  ServerContext* context,
137  ServerAsyncStreamingInterface* stream,
138  CompletionQueue* call_cq,
139  ServerCompletionQueue* notification_cq, void* tag)
140  : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
141  IssueRequest(registered_method, nullptr, notification_cq);
142  }
143 
144  // uses RegisteredAsyncRequest::FinalizeResult
145  };
146 
147  template <class Message>
148  class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
149  public:
150  PayloadAsyncRequest(void* registered_method, Server* server,
151  ServerContext* context,
152  ServerAsyncStreamingInterface* stream,
153  CompletionQueue* call_cq,
154  ServerCompletionQueue* notification_cq, void* tag,
155  Message* request)
156  : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
157  request_(request) {
158  IssueRequest(registered_method, &payload_, notification_cq);
159  }
160 
161  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
162  bool serialization_status =
163  *status && payload_ &&
164  SerializationTraits<Message>::Deserialize(payload_, request_,
165  server_->max_message_size_)
166  .ok();
167  bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
168  *status = serialization_status && *status;
169  return ret;
170  }
171 
172  private:
173  grpc_byte_buffer* payload_;
174  Message* const request_;
175  };
176 
177  class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest {
178  public:
179  GenericAsyncRequest(Server* server, GenericServerContext* context,
180  ServerAsyncStreamingInterface* stream,
181  CompletionQueue* call_cq,
182  ServerCompletionQueue* notification_cq, void* tag);
183 
184  bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
185 
186  private:
187  grpc_call_details call_details_;
188  };
189 
190  template <class Message>
191  void RequestAsyncCall(void* registered_method, ServerContext* context,
192  ServerAsyncStreamingInterface* stream,
193  CompletionQueue* call_cq,
194  ServerCompletionQueue* notification_cq, void* tag,
195  Message* message) {
196  new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
197  call_cq, notification_cq, tag, message);
198  }
199 
200  void RequestAsyncCall(void* registered_method, ServerContext* context,
201  ServerAsyncStreamingInterface* stream,
202  CompletionQueue* call_cq,
203  ServerCompletionQueue* notification_cq, void* tag) {
204  new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
205  notification_cq, tag);
206  }
207 
208  void RequestAsyncGenericCall(GenericServerContext* context,
209  ServerAsyncStreamingInterface* stream,
210  CompletionQueue* call_cq,
211  ServerCompletionQueue* notification_cq,
212  void* tag) {
213  new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
214  tag);
215  }
216 
217  const int max_message_size_;
218 
219  // Completion queue.
220  CompletionQueue cq_;
221 
222  // Sever status
223  grpc::mutex mu_;
224  bool started_;
225  bool shutdown_;
226  // The number of threads which are running callbacks.
227  int num_running_cb_;
228  grpc::condition_variable callback_cv_;
229 
230  std::list<SyncRequest>* sync_methods_;
231 
232  // Pointer to the c grpc server.
233  grpc_server* const server_;
234 
235  ThreadPoolInterface* thread_pool_;
236  // Whether the thread pool is created and owned by the server.
237  bool thread_pool_owned_;
238 };
239 
240 } // namespace grpc
241 
242 #endif // GRPCXX_SERVER_H
Definition: completion_queue.h:75
void Shutdown()
std::string string
Definition: config.h:112
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call.h:482
Definition: service_type.h:68
Definition: sync_no_cxx11.h:45
Definition: service_type.h:57
#define GRPC_FINAL
Definition: config.h:71
Definition: async_generic_service.h:59
Definition: thread_pool_interface.h:42
Definition: sync_no_cxx11.h:84
Definition: grpc_library.h:41
Definition: rpc_service_method.h:227
Definition: server_credentials.h:48
Definition: server.h:61
Definition: call.h:565
Definition: server_context.h:86
Definition: completion_queue.h:87
Definition: completion_queue.h:162
#define GRPC_OVERRIDE
Definition: config.h:77
Definition: call.h:558
Definition: server_builder.h:54
::google::protobuf::Message Message
Definition: config_protobuf.h:60