diff --git a/conf/sender.cfg b/conf/sender.cfg index 70ff164a..61c09790 100644 --- a/conf/sender.cfg +++ b/conf/sender.cfg @@ -34,8 +34,9 @@ ams_project: accounting # Queue to which SSM will send messages destination: gLite-APEL -# Outgoing messages will be read and removed from this directory. -path: /var/spool/apel/outgoing +# Accepted messages will be read and removed from /outgoing +# Rejected messages will be moved to /reject +path: /var/spool/apel # If 'path_type' is set to 'dirq' (or if 'path_type' is omitted), the supplied # 'path' will be treated as a Python dirq (a directory based queue, which is a # port of the Perl module Directory::Queue). diff --git a/ssm/ssm2.py b/ssm/ssm2.py index c38415c4..5e72115c 100644 --- a/ssm/ssm2.py +++ b/ssm/ssm2.py @@ -117,16 +117,21 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None, # create the filesystem queues for accepted and rejected messages if dest is not None and listen is None: + outqpath = os.path.join(qpath, 'outgoing') + rejectqpath = os.path.join(qpath, 'reject') + # Determine what sort of outgoing structure to make if path_type == 'dirq': if QueueSimple is None: raise ImportError("dirq path_type requested but the dirq " "module wasn't found.") - self._outq = QueueSimple(qpath) + self._outq = QueueSimple(outqpath) + self._rejectq = MessageDirectory(rejectqpath) elif path_type == 'directory': - self._outq = MessageDirectory(qpath) + self._outq = MessageDirectory(outqpath) + self._rejectq = MessageDirectory(rejectqpath) else: raise Ssm2Exception('Unsupported path_type variable.') @@ -503,8 +508,17 @@ def send_all(self): if "Message size is too large" not in str(e): raise else: - # Exit out of loop iteration so that message is not removed. log.warn('Message %s could not be sent as its larger than 1MB', msgid) + + # Add the message to the rejected queue + name = self._rejectq.add(text) + log.info("Message %s saved to reject queue as %s", msgid, name) + + # Remove the message from the outgoing queue + self._last_msg = None + self._outq.remove(msgid) + + # Exit out of loop iteration so that message is not removed. continue else: