"""
Base Spout classes.
"""
from __future__ import absolute_import, print_function, unicode_literals
from collections import Counter
from .component import Component
class Spout(Component):
    """Base class for every pystorm spout.

    Subclasses override :meth:`next_tuple` and, when they care about the
    corresponding events, :meth:`ack`, :meth:`fail`, :meth:`activate` and
    :meth:`deactivate`.  For background on spouts, see Storm's
    `Concepts documentation <http://storm.apache.org/documentation/Concepts.html>`_.
    """

    def ack(self, tup_id):
        """Hook run when a Tuple has been fully acknowledged in the topology.

        The base implementation does nothing.

        :param tup_id: ID of the Tuple that has been fully acknowledged.
        :type tup_id: str
        """

    def fail(self, tup_id):
        """Hook run when a Tuple fails in the topology.

        The failure is caused either by a bolt calling ``fail()`` or by the
        Tuple timing out.  A spout may re-emit the Tuple or ignore the
        failure; this base implementation ignores it.

        :param tup_id: ID of the Tuple that failed.
        :type tup_id: str
        """

    def next_tuple(self):
        """Emit Tuples as needed; subclasses must provide this.

        Do not block in here, or Storm will think the spout is dead.  Return
        promptly instead: pystorm then answers Storm with a ``sync`` message,
        letting it know the spout is still functioning.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def emit(self, tup, tup_id=None, stream=None, direct_task=None,
             need_task_ids=False):
        """Send a spout Tuple message to Storm.

        :param tup: the values to send; they must be JSON-serializable.
        :type tup: list or tuple
        :param tup_id: ID for the Tuple.  Leave as ``None`` for an
                       unreliable emit.
        :type tup_id: str
        :param stream: ID of the stream to emit to.  Leave as ``None`` to
                       use the default stream.
        :type stream: str
        :param direct_task: task to send the Tuple to when doing a direct
                            emit.
        :type direct_task: int
        :param need_task_ids: whether to return the IDs of the tasks the
                              Tuple was sent to (default: ``False``).
        :type need_task_ids: bool
        :returns: ``None`` unless ``need_task_ids=True``, in which case a
                  ``list`` of the task IDs the Tuple was sent to.  When
                  ``direct_task`` is given, that list is ``[direct_task]``.
        """
        parent_emit = super(Spout, self).emit
        # Keyword arguments are used throughout so each value lands on the
        # matching parameter of the parent implementation.
        return parent_emit(tup,
                           tup_id=tup_id,
                           stream=stream,
                           direct_task=direct_task,
                           need_task_ids=need_task_ids)

    def activate(self):
        """Hook run when the Spout is activated after being deactivated.

        The base implementation does nothing.

        .. note::
            This requires at least Storm 1.1.0.
        """

    def deactivate(self):
        """Hook run when the Spout is deactivated.

        The base implementation does nothing.

        .. note::
            This requires at least Storm 1.1.0.
        """

    def _run(self):
        """Read one command from Storm, dispatch it, then reply with ``sync``.

        This is the body of ``run``'s infinite loop, kept separate so that
        it can be unit tested on its own.
        """
        message = self.read_command()
        command = message['command']
        if command == 'next':
            self.next_tuple()
        elif command == 'ack' or command == 'fail':
            # Both commands carry the tuple ID under the 'id' key.
            handler = self.ack if command == 'ack' else self.fail
            handler(message['id'])
        elif command == 'activate':
            self.activate()
        elif command == 'deactivate':
            self.deactivate()
        else:
            self.logger.error('Received invalid command from Storm: %r',
                              message)
        self.send_message({'command': 'sync'})
class ReliableSpout(Spout):
    """Reliable spout that will automatically replay failed tuples.

    Every tuple must be emitted with a ``tup_id``.  The arguments used to
    emit it are remembered in ``unacked_tuples`` until it is acked.  A
    failed tuple is re-emitted up to ``max_fails`` times; after that it is
    treated as acknowledged and its bookkeeping is dropped.

    Subclasses still implement :meth:`next_tuple`, exactly as with
    :class:`Spout`.

    For more information on spouts, consult Storm's
    `Concepts documentation <http://storm.apache.org/documentation/Concepts.html>`_.
    """
    # NOTE: this must derive from Spout (not Component directly) so that
    # Spout._run dispatches Storm's next/ack/fail/activate/deactivate
    # commands to this class and answers each with a sync message.

    #: Maximum number of times a failed tuple is re-emitted before it is
    #: given up on (and treated as acked).
    max_fails = 3

    def __init__(self, *args, **kwargs):
        super(ReliableSpout, self).__init__(*args, **kwargs)
        # tup_id -> number of times the tuple has been re-emitted after a fail.
        self.failed_tuples = Counter()
        # tup_id -> (tup, stream, direct_task, need_task_ids) as given to emit.
        self.unacked_tuples = {}

    def ack(self, tup_id):
        """Forget a Tuple once it has been fully acknowledged.

        This clears both the replay count and the saved emit arguments for
        ``tup_id``.  :meth:`fail` also calls this once a tuple has used up
        its replays.

        :param tup_id: the ID of the Tuple that has been fully acknowledged in
                       the topology.
        :type tup_id: str
        """
        self.failed_tuples.pop(tup_id, None)
        try:
            del self.unacked_tuples[tup_id]
        except KeyError:
            # The ID was never emitted by this spout, or was already forgotten.
            self.logger.error('Received ack for unknown tuple ID: %r', tup_id)

    def fail(self, tup_id):
        """Replay a failed Tuple, giving up after ``max_fails`` replays.

        :param tup_id: the ID of the Tuple that has failed in the topology
                       either due to a bolt calling ``fail()`` or a Tuple
                       timing out.
        :type tup_id: str
        """
        saved_args = self.unacked_tuples.get(tup_id)
        if saved_args is None:
            self.logger.error('Received fail for unknown tuple ID: %r', tup_id)
            return
        tup, stream, direct_task, need_task_ids = saved_args
        if self.failed_tuples[tup_id] < self.max_fails:
            # Re-emit with the original arguments.  emit() records them in
            # unacked_tuples again, so the tuple stays tracked.
            self.emit(tup, tup_id=tup_id, stream=stream,
                      direct_task=direct_task, need_task_ids=need_task_ids)
            self.failed_tuples[tup_id] += 1
        else:
            # Just pretend we got an ack when we exceed retry limit, which
            # releases the tuple's bookkeeping entries.
            self.logger.info('Acking tuple ID %r after it exceeded retry limit '
                             '(%r)', tup_id, self.max_fails)
            self.ack(tup_id)

    def emit(self, tup, tup_id=None, stream=None, direct_task=None,
             need_task_ids=False):
        """Emit a spout Tuple & add metadata about it to ``unacked_tuples``.

        The saved arguments let :meth:`fail` replay the Tuple later, so
        ``tup_id`` is required here even though it is optional in
        :meth:`Spout.emit`.  All other parameters and the return value are
        the same as for :meth:`Spout.emit`.

        :raises ValueError: if ``tup_id`` is ``None``.
        """
        if tup_id is None:
            raise ValueError('You must provide a tuple ID when emitting with a '
                             'ReliableSpout in order for the tuple to be '
                             'tracked.')
        self.unacked_tuples[tup_id] = (tup, stream, direct_task, need_task_ids)
        return super(ReliableSpout, self).emit(tup, tup_id=tup_id,
                                               stream=stream,
                                               direct_task=direct_task,
                                               need_task_ids=need_task_ids)