From 42d78c8cdbb1f7a586c55412fd499985b7a2a448 Mon Sep 17 00:00:00 2001 From: Junchao Wu Date: Thu, 5 Nov 2015 13:33:33 -0800 Subject: [PATCH 1/6] Remove assertion for connection close instead throwing StreamClosedError --- tchannel/tornado/connection.py | 16 +++++----------- tchannel/tornado/peer.py | 16 ++++++++++++++-- tchannel/tornado/request.py | 4 ++++ tests/integration/test_client_server.py | 22 ++++++++++++++++++++++ tests/tornado/test_connection.py | 7 ++++--- 5 files changed, 49 insertions(+), 16 deletions(-) diff --git a/tchannel/tornado/connection.py b/tchannel/tornado/connection.py index 1a0477f5..f6b5f859 100644 --- a/tchannel/tornado/connection.py +++ b/tchannel/tornado/connection.py @@ -163,6 +163,9 @@ def _on_close(self): except queues.QueueEmpty: pass + if self._close_cb: + self._close_cb() + def await(self): """Get the next call to this TChannel.""" if self._loop_running: @@ -208,10 +211,7 @@ def on_read_size(read_size_future): return read_body_future def on_error(future): - exception = future.exception() - - if isinstance(exception, tornado.iostream.StreamClosedError): - self.close() + log.info("Failed to read data: %s", future.exception()) size_width = frame.frame_rw.size_rw.width() read_bytes_future = self.connection.read_bytes(size_width) @@ -290,7 +290,6 @@ def send(self, message): :returns: A Future containing the response for the message """ - assert not self.closed assert self._loop_running, "Perform a handshake first." assert message.message_type in self.CALL_REQ_TYPES, ( "Message '%s' can't use send" % repr(message) @@ -314,8 +313,6 @@ def write(self, message): :param message: Message to write. """ - assert not self.closed - message.id = message.id or self.next_message_id() if message.message_type in self.CALL_REQ_TYPES: @@ -350,10 +347,8 @@ def _write(self, message): return self.connection.write(body) def close(self): - if not self.connection.closed(): + if not self.closed: self.connection.close() - if self._close_cb: - self._close_cb() @tornado.gen.coroutine def initiate_handshake(self, headers): @@ -623,7 +618,6 @@ def send_request(self, request): :returns: A Future containing the response for the request """ - assert not self.closed assert self._loop_running, "Perform a handshake first." assert request.id not in self._outstanding, ( diff --git a/tchannel/tornado/peer.py b/tchannel/tornado/peer.py index f86edde5..2d1d1398 100644 --- a/tchannel/tornado/peer.py +++ b/tchannel/tornado/peer.py @@ -27,17 +27,19 @@ from collections import deque from itertools import takewhile, dropwhile from tornado import gen +from tornado.iostream import StreamClosedError from ..schemes import DEFAULT as DEFAULT_SCHEME from ..retry import ( DEFAULT as DEFAULT_RETRY, DEFAULT_RETRY_LIMIT ) -from tchannel.event import EventType + from ..context import get_current_context from ..errors import BadRequestError from ..errors import NoAvailablePeerError from ..errors import TChannelError from ..errors import NetworkError +from ..event import EventType from ..glossary import DEFAULT_TIMEOUT from ..glossary import MAX_SIZE_OF_ARG1 from ..zipkin.annotation import Endpoint @@ -409,6 +411,17 @@ def _send(self, connection, req): with timeout(response_future, req.ttl): try: response = yield response_future + except StreamClosedError as error: + error = NetworkError( + id=req.id, + description=error.message, + tracing=req.tracing, + ) + # event: after_receive_error + self.tchannel.event_emitter.fire( + EventType.after_receive_error, req, error, + ) + raise except TChannelError as error: # event: after_receive_error self.tchannel.event_emitter.fire( @@ -429,7 +442,6 @@ def send_with_retry(self, request, peer, retry_limit, connection): try: response = yield self._send(connection, request) raise gen.Return(response) - # Why are we retying on all errors???? except TChannelError as error: blacklist.add(peer.hostport) (peer, connection) = yield self._prepare_for_retry( diff --git a/tchannel/tornado/request.py b/tchannel/tornado/request.py index 8f6288c5..e274c640 100644 --- a/tchannel/tornado/request.py +++ b/tchannel/tornado/request.py @@ -24,6 +24,7 @@ import tornado import tornado.gen +from tornado.iostream import StreamClosedError from tchannel import retry @@ -177,6 +178,9 @@ def should_retry_on_error(self, error): if retry_flag == retry.NEVER: return False + if isinstance(error, StreamClosedError): + return True + if error.code in [ErrorCode.bad_request, ErrorCode.cancelled, ErrorCode.unhealthy]: return False diff --git a/tests/integration/test_client_server.py b/tests/integration/test_client_server.py index e793f04b..3c9932eb 100644 --- a/tests/integration/test_client_server.py +++ b/tests/integration/test_client_server.py @@ -26,6 +26,7 @@ from tchannel.errors import NetworkError from tchannel.errors import BadRequestError from tchannel import TChannel +from tchannel.event import EventHook from tchannel.tornado.connection import StreamConnection from tests.util import big_arg @@ -109,3 +110,24 @@ def test_endpoint_not_found(mock_server): endpoint='fooo', hostport=mock_server.hostport, ) + + +@pytest.mark.gen_test +def test_connection_close(mock_server): + tchannel = TChannel(name='test') + + class TestHook(EventHook): + def before_send_request(self, request): + # close the connection + peer = tchannel._dep_tchannel.peers.get(mock_server.hostport) + peer.close() + peer.connections[0].closed = True + + tchannel.hooks.register(TestHook()) + + with pytest.raises(NetworkError): + yield tchannel.raw( + service='test-service', + hostport=mock_server.hostport, + endpoint='testg', + ) diff --git a/tests/tornado/test_connection.py b/tests/tornado/test_connection.py index cdea6aca..167ea1b3 100644 --- a/tests/tornado/test_connection.py +++ b/tests/tornado/test_connection.py @@ -71,12 +71,13 @@ def test_close_callback_is_called(): server = TChannel('server') server.listen() - close_cb = mock.Mock() + cb_future = tornado.gen.Future() conn = yield StreamConnection.outgoing( server.hostport, tchannel=mock.MagicMock() ) - conn.set_close_callback(close_cb) + conn.set_close_callback(lambda: cb_future.set_result(True)) conn.close() - close_cb.assert_called_once_with() + + assert (yield cb_future) From 6947caa7584a69f4049a07df4cafb8d437bb3556 Mon Sep 17 00:00:00 2001 From: Junchao Wu Date: Thu, 5 Nov 2015 17:20:46 -0800 Subject: [PATCH 2/6] update connection test --- tests/integration/test_client_server.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_client_server.py b/tests/integration/test_client_server.py index 3c9932eb..2f1fe7c2 100644 --- a/tests/integration/test_client_server.py +++ b/tests/integration/test_client_server.py @@ -23,10 +23,9 @@ import pytest from tchannel import tcurl +from tchannel import TChannel from tchannel.errors import NetworkError from tchannel.errors import BadRequestError -from tchannel import TChannel -from tchannel.event import EventHook from tchannel.tornado.connection import StreamConnection from tests.util import big_arg @@ -116,14 +115,16 @@ def test_endpoint_not_found(mock_server): def test_connection_close(mock_server): tchannel = TChannel(name='test') - class TestHook(EventHook): - def before_send_request(self, request): - # close the connection - peer = tchannel._dep_tchannel.peers.get(mock_server.hostport) - peer.close() - peer.connections[0].closed = True + # use a bad call to finish the hand shake and build the connection. + with pytest.raises(BadRequestError): + yield tchannel.raw( + service='test-service', + hostport=mock_server.hostport, + endpoint='testg', + ) - tchannel.hooks.register(TestHook()) + # close the server and close the connection. + mock_server.tchannel._dep_tchannel.close() with pytest.raises(NetworkError): yield tchannel.raw( From 8d1dfd9614d56b3b8b3343bec7f3118b904aeb8b Mon Sep 17 00:00:00 2001 From: Junchao Wu Date: Fri, 6 Nov 2015 12:04:10 -0800 Subject: [PATCH 3/6] change the exception variable name --- tchannel/tornado/peer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tchannel/tornado/peer.py b/tchannel/tornado/peer.py index 2d1d1398..e57d7f7c 100644 --- a/tchannel/tornado/peer.py +++ b/tchannel/tornado/peer.py @@ -412,7 +412,7 @@ def _send(self, connection, req): try: response = yield response_future except StreamClosedError as error: - error = NetworkError( + network_error = NetworkError( id=req.id, description=error.message, tracing=req.tracing, @@ -421,7 +421,7 @@ def _send(self, connection, req): self.tchannel.event_emitter.fire( EventType.after_receive_error, req, error, ) - raise + raise network_error except TChannelError as error: # event: after_receive_error self.tchannel.event_emitter.fire( From 30c4d723677cd3daae484411c1aec6b9a618ff8b Mon Sep 17 00:00:00 2001 From: Junchao Wu Date: Fri, 6 Nov 2015 14:18:01 -0800 Subject: [PATCH 4/6] Add streamclosederror catpure around stream.connect --- tchannel/tornado/connection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tchannel/tornado/connection.py b/tchannel/tornado/connection.py index f6b5f859..c1937159 100644 --- a/tchannel/tornado/connection.py +++ b/tchannel/tornado/connection.py @@ -30,6 +30,7 @@ import tornado.queues as queues from tornado import stack_context +from tornado.iostream import StreamClosedError from .. import errors @@ -448,7 +449,7 @@ def outgoing(cls, hostport, process_name=None, serve_hostport=None, log.debug("Connecting to %s", hostport) try: yield stream.connect((host, int(port))) - except socket.error as e: + except (StreamClosedError, socket.error) as e: log.warn("Couldn't connect to %s", hostport) raise NetworkError( "Couldn't connect to %s" % hostport, e From ab0c20a41b66f49cc35c9bb62adf74e8cec97795 Mon Sep 17 00:00:00 2001 From: Junchao Wu Date: Fri, 6 Nov 2015 15:04:12 -0800 Subject: [PATCH 5/6] add change log --- CHANGES.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.rst b/CHANGES.rst index 4a95f754..d085a6c8 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,6 +11,7 @@ Changes by Version planned in 0.18. - Reduced Zipkin submission failures to warnings. - Limit the size of arg1 to 16KB. +- Fix bug which prevented requests from being retried if the candidate connection was previously terminated. 0.18.3 (unreleased) From 4012b26a89d91d01b484416d5e78e51ef99db78a Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Fri, 6 Nov 2015 15:15:01 -0800 Subject: [PATCH 6/6] Fix changelog formatting. --- CHANGES.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index d085a6c8..69793d3c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,7 +11,8 @@ Changes by Version planned in 0.18. - Reduced Zipkin submission failures to warnings. - Limit the size of arg1 to 16KB. -- Fix bug which prevented requests from being retried if the candidate connection was previously terminated. +- Fix bug which prevented requests from being retried if the candidate + connection was previously terminated. 0.18.3 (unreleased)