"""
Module to add streamparse-specific extensions to pystorm Bolt classes
"""
import pystorm
from ..dsl.bolt import JavaBoltSpec, ShellBoltSpec
from .component import Component
class JavaBolt(Component):
    """A Bolt implemented in Java that can be wired into a streamparse Topology."""

    @classmethod
    def spec(cls, name=None, serialized_java=None, full_class_name=None,
             args_list=None, inputs=None, par=1, config=None, outputs=None):
        """Create a :class:`JavaBoltSpec` for a Java Bolt.

        The returned spec represents this Bolt inside a
        :class:`~streamparse.Topology`.  For it to work, the Java class must
        be on the classpath, so add the appropriate entries to your project's
        ``project.clj`` file.

        :param name: Name of this Bolt.  Defaults to the name of the
                     :class:`~streamparse.Topology` attribute this spec is
                     assigned to.
        :type name: `str`
        :param serialized_java: Serialized Java code representing the class.
                                Supply either this, or both
                                ``full_class_name`` and ``args_list``.
        :type serialized_java: `bytes`
        :param full_class_name: Fully qualified Java class name, including
                                the package name.
        :type full_class_name: `str`
        :param args_list: Arguments to pass to the constructor of the class.
        :type args_list: `list` of basic data types
        :param inputs: Streams that feed into this Bolt.  Two forms are
                       accepted:

                       1. A `dict` mapping
                          :class:`~streamparse.dsl.component.ComponentSpec`
                          to :class:`~streamparse.Grouping`.
                       2. A `list` of :class:`~streamparse.Stream` or
                          :class:`~streamparse.dsl.component.ComponentSpec`.
        :param par: Parallelism hint for this Bolt (defaults to 1).  See
                    :ref:`parallelism`.
        :type par: `int`
        :param config: Component-specific config settings to pass to Storm.
        :type config: `dict`
        :param outputs: Outputs this JavaBolt will produce.  Two forms are
                        accepted:

                        1. A `list` of :class:`~streamparse.Stream` objects
                           describing the fields output on each stream.
                        2. A `list` of `str` naming the fields output on the
                           ``default`` stream.
        :returns: A spec describing this Java Bolt.
        :rtype: :class:`JavaBoltSpec`
        """
        # Every argument is forwarded unchanged; JavaBoltSpec does the work.
        spec_kwargs = dict(
            name=name,
            serialized_java=serialized_java,
            full_class_name=full_class_name,
            args_list=args_list,
            inputs=inputs,
            par=par,
            config=config,
            outputs=outputs,
        )
        return JavaBoltSpec(cls, **spec_kwargs)
class ShellBolt(Component):
    """A Bolt that is started by running a command with a script argument."""

    @classmethod
    def spec(cls, name=None, command=None, script=None, inputs=None,
             par=None, config=None, outputs=None):
        """Create a :class:`ShellBoltSpec` for a non-Java, non-Python Bolt.

        For a Python Bolt, use :meth:`~streamparse.dsl.bolt.Bolt.spec`
        instead.  The returned spec represents this Bolt inside a
        :class:`~streamparse.Topology`.

        :param name: Name of this Bolt.  Defaults to the name of the
                     :class:`~streamparse.Topology` attribute this spec is
                     assigned to.
        :type name: `str`
        :param command: Path to the command that Storm will execute.
        :type command: `str`
        :param script: Arguments to ``command``.  Separate multiple arguments
                       with spaces.
        :type script: `str`
        :param inputs: Streams that feed into this Bolt.  Two forms are
                       accepted:

                       1. A `dict` mapping
                          :class:`~streamparse.dsl.component.ComponentSpec`
                          to :class:`~streamparse.Grouping`.
                       2. A `list` of :class:`~streamparse.Stream` or
                          :class:`~streamparse.dsl.component.ComponentSpec`.
        :param par: Parallelism hint for this Bolt.  For shell components,
                    this works out to be the number of processes running it
                    in the topology (across all machines).  See
                    :ref:`parallelism`.
        :type par: `int`
        :param config: Component-specific config settings to pass to Storm.
        :type config: `dict`
        :param outputs: Outputs this ShellBolt will produce.  Two forms are
                        accepted:

                        1. A `list` of :class:`~streamparse.Stream` objects
                           describing the fields output on each stream.
                        2. A `list` of `str` naming the fields output on the
                           ``default`` stream.
        :returns: A spec describing this shell Bolt.
        :rtype: :class:`ShellBoltSpec`
        """
        return ShellBoltSpec(
            cls,
            name=name,
            command=command,
            script=script,
            inputs=inputs,
            par=par,
            config=config,
            outputs=outputs,
        )
