aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-05-28 14:06:41 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-05-28 14:18:38 -0700
commitdd7a036f75c63f9a266c9e8ee4e79942505a9892 (patch)
treeb8f9e322c112ab726fc88834effe9e855bc792ef /src/ruby
parentd49aeaea49aacfc0179463bc0a2a389ddfdffd60 (diff)
Add's logging tweaks thread interaction for bidi_server
Diffstat (limited to 'src/ruby')
-rwxr-xr-xsrc/ruby/bin/interop/interop_server.rb7
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb1
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb28
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb1
4 files changed, 23 insertions, 14 deletions
diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 78cb8dd836..2ba8d2c19e 100755
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -128,16 +128,19 @@ class TestTarget < Grpc::Testing::TestService::Service
cls = StreamingOutputCallResponse
Thread.new do
begin
+ GRPC.logger.info('interop-server: started receiving')
reqs.each do |req|
- GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
+ GRPC.logger.info("read a req, response size is #{resp_size}")
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
- GRPC.logger.info('finished reads')
+ GRPC.logger.info('interop-server: finished receiving')
q.push(self)
rescue StandardError => e
+ GRPC.logger.info('interop-server: failed')
+ GRPC.logger.warn(e)
q.push(e) # share the exception with the enumerator
end
end
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 5f7beb5ab1..04abab8ac3 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -39,6 +39,7 @@ class Struct
return nil if status.nil?
fail GRPC::Cancelled if status.code == GRPC::Core::StatusCodes::CANCELLED
if status.code != GRPC::Core::StatusCodes::OK
+ GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 67143d40cf..f1b9f6b00d 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -100,6 +100,7 @@ module GRPC
replys = gen_each_reply.call(each_queued_msg)
@enq_th = start_write_loop(replys, is_client: false)
@loop_th = start_read_loop
+ @enq_th.join if @enq_th.alive?
end
private
@@ -115,7 +116,7 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
- GRPC.logger.debug("each_queued_msg: msg##{count}")
+ GRPC.logger.debug("each_queued_msg: waiting##{count}")
count += 1
req = @readq.pop
GRPC.logger.debug("each_queued_msg: req = #{req}")
@@ -123,70 +124,73 @@ module GRPC
break if req.equal?(END_OF_READS)
yield req
end
- @enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
# read so that read_loop does not block waiting for requests to read.
def start_write_loop(requests, is_client: true)
Thread.new do # TODO: run on a thread pool
- write_tag = Object.new
+ GRPC.logger.debug('bidi-write-loop: starting')
begin
+ write_tag = Object.new
count = 0
requests.each do |req|
- GRPC.logger.debug("bidi-write_loop: #{count}")
+ GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
+ GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
- GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
+ GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
+ @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
+ SEND_CLOSE_FROM_CLIENT => nil)
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
- GRPC.logger.warn('bidi-write_loop: failed')
+ GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
raise e
end
+ GRPC.logger.debug('bidi-write-loop: finished')
end
end
# starts the read loop
def start_read_loop
Thread.new do
+ GRPC.logger.debug('bidi-read-loop: starting')
begin
read_tag = Object.new
count = 0
-
# queue the initial read before beginning the loop
loop do
- GRPC.logger.debug("bidi-read_loop: #{count}")
+ GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
RECV_MESSAGE => nil)
# handle the next message
if batch_result.message.nil?
+ GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
@readq.push(END_OF_READS)
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
- GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
-
rescue StandardError => e
- GRPC.logger.warn('bidi: read_loop failed')
+ GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
+ GRPC.logger.debug('bidi-read-loop: finished')
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 2fd61c5f7e..dd90d8d91d 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -137,6 +137,7 @@ module GRPC
def send_status(active_client, code, details, **kw)
details = 'Not sure why' if details.nil?
+ GRPC.logger.debug("Sending status #{code}:#{details}")
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
GRPC.logger.warn("Could not send status #{code}:#{details}")