Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Nonblock without exception #166

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions lib/celluloid/io/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,32 @@ def self.try_convert(socket, convert_io = true)
end
end

private

def perform_io
loop do
begin
result = yield

case result
when :wait_readable then wait_readable
when :wait_writable then wait_writable
when NilClass then return :eof
else return result
end
rescue ::IO::WaitReadable
wait_readable
retry
rescue ::IO::WaitWritable,
Errno::EAGAIN
wait_writable
retry
end
end
rescue EOFError
:eof
end

class << self
extend Forwardable
def_delegators '::Socket', *(::Socket.methods - self.methods - [:try_convert])
Expand Down
37 changes: 21 additions & 16 deletions lib/celluloid/io/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ def sysread(length = nil, buffer = nil)
buffer ||= ''.force_encoding(Encoding::ASCII_8BIT)

@read_latch.synchronize do
begin
read_nonblock(length, buffer)
rescue ::IO::WaitReadable
wait_readable
retry
op = perform_io do
read_nonblock(length, buffer)
end
raise EOFError if op == :eof
end

buffer
Expand All @@ -58,17 +56,10 @@ def syswrite(string)

@write_latch.synchronize do
while total_written < length
begin
written = write_nonblock(remaining)
rescue ::IO::WaitWritable
wait_writable
retry
rescue EOFError
return total_written
rescue Errno::EAGAIN
wait_writable
retry
end
written = perform_io do
write_nonblock(remaining)
end
return total_written if written == :eof

total_written += written

Expand All @@ -80,6 +71,20 @@ def syswrite(string)
total_written
end

# TODO: remove after ending ruby 2.0.0 support
if RUBY_VERSION >= "2.1"
def read_nonblock(*args, **options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the semantics of this method on Ruby >= "2.1" making it exceptionless by default and unilaterally (i.e. even if exception: true were passed)

I'd rather we preserve Ruby core semantics as much as possible.

How about an internal __read_nonblock method which is always exceptional on Ruby < 2.1 and always exceptionless on 2.1+?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll get back at this tomorrow. But isn't that what this method is doing already? No redefinition is happening under ruby < 2.1, it just delegates to the socket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are redefining it under Ruby 2.1+ such that:

  • The exception: ... parameter is completely ignored
  • The default is switched from exceptional to exceptionless

options[:exception] = false unless options.has_key?(:exception)
super(*args, **options)
end

def write_nonblock(*args, **options)
options[:exception] = false unless options.has_key?(:exception)
super(*args, **options)
end
end


# Reads +size+ bytes from the stream. If +buf+ is provided it must
# reference a string which will receive the data.
#
Expand Down
25 changes: 15 additions & 10 deletions lib/celluloid/io/udp_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,23 @@ def wait_readable; Celluloid::IO.wait_readable(self); end
# MSG_ options. The first element of the results, mesg, is the data
# received. The second element, sender_addrinfo, contains
# protocol-specific address information of the sender.
def recvfrom(maxlen, flags = 0)
begin
if RUBY_VERSION >= "2.3"
def recvfrom(*args, **options)
socket = to_io
if socket.respond_to? :recvfrom_nonblock
socket.recvfrom_nonblock(maxlen, flags)
else
# FIXME: hax for JRuby
socket.recvfrom(maxlen, flags)
options[:exception] = false unless options.has_key?(:exception)
perform_io { socket.recvfrom_nonblock(*args, **options) }
end
else
def recvfrom(*args)
socket = to_io
perform_io do
if socket.respond_to? :recvfrom_nonblock
socket.recvfrom_nonblock(*args)
else
# FIXME: hax for JRuby
socket.recvfrom(*args)
end
end
rescue ::IO::WaitReadable
wait_readable
retry
end
end

Expand Down
2 changes: 1 addition & 1 deletion spec/celluloid/io/reactor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Main server body:
within_io_actor do
begin
timeout(2) do
Timeout.timeout(2) do
loop do
socket.readpartial(2046)
end
Expand Down
4 changes: 3 additions & 1 deletion spec/celluloid/io/tcp_socket_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@
with_connected_sockets(example_port) do |subject, peer|
subject.sync = false
within_io_actor { subject << payload }
expect{ peer.read_nonblock payload.length }.to raise_exception ::IO::WaitReadable
if RUBY_VERSION < "2.1"
expect{ peer.read_nonblock payload.length }.to raise_exception ::IO::WaitReadable
end
within_io_actor { subject.close }
expect(peer.read).to eq payload
end
Expand Down