class Bolt(pystorm.bolt.Bolt, ShellBolt):
    """pystorm Bolt with streamparse-specific additions."""

    @classmethod
    def spec(cls, name=None, inputs=None, par=None, config=None):
        """Create a :class:`ShellBoltSpec` for a Python Bolt.

        The returned spec represents this Bolt inside a
        :class:`~streamparse.Topology`.

        :param name: Name of this Bolt.  Defaults to the name of the
                     :class:`~streamparse.Topology` attribute this spec is
                     assigned to.
        :type name: `str`
        :param inputs: Streams that feed into this Bolt.  Two forms are
                       accepted:

                       1. A `dict` mapping
                          :class:`~streamparse.dsl.component.ComponentSpec`
                          to :class:`~streamparse.Grouping`.
                       2. A `list` of :class:`~streamparse.Stream` or
                          :class:`~streamparse.dsl.component.ComponentSpec`.
        :param par: Parallelism hint for this Bolt.  For Python components,
                    this works out to be the number of Python processes
                    running it in the topology (across all machines).  See
                    :ref:`parallelism`.

                    .. note::
                        This can also be specified as an attribute of your
                        :class:`Bolt` subclass.
        :type par: `int`
        :param config: Component-specific config settings to pass to Storm.

                       .. note::
                           This can also be specified as an attribute of your
                           :class:`Bolt` subclass.
        :type config: `dict`
        :returns: A spec that launches this class via ``streamparse_run``.
        :rtype: :class:`ShellBoltSpec`

        .. note::
            This method takes no ``outputs`` argument: outputs are read from
            the ``outputs`` attribute of your :class:`Bolt` subclass.
        """
        # The Python bolt is launched through the ``streamparse_run`` command,
        # with the class's dotted path ("module.ClassName") as its script.
        dotted_path = '{}.{}'.format(cls.__module__, cls.__name__)
        return ShellBoltSpec(
            cls,
            command='streamparse_run',
            script=dotted_path,
            name=name,
            inputs=inputs,
            par=par,
            config=config,
            outputs=cls.outputs,
        )
class BatchingBolt(pystorm.bolt.BatchingBolt, Bolt):
    """pystorm BatchingBolt with streamparse-specific additions.

    This class adds no code of its own.  Batching behaviour comes from
    :class:`pystorm.bolt.BatchingBolt`, and the streamparse additions (such
    as :meth:`Bolt.spec`) come from :class:`Bolt`.  Because the pystorm class
    is listed first, anything it defines takes precedence over :class:`Bolt`
    in the method resolution order.
    """
class TicklessBatchingBolt(pystorm.bolt.TicklessBatchingBolt, BatchingBolt):
    """pystorm TicklessBatchingBolt with streamparse-specific additions.

    This class adds no code of its own.  Batching behaviour comes from
    :class:`pystorm.bolt.TicklessBatchingBolt`, and the streamparse additions
    come from :class:`BatchingBolt` (and, through it, :class:`Bolt`).
    Because the pystorm class is listed first, anything it defines takes
    precedence over :class:`BatchingBolt` in the method resolution order.
    """