diff --git a/CHANGELOG b/CHANGELOG index 9dce1406..def5c14b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,9 @@ Changelog for ssm ================= +* Tue Dev 01 2020 Adrian Coveney - 3.1.0-1 + - Enabled retries for all AMS communication methods to avoid timeouts from + crashing SSM. + * Wed Sep 23 2020 Adrian Coveney - 3.0.0-1 - As part of the migration to Python 3, this release removes support for Python 2.6. diff --git a/README.md b/README.md index 608baa2b..94c9bea5 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ For more information about SSM, see the [EGI wiki](https://wiki.egi.eu/wiki/APEL EOSC-hub logo -SSM is provided by [STFC](https://stfc.ukri.org/), a part of [UK Research and Innovation](www.ukri.org), and is co-funded by the [EOSC-hub](https://www.eosc-hub.eu/) project (Horizon 2020) under Grant number 777536. Licensed under the [Apache 2 License](http://www.apache.org/licenses/LICENSE-2.0). +SSM is provided by [STFC](https://stfc.ukri.org/), a part of [UK Research and Innovation](https://www.ukri.org/), and is co-funded by the [EOSC-hub](https://www.eosc-hub.eu/) project (Horizon 2020) under Grant number 777536. Licensed under the [Apache 2 License](http://www.apache.org/licenses/LICENSE-2.0). ## Installing the RPM diff --git a/apel-ssm.spec b/apel-ssm.spec index e0783246..35e2b778 100644 --- a/apel-ssm.spec +++ b/apel-ssm.spec @@ -4,7 +4,7 @@ %endif Name: apel-ssm -Version: 3.0.0 +Version: 3.1.0 %define releasenumber 1 Release: %{releasenumber}%{?dist} Summary: Secure stomp messenger @@ -100,6 +100,10 @@ rm -rf $RPM_BUILD_ROOT %doc %_defaultdocdir/%{name} %changelog +* Tue Dev 01 2020 Adrian Coveney - 3.1.0-1 + - Enabled retries for all AMS communication methods to avoid timeouts from + crashing SSM. + * Wed Sep 23 2020 Adrian Coveney - 3.0.0-1 - As part of the migration to Python 3, this release removes support for Python 2.6. diff --git a/bin/receiver.py b/bin/receiver.py index fb10245e..0afeb759 100644 --- a/bin/receiver.py +++ b/bin/receiver.py @@ -14,10 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -''' -Script to run a receiving SSM. -@author: Will Rogers -''' +"""Script to run a receiving SSM.""" + from __future__ import print_function from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE @@ -51,9 +49,7 @@ def get_dns(dn_file): - ''' - Retrieve a list of DNs from a file. - ''' + """Retrieve a list of DNs from a file.""" dns = [] f = None try: @@ -78,9 +74,7 @@ def get_dns(dn_file): def main(): - ''' - Set up connection, and listen for messages. - ''' + """Set up connection, and listen for messages.""" ver = "SSM %s.%s.%s" % __version__ op = OptionParser(description=__doc__, version=ver) op.add_option('-c', '--config', help='location of config file', @@ -217,10 +211,10 @@ def main(): try: ssm = Ssm2(brokers, - cp.get('messaging','path'), - cert=cp.get('certificates','certificate'), - key=cp.get('certificates','key'), - listen=cp.get('messaging','destination'), + cp.get('messaging', 'path'), + cert=cp.get('certificates', 'certificate'), + key=cp.get('certificates', 'key'), + listen=cp.get('messaging', 'destination'), use_ssl=use_ssl, capath=cp.get('certificates', 'capath'), check_crls=cp.getboolean('certificates', 'check_crls'), diff --git a/bin/sender.py b/bin/sender.py index 45756ebe..30fe95b2 100644 --- a/bin/sender.py +++ b/bin/sender.py @@ -14,10 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -''' -Script to run a sending SSM. -@author: Will Rogers -''' +"""Script to run a sending SSM.""" + from __future__ import print_function from ssm import __version__, set_up_logging, LOG_BREAK @@ -38,13 +36,11 @@ def main(): - ''' - Set up connection, send all messages and quit. - ''' + """Set up connection, send all messages and quit.""" ver = "SSM %s.%s.%s" % __version__ op = OptionParser(description=__doc__, version=ver) op.add_option('-c', '--config', help='location of config file', - default='/etc/apel/sender.cfg') + default='/etc/apel/sender.cfg') op.add_option('-l', '--log_config', help='location of logging config file (optional)', default='/etc/apel/logging.cfg') @@ -174,7 +170,7 @@ def main(): server_cert = None verify_server_cert = True try: - server_cert = cp.get('certificates','server_cert') + server_cert = cp.get('certificates', 'server_cert') try: verify_server_cert = cp.getboolean('certificates', 'verify_server_cert') except ConfigParser.NoOptionError: diff --git a/scripts/ssm-build-deb.sh b/scripts/ssm-build-deb.sh index b819da8b..34e5ef9d 100755 --- a/scripts/ssm-build-deb.sh +++ b/scripts/ssm-build-deb.sh @@ -16,7 +16,7 @@ set -eu -TAG=3.0.0-1 +TAG=3.1.0-1 SOURCE_DIR=~/debbuild/source BUILD_DIR=~/debbuild/build diff --git a/scripts/ssm-build-rpm.sh b/scripts/ssm-build-rpm.sh index 5d9c750a..d04d69dd 100644 --- a/scripts/ssm-build-rpm.sh +++ b/scripts/ssm-build-rpm.sh @@ -10,7 +10,7 @@ rpmdev-setuptree RPMDIR=/home/rpmb/rpmbuild -VERSION=3.0.0-1 +VERSION=3.1.0-1 SSMDIR=apel-ssm-$VERSION # Remove old sources and RPMS diff --git a/ssm/__init__.py b/ssm/__init__.py index a0c0c156..488dda89 100644 --- a/ssm/__init__.py +++ b/ssm/__init__.py @@ -1,4 +1,4 @@ -''' +""" Copyright (C) 2012 STFC. Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,19 +14,18 @@ limitations under the License. @author: Will Rogers -''' +""" import logging import sys -__version__ = (3, 0, 0) +__version__ = (3, 1, 0) LOG_BREAK = '========================================' + def set_up_logging(logfile, level, console): - ''' - Programmatically initialise logging system. - ''' + """Programmatically initialise logging system.""" levels = {'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'WARN': logging.WARN, diff --git a/ssm/brokers.py b/ssm/brokers.py index b52bbc7e..598824e0 100644 --- a/ssm/brokers.py +++ b/ssm/brokers.py @@ -1,4 +1,4 @@ -''' +""" Copyright (C) 2012 STFC. Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,7 +17,7 @@ Class to interact with a BDII LDAP server to retrieve information about the stomp brokers specified in a network. -''' +""" from __future__ import print_function import ldap @@ -32,32 +32,31 @@ STOMP_PREFIX = 'stomp' STOMP_SSL_PREFIX = 'stomp+ssl' + class StompBrokerGetter(object): - ''' + """Class for seaching a BDII for message brokers. + Given the URL of a BDII, searches for all the STOMP brokers listed that are part of the specified network. - ''' + """ def __init__(self, bdii_url): - ''' - Set up the LDAP connection and strings which are re-used. - ''' + """Set up the LDAP connection and strings which are re-used.""" # Set up the LDAP connection log.debug('Connecting to %s...', bdii_url) self._ldap_conn = ldap.initialize(bdii_url) self._base_dn = 'o=grid' self._service_id_key = 'GlueServiceUniqueID' - self._endpoint_key = 'GlueServiceEndpoint' + self._endpoint_key = 'GlueServiceEndpoint' self._service_data_value_key = 'GlueServiceDataValue' def get_broker_urls(self, service_type, network): - ''' - Gets the list of all the stomp brokers in the BDII, then - checks them to see if they are part of the network. The network - is supplied as a string. - Returns a list of URLs. - ''' + """Get a list stomp broker URLs in a specified network from a BDII. + + Checks them to see if they are part of the network. The network is + supplied as a string. Returns a list of URLs. + """ prod_broker_urls = [] broker_details = self._get_broker_details(service_type) @@ -69,12 +68,12 @@ def get_broker_urls(self, service_type, network): return prod_broker_urls def get_broker_hosts_and_ports(self, service_type, network): - ''' - Gets the list of all the stomp brokers in the BDII, then - checks them to see if they are part of the network. The network - is supplied as a string. - Returns a list of (host, port) tuples. - ''' + """Get a list of stomp broker (host, port) tuples from a BDII. + + Gets the list of all the stomp brokers in the BDII, then checks them to + see if they are part of the network. The network is supplied as a + string.Returns a list of (host, port) tuples. + """ urls = self.get_broker_urls(service_type, network) hosts_and_ports = [] for url in urls: @@ -82,10 +81,10 @@ def get_broker_hosts_and_ports(self, service_type, network): return hosts_and_ports def _get_broker_details(self, service_type): - ''' - Searches the BDII for all STOMP message brokers. Returns a list of - tuples: (, ). - ''' + """Search the BDII for all STOMP message brokers. + + Returns a list of tuples: (, ). + """ broker_details = [] ldap_filter = '(&(objectClass=GlueService)(GlueServiceType=%s))' % service_type @@ -100,26 +99,26 @@ def _get_broker_details(self, service_type): return broker_details def _broker_in_network(self, broker_id, network): - ''' - Given a GlueServiceUniqueID for a message broker, check that it is - part of the specified network. - ''' + """Check that a GlueServiceUniqueID is part of a specified netowrk.""" ldap_filter = '(&(GlueServiceDataKey=cluster)(GlueChunkKey=GlueServiceUniqueID=%s))' \ % broker_id attrs = [self._service_data_value_key] - results = self._ldap_conn.search_s(self._base_dn, ldap.SCOPE_SUBTREE, ldap_filter, attrs) + results = self._ldap_conn.search_s(self._base_dn, ldap.SCOPE_SUBTREE, + ldap_filter, attrs) try: unused_dn, attrs2 = results[0] return network in attrs2[self._service_data_value_key] - except IndexError: # no results from the query + except IndexError: # no results from the query return False + def parse_stomp_url(stomp_url): - ''' + """Parse a stomp scheme URL. + Given a URL of the form stomp://stomp.cern.ch:6262/, return a tuple containing (stomp.cern.ch, 6262). - ''' + """ parts = stomp_url.split(':') protocols = [STOMP_PREFIX, STOMP_SSL_PREFIX] @@ -140,6 +139,7 @@ def parse_stomp_url(stomp_url): BG = StompBrokerGetter(BDII) def print_brokers(text, service, network): + """Pretty print a list of brokers.""" brokers = BG.get_broker_hosts_and_ports(service, network) # Print section heading print('==', text, '==') diff --git a/ssm/crypto.py b/ssm/crypto.py index 81bf7217..4000e6d7 100644 --- a/ssm/crypto.py +++ b/ssm/crypto.py @@ -1,4 +1,4 @@ -''' +""" Copyright (C) 2012 STFC. Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,7 +19,7 @@ We investigated python's crypto libraries (all openssl bindings) and found that none were mature enough to implement the SMIME crypto we had decided on. -''' +""" from __future__ import print_function from subprocess import Popen, PIPE @@ -34,16 +34,13 @@ class CryptoException(Exception): - ''' - Exception for use by the crypto module. - ''' + """Exception for use by the crypto module.""" + pass def _from_file(filename): - ''' - Convenience function to read entire file into string. - ''' + """Read entire file into string. Convenience function.""" f = open(filename, 'r') s = f.read() f.close() @@ -51,10 +48,10 @@ def _from_file(filename): def check_cert_key(certpath, keypath): - ''' - Check that a certificate and a key match, using openssl directly to fetch - the modulus of each, which must be the same. - ''' + """Check that a certificate and a key match. + + Uses openssl directly to fetch the modulus of each, which must be the same. + """ try: cert = _from_file(certpath) key = _from_file(keypath) @@ -86,13 +83,13 @@ def check_cert_key(certpath, keypath): def sign(text, certpath, keypath): - ''' - Sign the specified message using the certificate and key in the files specified. + """Sign the message using the certificate and key in the files specified. Returns the signed message as an SMIME string, suitable for transmission. - ''' + """ try: - p1 = Popen(['openssl', 'smime', '-sign', '-inkey', keypath, '-signer', certpath, '-text'], + p1 = Popen(['openssl', 'smime', '-sign', '-inkey', + keypath, '-signer', certpath, '-text'], stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) @@ -105,15 +102,14 @@ def sign(text, certpath, keypath): except OSError as e: log.error('Failed to sign message: %s', e) - raise CryptoException('Message signing failed. Check cert and key permissions.') + raise CryptoException('Message signing failed. Check cert and key permissions.') def encrypt(text, certpath, cipher='aes128'): - ''' - Encrypt the specified message using the certificate string. + """Encrypt the specified message using the certificate string. Returns the encrypted SMIME text suitable for transmission - ''' + """ if cipher not in CIPHERS: raise CryptoException('Invalid cipher %s.' % cipher) @@ -131,15 +127,17 @@ def encrypt(text, certpath, cipher='aes128'): def verify(signed_text, capath, check_crl): - ''' - Verify the signed message has been signed by the certificate (attached to the - supplied SMIME message) it claims to have, by one of the accepted CAs in - capath. - - Returns a tuple including the signer's certificate and the plain-text of the - message if it has been verified. If the content transfer encoding is specified - as 'quoted-printable' or 'base64', decode the message body accordingly. - ''' + """Verify the signed message has been signed by the certificate. + + Verify the signed message has been signed by the certificate (attached to + the supplied SMIME message) it claims to have, by one of the accepted CAs + in capath. + + Returns a tuple including the signer's certificate and the plain-text of + the message if it has been verified. If the content transfer encoding is + specified as 'quoted-printable' or 'base64', decode the message body + accordingly. + """ if signed_text is None or capath is None: raise CryptoException('Invalid None argument to verify().') # This ensures that openssl knows that the string is finished. @@ -195,14 +193,15 @@ def verify(signed_text, capath, check_crl): def decrypt(encrypted_text, certpath, keypath): - ''' - Decrypt the specified message using the certificate and key contained in the - named PEM files. The capath should point to a directory holding all the - CAs that we accept + """Decrypt the specified message using the certificate and key. + + Decrypt the specified message using the certificate and key contained in + the named PEM files. The capath should point to a directory holding all the + CAs that we accept. This decryption function can be used whether or not OpenSSL is used to - encrypt the data - ''' + encrypt the data. + """ # This ensures that openssl knows that the string is finished. # It makes no difference if the signed message is correct, but # prevents it from hanging in the case of an empty string. @@ -250,15 +249,13 @@ def verify_cert_date(certpath): def verify_cert(certstring, capath, check_crls=True): - ''' - Verify that the certificate is signed by a CA whose certificate is stored in - capath. + """Verify that the certificate is signed by a CA with a cert in capath. Note that I've had to compare strings in the output of openssl to check for verification, which may make this brittle. Returns True if the certificate is verified - ''' + """ if certstring is None or capath is None: raise CryptoException('Invalid None argument to verify_cert().') @@ -293,18 +290,16 @@ def verify_cert(certstring, capath, check_crls=True): def verify_cert_path(certpath, capath, check_crls=True): - ''' - Verify certificate, but using the certificate filepath rather than - the certificate string as in verify_cert. - ''' + """Verify certificate using the certificate filepath. + + This is different to verify_cert which uses the certificate string. + """ certstring = _from_file(certpath) return verify_cert(certstring, capath, check_crls) def get_certificate_subject(certstring): - ''' - Return the certificate subject's DN, in legacy openssl format. - ''' + """Return the certificate subject's DN, in legacy openssl format.""" p1 = Popen(['openssl', 'x509', '-noout', '-subject'], stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) @@ -314,7 +309,7 @@ def get_certificate_subject(certstring): log.error(error) raise CryptoException('Failed to get subject: %s' % error) - subject = subject.strip()[9:] # remove 'subject= ' from the front + subject = subject.strip()[9:] # remove 'subject= ' from the front return subject diff --git a/ssm/ssm2.py b/ssm/ssm2.py index 868acc87..354aa318 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -1,4 +1,4 @@ -''' +""" Copyright (C) 2012 STFC Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,9 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - - @author: Will Rogers -''' +""" from __future__ import print_function from ssm import crypto @@ -48,19 +46,18 @@ class Ssm2Exception(Exception): - ''' - Exception for use by SSM2. - ''' + """Exception for use by SSM2.""" + pass class Ssm2(stomp.ConnectionListener): - ''' - Minimal SSM implementation. - ''' + """Minimal SSM implementation.""" + # Schema for the dirq message queue. - QSCHEMA = {'body': 'string', 'signer':'string', 'empaid':'string?'} - REJECT_SCHEMA = {'body': 'string', 'signer':'string?', 'empaid':'string?', 'error':'string'} + QSCHEMA = {'body': 'string', 'signer': 'string', 'empaid': 'string?'} + REJECT_SCHEMA = {'body': 'string', 'signer': 'string?', + 'empaid': 'string?', 'error': 'string'} CONNECTION_TIMEOUT = 10 # Messaging protocols @@ -71,10 +68,10 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, capath=None, check_crls=False, use_ssl=False, username=None, password=None, enc_cert=None, verify_enc_cert=True, pidfile=None, path_type='dirq', protocol=STOMP_MESSAGING, project=None, token=''): - ''' - Creates an SSM2 object. If a listen value is supplied, - this SSM2 will be a receiver. - ''' + """Create an SSM2 object. + + If a listen value is supplied, this SSM2 will be a receiver. + """ self._conn = None self._last_msg = None @@ -185,9 +182,7 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, ).setLevel(logging.INFO) def set_dns(self, dn_list): - ''' - Set the list of DNs which are allowed to sign incoming messages. - ''' + """Set the list of DNs which are allowed to sign incoming messages.""" self._valid_dns = dn_list ########################################################################## @@ -195,12 +190,11 @@ def set_dns(self, dn_list): ########################################################################## def on_send(self, frame, unused_body=None): - ''' - Called by stomppy when a message is sent. + """Log sending of message with empaid if available. - unused_body is only present to have a backward compatible - method signature when using stomp.py v3.1.X - ''' + Called by stomppy when a message is sent. 'unused_body' is only present + to have a backward compatible method signature for stomp.py v3.1.X + """ try: # Try the stomp.py v4 way first empaid = frame.headers['empa-id'] @@ -216,14 +210,13 @@ def on_send(self, frame, unused_body=None): log.debug('Sent message: %s', empaid) def on_message(self, headers, body): - ''' - Called by stomppy when a message is received. + """Handle the message according to its content and headers. - Handle the message according to its content and headers. - ''' + Called by stomppy when a message is received. + """ try: empaid = headers['empa-id'] - if empaid == 'ping': # ignore ping message + if empaid == 'ping': # ignore ping message log.info('Received ping message.') return except KeyError: @@ -234,9 +227,10 @@ def on_message(self, headers, body): self._save_msg_to_queue(body, empaid) def on_error(self, headers, body): - ''' + """Log error messages. + Called by stomppy when an error frame is received. - ''' + """ if 'No user for client certificate: ' in headers['message']: log.error('The following certificate is not authorised: %s', headers['message'].split(':')[1]) @@ -244,33 +238,34 @@ def on_error(self, headers, body): log.error('Error message received: %s', body) def on_connected(self, unused_headers, unused_body): - ''' - Called by stomppy when a connection is established. + """Track the connection. - Track the connection. - ''' + Called by stomppy when a connection is established. + """ self.connected = True log.info('Connected.') def on_disconnected(self): - ''' + """Log disconnection and set 'connected' to 'False'. + Called by stomppy when disconnected from the broker. - ''' + """ log.info('Disconnected from broker.') self.connected = False def on_receipt(self, headers, unused_body): - ''' + """Log receipt of message by broker and set '_last_msg'. + Called by stomppy when the broker acknowledges receipt of a message. - ''' + """ log.info('Broker received message: %s', headers['receipt-id']) self._last_msg = headers['receipt-id'] def on_receiver_loop_completed(self, _unused_headers, _unused_body): - """ - Called by stompy when the receiver loop ends. + """Log receiver loop complete for debug only. - This is usually trigger as part of a disconnect. + Called by stompy when the receiver loop ends. This is usually triggered + as part of a disconnect. """ log.debug('on_receiver_loop_completed called.') @@ -279,12 +274,13 @@ def on_receiver_loop_completed(self, _unused_headers, _unused_body): ########################################################################## def _handle_msg(self, text): - ''' - Deal with the raw message contents appropriately: + """Deal with the raw message contents appropriately. + + Namely: - decrypt if necessary - verify signature - Return plain-text message, signer's DN and an error/None. - ''' + - Return plain-text message, signer's DN and an error/None. + """ if text is None or text == '': warning = 'Empty text passed to _handle_msg.' log.warn(warning) @@ -351,11 +347,11 @@ def _save_msg_to_queue(self, body, empaid): log.error('Failed to read or write file: %s', error) def _send_msg(self, message, msgid): - ''' - Send one message using stomppy. The message will be signed using - the host cert and key. If an encryption certificate - has been supplied, the message will also be encrypted. - ''' + """Send one message using stomppy. + + The message will be signed using the host cert and key. If an + encryption certificate has been supplied, it will also be encrypted. + """ log.info('Sending message: %s', msgid) headers = {'destination': self._dest, 'receipt': msgid, 'empa-id': msgid} @@ -392,7 +388,7 @@ def _send_msg_ams(self, text, msgid): message = AmsMessage(data=to_send, attributes={'empaid': msgid}).dict() - argo_response = self._ams.publish(self._dest, message) + argo_response = self._ams.publish(self._dest, message, retry=3) return argo_response['messageIds'][0] def pull_msg_ams(self): @@ -412,7 +408,8 @@ def pull_msg_ams(self): ackids = [] for msg_ack_id, msg in self._ams.pull_sub(self._listen, - messages_to_pull): + messages_to_pull, + retry=3): # Get the AMS message id msgid = msg.get_msgid() # Get the SSM dirq id @@ -441,16 +438,17 @@ def pull_msg_ams(self): # it can move the offset for the next subscription pull # (basically acknowledging pulled messages) if ackids: - self._ams.ack_sub(self._listen, ackids) + self._ams.ack_sub(self._listen, ackids, retry=3) def send_ping(self): - ''' + """Perform connection stay-alive steps. + If a STOMP connection is left open with no activity for an hour or so, it stops responding. Stomppy 3.1.3 has two ways of handling this, but stomppy 3.0.3 (EPEL 5 and 6) has neither. To get around this, we begin and then abort a STOMP transaction to keep the connection active. - ''' + """ # Use time as transaction id to ensure uniqueness within each connection transaction_id = str(time.time()) @@ -458,17 +456,15 @@ def send_ping(self): self._conn.abort({'transaction': transaction_id}) def has_msgs(self): - ''' - Return True if there are any messages in the outgoing queue. - ''' + """Return True if there are any messages in the outgoing queue.""" return self._outq.count() > 0 def send_all(self): - ''' + """ Send all the messages in the outgoing queue. Either via STOMP or HTTPS (to an Argo Message Broker). - ''' + """ log.info('Found %s messages.', self._outq.count()) for msgid in self._outq: if not self._outq.lock(msgid): @@ -514,15 +510,15 @@ def send_all(self): except OSError as e: log.warn('OSError raised while purging message queue: %s', e) - ############################################################################ + ########################################################################### # Connection handling methods - ############################################################################ + ########################################################################### def _initialise_connection(self, host, port): - ''' - Create the self._connection object with the appropriate properties, - but don't try to start the connection. - ''' + """Create the self._connection object with the appropriate properties. + + This doesn't start the connection. + """ log.info("Established connection to %s, port %i", host, port) if self._use_ssl: log.info('Connecting using SSL...') @@ -540,13 +536,14 @@ def _initialise_connection(self, host, port): self._conn.set_listener('SSM', self) def handle_connect(self): - ''' + """Connect to broker. + Assuming that the SSM has retrieved the details of the broker or brokers it wants to connect to, connect to one. If more than one is in the list self._network_brokers, try to connect to each in turn until successful. - ''' + """ if self._protocol == Ssm2.AMS_MESSAGING: log.debug('handle_connect called for AMS, doing nothing.') return @@ -565,13 +562,10 @@ def handle_connect(self): log.warn('Failed to connect to %s:%s: %s', host, port, e) if not self.connected: - raise Ssm2Exception('Attempts to start the SSM failed. The system will exit.') + raise Ssm2Exception('Attempts to start the SSM failed. The system will exit.') def handle_disconnect(self): - ''' - When disconnected, attempt to reconnect using the same method as used - when starting up. - ''' + """Attempt to reconnect using the same method as when starting up.""" if self._protocol == Ssm2.AMS_MESSAGING: log.debug('handle_disconnect called for AMS, doing nothing.') return @@ -596,13 +590,11 @@ def handle_disconnect(self): raise Ssm2Exception(err_msg) def start_connection(self): - ''' - Once self._connection exists, attempt to start it and subscribe - to the relevant topics. + """Start existing connection and subscribe to the relevant topics. If the timeout is reached without receiving confirmation of connection, raise an exception. - ''' + """ if self._protocol == Ssm2.AMS_MESSAGING: log.debug('start_connection called for AMS, doing nothing.') return @@ -634,11 +626,11 @@ def start_connection(self): log.info('Subscribing to: %s', self._listen) def close_connection(self): - ''' - Close the connection. This is important because it runs - in a separate thread, so it can outlive the main process - if it is not ended. - ''' + """Close the connection. + + This is important because it runs in a separate thread, so it can + outlive the main process if it is not ended. + """ if self._protocol == Ssm2.AMS_MESSAGING: log.debug('close_connection called for AMS, doing nothing.') return @@ -654,9 +646,7 @@ def close_connection(self): log.info('SSM connection ended.') def startup(self): - ''' - Create the pidfile then start the connection. - ''' + """Create the pidfile then start the connection.""" if self._pidfile is not None: try: f = open(self._pidfile, 'w') @@ -669,9 +659,7 @@ def startup(self): self.handle_connect() def shutdown(self): - ''' - Close the connection then remove the pidfile. - ''' + """Close the connection then remove the pidfile.""" self.close_connection() if self._pidfile is not None: try: