Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Remove assertion for connection close instead throwing StreamClosedError #291

Merged
merged 6 commits into from
Nov 7, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions tchannel/tornado/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ def _on_close(self):
except queues.QueueEmpty:
pass

if self._close_cb:
self._close_cb()

def await(self):
"""Get the next call to this TChannel."""
if self._loop_running:
Expand Down Expand Up @@ -208,10 +211,7 @@ def on_read_size(read_size_future):
return read_body_future

def on_error(future):
exception = future.exception()

if isinstance(exception, tornado.iostream.StreamClosedError):
self.close()
log.info("Failed to read data: %s", future.exception())

size_width = frame.frame_rw.size_rw.width()
read_bytes_future = self.connection.read_bytes(size_width)
Expand Down Expand Up @@ -290,7 +290,6 @@ def send(self, message):
:returns:
A Future containing the response for the message
"""
assert not self.closed
assert self._loop_running, "Perform a handshake first."
assert message.message_type in self.CALL_REQ_TYPES, (
"Message '%s' can't use send" % repr(message)
Expand All @@ -314,8 +313,6 @@ def write(self, message):
:param message:
Message to write.
"""
assert not self.closed

message.id = message.id or self.next_message_id()

if message.message_type in self.CALL_REQ_TYPES:
Expand Down Expand Up @@ -350,10 +347,8 @@ def _write(self, message):
return self.connection.write(body)

def close(self):
if not self.connection.closed():
if not self.closed:
self.connection.close()
if self._close_cb:
self._close_cb()

@tornado.gen.coroutine
def initiate_handshake(self, headers):
Expand Down Expand Up @@ -623,7 +618,6 @@ def send_request(self, request):
:returns:
A Future containing the response for the request
"""
assert not self.closed
assert self._loop_running, "Perform a handshake first."

assert request.id not in self._outstanding, (
Expand Down
16 changes: 14 additions & 2 deletions tchannel/tornado/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
from collections import deque
from itertools import takewhile, dropwhile
from tornado import gen
from tornado.iostream import StreamClosedError

from ..schemes import DEFAULT as DEFAULT_SCHEME
from ..retry import (
DEFAULT as DEFAULT_RETRY, DEFAULT_RETRY_LIMIT
)
from tchannel.event import EventType

from ..context import get_current_context
from ..errors import BadRequestError
from ..errors import NoAvailablePeerError
from ..errors import TChannelError
from ..errors import NetworkError
from ..event import EventType
from ..glossary import DEFAULT_TIMEOUT
from ..glossary import MAX_SIZE_OF_ARG1
from ..zipkin.annotation import Endpoint
Expand Down Expand Up @@ -409,6 +411,17 @@ def _send(self, connection, req):
with timeout(response_future, req.ttl):
try:
response = yield response_future
except StreamClosedError as error:
error = NetworkError(
id=req.id,
description=error.message,
tracing=req.tracing,
)
# event: after_receive_error
self.tchannel.event_emitter.fire(
EventType.after_receive_error, req, error,
)
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here. Are you trying to re-raise the StreamClosedError you caught or do you want the new NetworkError to be raised?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is will be raised again since we have a try catch from the caller to tell whether we should retry based on the exception or not.

This part will be refined in this task: #285

except TChannelError as error:
# event: after_receive_error
self.tchannel.event_emitter.fire(
Expand All @@ -429,7 +442,6 @@ def send_with_retry(self, request, peer, retry_limit, connection):
try:
response = yield self._send(connection, request)
raise gen.Return(response)
# Why are we retying on all errors????
except TChannelError as error:
blacklist.add(peer.hostport)
(peer, connection) = yield self._prepare_for_retry(
Expand Down
4 changes: 4 additions & 0 deletions tchannel/tornado/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import tornado
import tornado.gen
from tornado.iostream import StreamClosedError

from tchannel import retry

Expand Down Expand Up @@ -177,6 +178,9 @@ def should_retry_on_error(self, error):
if retry_flag == retry.NEVER:
return False

if isinstance(error, StreamClosedError):
return True

if error.code in [ErrorCode.bad_request, ErrorCode.cancelled,
ErrorCode.unhealthy]:
return False
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/test_client_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from tchannel.errors import NetworkError
from tchannel.errors import BadRequestError
from tchannel import TChannel
from tchannel.event import EventHook
from tchannel.tornado.connection import StreamConnection
from tests.util import big_arg

Expand Down Expand Up @@ -109,3 +110,24 @@ def test_endpoint_not_found(mock_server):
endpoint='fooo',
hostport=mock_server.hostport,
)


@pytest.mark.gen_test
def test_connection_close(mock_server):
tchannel = TChannel(name='test')

class TestHook(EventHook):
def before_send_request(self, request):
# close the connection
peer = tchannel._dep_tchannel.peers.get(mock_server.hostport)
peer.close()
peer.connections[0].closed = True

tchannel.hooks.register(TestHook())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you just call close right here before making the call instead of using the hook?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I call close, then we make a call, we will choose peer and rebuilt a connection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant what if you stop the server here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the way iostream calls the callback method is

    def _run_callback(self, callback, *args):
       ....
        with stack_context.NullContext():
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)

from the last line, we see the callback won't be called immediately, there is some delay. That's why even if I stop the server, the call back is triggered later.

for testing purpose: I manually set peer.connections[0].closed = True

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not the problem I'm trying to solve here. I'm trying to solve the hook being used to close the connection. Why not,

mock_server.tchannel._dep_tchannel.close()

That will tell the server to immediately stop listening for new incoming connections, so when the call happens two lines below, it'll be trying to hit a hostport that doesn't exist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, just to confirm: Is this test just trying to verify that we raise a NetworkError when the destination doesn't exist?


with pytest.raises(NetworkError):
yield tchannel.raw(
service='test-service',
hostport=mock_server.hostport,
endpoint='testg',
)
7 changes: 4 additions & 3 deletions tests/tornado/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def test_close_callback_is_called():
server = TChannel('server')
server.listen()

close_cb = mock.Mock()
cb_future = tornado.gen.Future()

conn = yield StreamConnection.outgoing(
server.hostport, tchannel=mock.MagicMock()
)
conn.set_close_callback(close_cb)
conn.set_close_callback(lambda: cb_future.set_result(True))

conn.close()
close_cb.assert_called_once_with()

assert (yield cb_future)