API

Tuples

class streamparse.Tuple(id, component, stream, task, values)

Storm’s primitive data type passed around via streams.

Variables:
  • id – the ID of the Tuple.
  • component – component that the Tuple was generated from.
  • stream – the stream that the Tuple was emitted into.
  • task – the task the Tuple was generated from.
  • values – the payload of the Tuple where data is stored.

You should never need to instantiate a streamparse.Tuple yourself; streamparse creates Tuples for you before, for example, a streamparse.Bolt's process() method is called.

None of the emit methods for bolts or spouts require that you pass a streamparse.Tuple instance.
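As a rough illustration of the fields listed above, the sketch below uses a collections.namedtuple stand-in with the same five field names. It is not the real streamparse.Tuple class, and the sample values are made up.

```python
from collections import namedtuple

# Stand-in with the same field names as streamparse.Tuple (illustration only).
Tuple = namedtuple("Tuple", ["id", "component", "stream", "task", "values"])

# Hypothetical Tuple, shaped like one a bolt's process() method might receive.
tup = Tuple(id="42", component="word_spout", stream="default",
            task=3, values=["dog", 1])

word, count = tup.values  # the payload is an ordinary sequence
print(tup.component, tup.stream, word, count)
```

In a real bolt the Tuple arrives as the argument to process(), so only the attribute access applies.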

Components

Both streamparse.Bolt and streamparse.Spout inherit from a common base class, streamparse.storm.component.Component. It extends pystorm’s code for handling Multi-Lang IPC between Storm and Python and adds support for our Python Topology DSL.

Spouts

Spouts are the data sources for topologies. They can read from any data source and emit tuples into streams.

class streamparse.Spout(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>, rdb_signal=u'SIGUSR1', serializer=u'json')[source]

Bases: pystorm.spout.Spout, streamparse.storm.spout.ShellSpout

pystorm Spout with streamparse-specific additions

ack(tup_id)[source]

Called when a bolt acknowledges a Tuple in the topology.

Parameters:tup_id (str) – the ID of the Tuple that has been fully acknowledged in the topology.
activate()[source]

Called when the Spout has been activated after being deactivated.

Note

This requires at least Storm 1.1.0.

deactivate()[source]

Called when the Spout has been deactivated.

Note

This requires at least Storm 1.1.0.

emit(tup, tup_id=None, stream=None, direct_task=None, need_task_ids=False)[source]

Emit a spout Tuple message.

Parameters:
  • tup (list or tuple) – the Tuple to send to Storm, should contain only JSON-serializable data.
  • tup_id (str) – the ID for the Tuple. Leave this blank for an unreliable emit.
  • stream (str) – ID of the stream this Tuple should be emitted to. Leave empty to emit to the default stream.
  • direct_task (int) – the task to send the Tuple to if performing a direct emit.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the Tuple was emitted to (default: False).
Returns:

None, unless need_task_ids=True, in which case it will be a list of task IDs that the Tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task].

fail(tup_id)[source]

Called when a Tuple fails in the topology.

A spout can choose to emit the Tuple again or ignore the fail. The default is to ignore.

Parameters:tup_id (str) – the ID of the Tuple that has failed in the topology either due to a bolt calling fail() or a Tuple timing out.
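One way a spout might choose to emit a failed Tuple again is sketched below. The pending dict and the send() helper are hypothetical names introduced for illustration, and the except ImportError branch is only a stand-in so the sketch runs without streamparse installed.

```python
try:
    from streamparse import Spout
except ImportError:
    # Stand-in (not the real class) so the sketch runs without streamparse.
    class Spout(object):
        def emit(self, tup, tup_id=None, **kwargs):
            print("emit", tup, tup_id)


class ReplayingSpout(Spout):
    """Hypothetical spout that re-emits a Tuple when Storm reports a failure."""

    outputs = ["line"]

    def initialize(self, storm_conf, context):
        self.pending = {}   # tup_id -> payload still waiting for an ack
        self.next_id = 0

    def send(self, payload):
        # Helper (not part of the streamparse API): reliable emit with a tup_id.
        tup_id = str(self.next_id)
        self.next_id += 1
        self.pending[tup_id] = payload
        self.emit(payload, tup_id=tup_id)

    def ack(self, tup_id):
        # Fully processed: forget the payload.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Failed or timed out: emit the same payload again under the same ID.
        payload = self.pending.get(tup_id)
        if payload is not None:
            self.emit(payload, tup_id=tup_id)
```

This sketch replays without any limit, so a Tuple that can never succeed would be re-emitted indefinitely; a retry counter is a common addition.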
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this component. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology such as: task IDs, inputs, outputs etc.
is_heartbeat(tup)
Returns:Whether or not the given Tuple is a heartbeat
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger and not setting pystorm.log.path, because that will use a pystorm.component.StormHandler to do the filtering on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).
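The benefit of filtering on the Python side can be seen with the standard logging module alone. In the sketch below, CollectingHandler is a hypothetical stand-in for a handler that would forward records to Storm, and the logger name is made up; records below the logger's level never reach the handler, so no serialization work is done for them.

```python
import logging


class CollectingHandler(logging.Handler):
    """Stand-in for a handler that would forward records to Storm."""

    def __init__(self):
        super().__init__()
        self.forwarded = []

    def emit(self, record):
        # A real handler would serialize the record and send it to Storm here.
        self.forwarded.append((record.levelname, record.getMessage()))


handler = CollectingHandler()
logger = logging.getLogger("example.component")  # hypothetical logger name
logger.addHandler(handler)
logger.setLevel(logging.WARNING)  # filtering happens here, in Python
logger.propagate = False

logger.debug("dropped before any serialization work")
logger.warning("queue is nearly full")
print(handler.forwarded)
```

Only the warning is collected; the debug call returns before the handler is invoked.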

next_tuple()[source]

Implement this function to emit Tuples as necessary.

This function should not block, or Storm will think the spout is dead. Instead, let it return and pystorm will send a noop to Storm, which lets it know the spout is functioning.
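A minimal sketch of a non-blocking next_tuple() follows. It assumes an in-memory queue.Queue as the data source (a hypothetical choice; a real spout might poll an external system), and the except ImportError branch is only a stand-in so the sketch runs without streamparse installed.

```python
import queue

try:
    from streamparse import Spout
except ImportError:
    # Stand-in (not the real class) so the sketch runs without streamparse.
    class Spout(object):
        def emit(self, tup, **kwargs):
            print("emit", tup)


class QueueSpout(Spout):
    """Hypothetical spout that drains an in-memory queue without blocking."""

    outputs = ["item"]

    def initialize(self, storm_conf, context):
        self.source = queue.Queue()  # assumed data source for this sketch

    def next_tuple(self):
        try:
            item = self.source.get_nowait()  # never waits for data
        except queue.Empty:
            return  # nothing to emit this time; return promptly
        self.emit([item])
```

Returning early when the queue is empty keeps each call short, which is the behaviour the paragraph above asks for.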

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm via serializer.

report_metric(name, value)

Report a custom metric back to Storm.

Parameters:
  • name – Name of the metric. This can be anything.
  • value – Value of the metric. This is usually a number.

Only supported in Storm 0.9.3+.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads Tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

classmethod spec(name=None, par=None, config=None)[source]

Create a ShellSpoutSpec for a Python Spout.

This spec represents this Spout in a Topology.

Parameters:
  • name (str) – Name of this Spout. Defaults to name of Topology attribute this is assigned to.
  • par (int) –

    Parallelism hint for this Spout. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.

    Note

    This can also be specified as an attribute of your Spout subclass.

  • config (dict) –

    Component-specific config settings to pass to Storm.

    Note

    This can also be specified as an attribute of your Spout subclass.

Note

This method does not take an outputs argument because outputs should be an attribute of your Spout subclass.

class streamparse.ReliableSpout(*args, **kwargs)[source]

Bases: pystorm.spout.ReliableSpout, streamparse.storm.spout.Spout

pystorm ReliableSpout with streamparse-specific additions

ack(tup_id)[source]

Called when a bolt acknowledges a Tuple in the topology.

Parameters:tup_id (str) – the ID of the Tuple that has been fully acknowledged in the topology.
activate()

Called when the Spout has been activated after being deactivated.

Note

This requires at least Storm 1.1.0.

deactivate()

Called when the Spout has been deactivated.

Note

This requires at least Storm 1.1.0.

emit(tup, tup_id=None, stream=None, direct_task=None, need_task_ids=False)[source]

Emit a spout Tuple & add metadata about it to unacked_tuples.

In order for this to work, tup_id is a required parameter.

See Spout.emit().

fail(tup_id)[source]

Called when a Tuple fails in the topology.

A reliable spout will replay a failed tuple up to max_fails times.

Parameters:tup_id (str) – the ID of the Tuple that has failed in the topology either due to a bolt calling fail() or a Tuple timing out.
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this component. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology such as: task IDs, inputs, outputs etc.
is_heartbeat(tup)
Returns:Whether or not the given Tuple is a heartbeat
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger and not setting pystorm.log.path, because that will use a pystorm.component.StormHandler to do the filtering on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

next_tuple()

Implement this function to emit Tuples as necessary.

This function should not block, or Storm will think the spout is dead. Instead, let it return and pystorm will send a noop to Storm, which lets it know the spout is functioning.

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm via serializer.

report_metric(name, value)

Report a custom metric back to Storm.

Parameters:
  • name – Name of the metric. This can be anything.
  • value – Value of the metric. This is usually a number.

Only supported in Storm 0.9.3+.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads Tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

spec(name=None, par=None, config=None)

Create a ShellSpoutSpec for a Python Spout.

This spec represents this Spout in a Topology.

Parameters:
  • name (str) – Name of this Spout. Defaults to name of Topology attribute this is assigned to.
  • par (int) –

    Parallelism hint for this Spout. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.

    Note

    This can also be specified as an attribute of your Spout subclass.

  • config (dict) –

    Component-specific config settings to pass to Storm.

    Note

    This can also be specified as an attribute of your Spout subclass.

Note

This method does not take an outputs argument because outputs should be an attribute of your Spout subclass.

Bolts

class streamparse.Bolt(*args, **kwargs)[source]

Bases: pystorm.bolt.Bolt, streamparse.storm.bolt.ShellBolt

pystorm Bolt with streamparse-specific additions

ack(tup)[source]

Indicate that processing of a Tuple has succeeded.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to acknowledge.
emit(tup, stream=None, anchors=None, direct_task=None, need_task_ids=False)[source]

Emit a new Tuple to a stream.

Parameters:
  • tup (list or pystorm.component.Tuple) – the Tuple payload to send to Storm, should contain only JSON-serializable data.
  • stream (str) – the ID of the stream to emit this Tuple to. Specify None to emit to default stream.
  • anchors (list) – IDs of the Tuples (or pystorm.component.Tuple instances) which the emitted Tuples should be anchored to. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent Tuple ID(s).
  • direct_task (int) – the task to send the Tuple to.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the Tuple was emitted to (default: False).
Returns:

None, unless need_task_ids=True, in which case it will be a list of task IDs that the Tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task].

fail(tup)[source]

Indicate that processing of a Tuple has failed.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to fail (its id if str).
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this component. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology such as: task IDs, inputs, outputs etc.
is_heartbeat(tup)
Returns:Whether or not the given Tuple is a heartbeat
is_tick(tup)[source]
Returns:Whether or not the given Tuple is a tick Tuple
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger and not setting pystorm.log.path, because that will use a pystorm.component.StormHandler to do the filtering on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

process(tup)[source]

Process a single pystorm.component.Tuple of input.

This should be overridden by subclasses. pystorm.component.Tuple objects contain metadata about which component, stream and task they came from. The actual values of the Tuple can be accessed via tup.values.

Parameters:tup (pystorm.component.Tuple) – the Tuple to be processed.
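A minimal sketch of overriding process() is shown below. WordCountBolt and its output field names are hypothetical, and the except ImportError branch is only a stand-in so the sketch runs without streamparse installed.

```python
from collections import Counter

try:
    from streamparse import Bolt
except ImportError:
    # Stand-in (not the real class) so the sketch runs without streamparse.
    class Bolt(object):
        def emit(self, tup, **kwargs):
            print("emit", tup)


class WordCountBolt(Bolt):
    """Hypothetical bolt that keeps a running count per word."""

    outputs = ["word", "count"]

    def initialize(self, storm_conf, context):
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]  # payload of the incoming Tuple
        self.counts[word] += 1
        self.emit([word, self.counts[word]])
```

Note that emit() receives a plain list as the payload, not a Tuple instance.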
process_tick(tup)[source]

Process special ‘tick Tuples’ which allow time-based behaviour to be included in bolts.

Default behaviour is to ignore time ticks. This should be overridden by subclasses that wish to react to timer events via tick Tuples.

Tick Tuples will be sent to all bolts in a topology when the Storm configuration option ‘topology.tick.tuple.freq.secs’ is set to an integer value, the number of seconds.

Parameters:tup (pystorm.component.Tuple) – the Tuple to be processed.
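As a sketch of time-based behaviour, the bolt below accumulates counts in process() and flushes them whenever a tick Tuple arrives. The class name, output fields, and the 10-second tick interval are assumptions for illustration; the except ImportError branch is only a stand-in so the sketch runs without streamparse installed.

```python
from collections import Counter

try:
    from streamparse import Bolt
except ImportError:
    # Stand-in (not the real class) so the sketch runs without streamparse.
    class Bolt(object):
        def emit(self, tup, **kwargs):
            print("emit", tup)


class PeriodicCountBolt(Bolt):
    """Hypothetical bolt that emits running totals once per tick."""

    outputs = ["word", "count"]
    # Assumed interval; config may be given as an attribute of the subclass.
    config = {"topology.tick.tuple.freq.secs": 10}

    def initialize(self, storm_conf, context):
        self.counts = Counter()

    def process(self, tup):
        self.counts[tup.values[0]] += 1  # accumulate only, no emit per Tuple

    def process_tick(self, tup):
        # Flush the totals gathered since the last tick, then start over.
        for word, count in sorted(self.counts.items()):
            self.emit([word, count])
        self.counts.clear()
```

Batching output this way trades latency (up to one tick interval) for fewer emitted Tuples.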
raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm via serializer.

read_tuple()[source]

Read a tuple from the pipe to Storm.

report_metric(name, value)

Report a custom metric back to Storm.

Parameters:
  • name – Name of the metric. This can be anything.
  • value – Value of the metric. This is usually a number.

Only supported in Storm 0.9.3+.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads Tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

classmethod spec(name=None, inputs=None, par=None, config=None)[source]

Create a ShellBoltSpec for a Python Bolt.

This spec represents this Bolt in a Topology.

Parameters:
  • name (str) – Name of this Bolt. Defaults to name of Topology attribute this is assigned to.
  • inputs

    Streams that feed into this Bolt.

    Two forms of this are acceptable:

    1. A dict mapping from ComponentSpec to Grouping.
    2. A list of Stream or ComponentSpec.
  • par (int) –

    Parallelism hint for this Bolt. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.

    Note

    This can also be specified as an attribute of your Bolt subclass.

  • config (dict) –

    Component-specific config settings to pass to Storm.

    Note

    This can also be specified as an attribute of your Bolt subclass.

Note

This method does not take an outputs argument because outputs should be an attribute of your Bolt subclass.

class streamparse.BatchingBolt(*args, **kwargs)[source]

Bases: pystorm.bolt.BatchingBolt, streamparse.storm.bolt.Bolt

pystorm BatchingBolt with streamparse-specific additions

ack(tup)

Indicate that processing of a Tuple has succeeded.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to acknowledge.
emit(tup, **kwargs)[source]

Modified emit that will not return task IDs after emitting.

See pystorm.component.Bolt for more information.

Returns:None.
fail(tup)

Indicate that processing of a Tuple has failed.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to fail (its id if str).
group_key(tup)[source]

Return the group key used to group Tuples within a batch.

By default, returns None, which puts all Tuples in a single batch, effectively just time-based batching. Override this to create multiple batches based on a key.

Parameters:tup (pystorm.component.Tuple) – the Tuple used to extract a group key
Returns:Any hashable value.
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this component. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology such as: task IDs, inputs, outputs etc.
is_heartbeat(tup)
Returns:Whether or not the given Tuple is a heartbeat
is_tick(tup)
Returns:Whether or not the given Tuple is a tick Tuple
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger and not setting pystorm.log.path, because that will use a pystorm.component.StormHandler to do the filtering on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

process(tup)[source]

Group non-tick Tuples into batches by group_key.

Warning

This method should not be overridden. If you want to tweak how Tuples are grouped into batches, override group_key.

process_batch(key, tups)[source]

Process a batch of Tuples. Should be overridden by subclasses.

Parameters:
  • key (hashable) – the group key for the list of batches.
  • tups (list) – a list of pystorm.component.Tuple objects for the group.
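The interplay between group_key() and process_batch() can be sketched as follows. WordBatchBolt and its output fields are hypothetical, and the except ImportError branch is only a stand-in so the sketch runs without streamparse installed.

```python
try:
    from streamparse import BatchingBolt
except ImportError:
    # Stand-in (not the real class) so the sketch runs without streamparse.
    class BatchingBolt(object):
        def emit(self, tup, **kwargs):
            print("emit", tup)


class WordBatchBolt(BatchingBolt):
    """Hypothetical bolt that emits one count per word per batch."""

    outputs = ["word", "count"]

    def group_key(self, tup):
        # One batch per distinct word (the first value of the payload).
        return tup.values[0]

    def process_batch(self, key, tups):
        # key is the word; tups are the Tuples in the batch that share it.
        self.emit([key, len(tups)])
```

Because group_key() must return a hashable value, a tuple of several payload fields is a reasonable key when batches depend on more than one field.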
process_batches()[source]

Iterate through all batches, call process_batch on them, and ack.

Separated out for the rare instances when we want to subclass BatchingBolt and customize what mechanism causes batches to be processed.

process_tick(tick_tup)[source]

Increment tick counter, and call process_batch for all current batches if tick counter exceeds ticks_between_batches.

See pystorm.component.Bolt for more information.

Warning

This method should not be overridden. If you want to tweak how Tuples are grouped into batches, override group_key.

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm via serializer.

read_tuple()

Read a tuple from the pipe to Storm.

report_metric(name, value)

Report a custom metric back to Storm.

Parameters:
  • name – Name of the metric. This can be anything.
  • value – Value of the metric. This is usually a number.

Only supported in Storm 0.9.3+.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads Tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

spec(name=None, inputs=None, par=None, config=None)

Create a ShellBoltSpec for a Python Bolt.

This spec represents this Bolt in a Topology.

Parameters:
  • name (str) – Name of this Bolt. Defaults to name of Topology attribute this is assigned to.
  • inputs

    Streams that feed into this Bolt.

    Two forms of this are acceptable:

    1. A dict mapping from ComponentSpec to Grouping.
    2. A list of Stream or ComponentSpec.
  • par (int) –

    Parallelism hint for this Bolt. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.

    Note

    This can also be specified as an attribute of your Bolt subclass.

  • config (dict) –

    Component-specific config settings to pass to Storm.

    Note

    This can also be specified as an attribute of your Bolt subclass.

Note

This method does not take an outputs argument because outputs should be an attribute of your Bolt subclass.

class streamparse.TicklessBatchingBolt(*args, **kwargs)[source]

Bases: pystorm.bolt.TicklessBatchingBolt, streamparse.storm.bolt.BatchingBolt

pystorm TicklessBatchingBolt with streamparse-specific additions

ack(tup)

Indicate that processing of a Tuple has succeeded.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to acknowledge.
emit(tup, **kwargs)

Modified emit that will not return task IDs after emitting.

See pystorm.component.Bolt for more information.

Returns:None.
fail(tup)

Indicate that processing of a Tuple has failed.

Parameters:tup (str or pystorm.component.Tuple) – the Tuple to fail (its id if str).
group_key(tup)

Return the group key used to group Tuples within a batch.

By default, returns None, which puts all Tuples in a single batch, effectively just time-based batching. Override this to create multiple batches based on a key.

Parameters:tup (pystorm.component.Tuple) – the Tuple used to extract a group key
Returns:Any hashable value.
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this component. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology such as: task IDs, inputs, outputs etc.
is_heartbeat(tup)
Returns:Whether or not the given Tuple is a heartbeat
is_tick(tup)
Returns:Whether or not the given Tuple is a tick Tuple
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger and not setting pystorm.log.path, because that will use a pystorm.component.StormHandler to do the filtering on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

process(tup)

Group non-tick Tuples into batches by group_key.

Warning

This method should not be overridden. If you want to tweak how Tuples are grouped into batches, override group_key.

process_batch(key, tups)

Process a batch of Tuples. Should be overridden by subclasses.

Parameters:
  • key (hashable) – the group key for the list of batches.
  • tups (list) – a list of pystorm.component.Tuple objects for the group.
process_batches()

Iterate through all batches, call process_batch on them, and ack.

Separated out for the rare instances when we want to subclass BatchingBolt and customize what mechanism causes batches to be processed.

process_tick(tick_tup)[source]

Just ack tick tuples and ignore them.

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm via serializer.

read_tuple()

Read a tuple from the pipe to Storm.

report_metric(name, value)

Report a custom metric back to Storm.

Parameters:
  • name – Name of the metric. This can be anything.
  • value – Value of the metric. This is usually a number.

Only supported in Storm 0.9.3+.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads Tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

spec(name=None, inputs=None, par=None, config=None)

Create a ShellBoltSpec for a Python Bolt.

This spec represents this Bolt in a Topology.

Parameters:
  • name (str) – Name of this Bolt. Defaults to name of Topology attribute this is assigned to.
  • inputs

    Streams that feed into this Bolt.

    Two forms of this are acceptable:

    1. A dict mapping from ComponentSpec to Grouping.
    2. A list of Stream or ComponentSpec.
  • par (int) –

    Parallelism hint for this Bolt. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.

    Note

    This can also be specified as an attribute of your Bolt subclass.

  • config (dict) –

    Component-specific config settings to pass to Storm.

    Note

    This can also be specified as an attribute of your Bolt subclass.

Note

This method does not take an outputs argument because outputs should be an attribute of your Bolt subclass.

Logging

class streamparse.StormHandler(serializer)[source]

Bases: logging.Handler

Handler that will send messages back to Storm.

Initialize handler

Parameters:serializer – The serializer of the component this handler is being used for.
emit(record)[source]

Emit a record.

If a formatter is specified, it is used to format the record. If exception information is present, it is formatted using traceback.print_exception and sent to Storm.

Topology DSL

class streamparse.Topology[source]

Class to define a Storm topology in a Python DSL.
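A small topology definition might look like the sketch below, which wires one spout to one bolt using the spec() class methods and a fields grouping. The component classes and field names are hypothetical, and everything in the except ImportError branch is a minimal stand-in (not the real classes) so the sketch runs without streamparse installed.

```python
try:
    from streamparse import Bolt, Grouping, Spout, Topology
except ImportError:
    # Minimal stand-ins (NOT the real classes) so the sketch runs anywhere.
    class _Spec(object):
        def __init__(self, cls, **kwargs):
            self.cls, self.kwargs = cls, kwargs

    class _Component(object):
        @classmethod
        def spec(cls, **kwargs):
            return _Spec(cls, **kwargs)

    class Spout(_Component):
        pass

    class Bolt(_Component):
        pass

    class Topology(object):
        pass

    class Grouping(object):
        @staticmethod
        def fields(*names):
            return ("fields", names)


class WordSpout(Spout):  # hypothetical component
    outputs = ["word"]


class WordCountBolt(Bolt):  # hypothetical component
    outputs = ["word", "count"]


class WordCount(Topology):
    # Each class attribute is a spec; names default to the attribute names.
    word_spout = WordSpout.spec()
    count_bolt = WordCountBolt.spec(
        inputs={word_spout: Grouping.fields("word")},
        par=2,
    )
```

The dict form of inputs maps the upstream spec to a Grouping; the list form described under Bolt.spec() would rely on the default grouping instead.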

class streamparse.Grouping[source]

A Grouping describes how Tuples should be distributed to the tasks of a Bolt listening on a particular stream.

When no Grouping is specified, it defaults to SHUFFLE for normal streams, and DIRECT for direct streams.

Variables:
  • SHUFFLE – Tuples are randomly distributed across the Bolt’s tasks in a way such that each Bolt is guaranteed to get an equal number of Tuples.
  • GLOBAL – The entire stream goes to a single one of the Bolt’s tasks. Specifically, it goes to the task with the lowest id.
  • DIRECT – This is a special kind of grouping. A stream grouped this way means that the producer of the Tuple decides which task of the consumer will receive this Tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using the direct_task parameter to the streamparse.Bolt.emit() and streamparse.Spout.emit() methods.
  • ALL – The stream is replicated across all the Bolt’s tasks. Use this grouping with care.
  • NONE – This grouping specifies that you don’t care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down Bolts with none groupings to execute in the same thread as the Bolt or Spout they subscribe from (when possible).
  • LOCAL_OR_SHUFFLE – If the target Bolt has one or more tasks in the same worker process, Tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
classmethod custom_object(java_class_name, arg_list)[source]

Tuples will be assigned to tasks by the given Java class.

classmethod custom_serialized(java_serialized)[source]

Tuples will be assigned to tasks by the given Java serialized class.

classmethod fields(*fields)[source]

The stream is partitioned by the fields specified in the grouping.

For example, if the stream is grouped by the user-id field, Tuples with the same user-id will always go to the same task, but Tuples with different user-ids may go to different tasks.
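The routing property described above can be modelled in a few lines of plain Python. This is only a model: it assumes Python's built-in hash as the partitioning function, which is not how Storm itself computes the target task, and task_for and the sample events are hypothetical.

```python
def task_for(values, num_tasks):
    """Pick a task index from the grouping field values (model only)."""
    return hash(tuple(values)) % num_tasks


num_tasks = 4
events = [("alice", "click"), ("bob", "view"), ("alice", "view")]
for user_id, action in events:
    # Group on the user-id field only, so the action does not affect routing.
    print(user_id, action, "-> task", task_for([user_id], num_tasks))
```

Both "alice" events map to the same task index within a run, while "bob" may or may not share it.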

class streamparse.Stream(fields=None, name='default', direct=False)[source]

A Storm output stream

Parameters:
  • fields (list or tuple of str) – Field names for this stream.
  • name (str) – Name of stream. Defaults to default.
  • direct (bool) – Whether or not this stream is direct. Default is False. See DIRECT.
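As a sketch of how Stream objects might be used, the bolt below declares a default stream plus a second, named stream and chooses between them when emitting. ParseBolt, the "errors" stream name, and the field names are hypothetical, and the except ImportError branch holds stand-ins (not the real classes) so the sketch runs without streamparse installed.

```python
try:
    from streamparse import Bolt, Stream
except ImportError:
    # Stand-ins (NOT the real classes) so the sketch runs without streamparse.
    class Stream(object):
        def __init__(self, fields=None, name="default", direct=False):
            self.fields, self.name, self.direct = fields, name, direct

    class Bolt(object):
        def emit(self, tup, stream=None, **kwargs):
            print("emit", tup, stream)


class ParseBolt(Bolt):
    """Hypothetical bolt that routes unparseable input to a side stream."""

    outputs = [
        Stream(fields=["number"]),                        # default stream
        Stream(fields=["raw", "reason"], name="errors"),  # named stream
    ]

    def process(self, tup):
        raw = tup.values[0]
        try:
            number = int(raw)
        except ValueError:
            self.emit([raw, "not an integer"], stream="errors")
            return
        self.emit([number])  # stream=None means the default stream
```

Downstream bolts would then subscribe to whichever of the two streams they need.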
class streamparse.JavaBolt(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>, rdb_signal=u'SIGUSR1', serializer=u'json')[source]
classmethod spec(name=None, serialized_java=None, full_class_name=None, args_list=None, inputs=None, par=1, config=None, outputs=None)[source]

Create a JavaBoltSpec for a Java Bolt.

This spec represents this Bolt in a Topology.

You must add the appropriate entries to your classpath by editing your project’s project.clj file in order for this to work.

Parameters:
  • name (str) – Name of this Bolt. Defaults to name of Topology attribute this is assigned to.
  • serialized_java (bytes) – Serialized Java code representing the class. You must either specify this, or both full_class_name and args_list.
  • full_class_name (str) – Fully qualified class name (including the package name)
  • args_list (list of basic data types) – A list of arguments to be passed to the constructor of this class.
  • inputs

    Streams that feed into this Bolt.

    Two forms of this are acceptable:

    1. A dict mapping from ComponentSpec to Grouping.
    2. A list of Stream or ComponentSpec.
  • par (int) – Parallelism hint for this Bolt. For Python Components, this works out to be the number of Python processes running it in the topology (across all machines). See Parallelism and Workers.
  • config (dict) – Component-specific config settings to pass to Storm.
  • outputs

    Outputs this JavaBolt will produce. Acceptable forms are:

    1. A list of Stream objects describing the fields output on each stream.
    2. A list of str representing the fields output on the default stream.
class streamparse.JavaSpout(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>, rdb_signal=u'SIGUSR1', serializer=u'json')[source]
classmethod spec(name=None, serialized_java=None, full_class_name=None, args_list=None, par=1, config=None, outputs=None)[source]

Create a JavaSpoutSpec for a Java Spout.

This spec represents this Spout in a Topology.

You must add the appropriate entries to your classpath by editing your project’s project.clj file in order for this to work.

Parameters:
  • name (str) – Name of this Spout. Defaults to name of Topology attribute this is assigned to.
  • serialized_java (bytes) – Serialized Java code representing the class. You must either specify this, or both full_class_name and args_list.
  • full_class_name (str) – Fully qualified class name (including the package name)
  • args_list (list of basic data types) – A list of arguments to be passed to the constructor of this class.
  • par (int) – Parallelism hint for this Spout. See Parallelism and Workers.
  • config (dict) – Component-specific config settings to pass to Storm.
  • outputs

    Outputs this JavaSpout will produce. Acceptable forms are:

    1. A list of Stream objects describing the fields output on each stream.
    2. A list of str representing the fields output on the default stream.
class streamparse.ShellBolt(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>, rdb_signal=u'SIGUSR1', serializer=u'json')[source]

A Bolt that is started by running a command with a script argument.

classmethod spec(name=None, command=None, script=None, inputs=None, par=None, config=None, outputs=None)[source]

Create a ShellBoltSpec for a non-Java, non-Python Bolt.

If you want to create a spec for a Python Bolt, use streamparse.Bolt.spec().

This spec represents this Bolt in a Topology.

Parameters:
  • name (str) – Name of this Bolt. Defaults to name of Topology attribute this is assigned to.
  • command (str) – Path to the command that Storm will execute.
  • script (str) – Arguments to command. Multiple arguments should just be separated by spaces.
  • inputs

    Streams that feed into this Bolt.

    Two forms of this are acceptable:

    1. A dict mapping from ComponentSpec to Grouping.
    2. A list of Stream or ComponentSpec.
  • par (int) – Parallelism hint for this Bolt. For shell Components, this works out to be the number of processes running it in the topology (across all machines). See Parallelism and Workers.
  • config (dict) – Component-specific config settings to pass to Storm.
  • outputs

    Outputs this ShellBolt will produce. Acceptable forms are:

    1. A list of Stream objects describing the fields output on each stream.
    2. A list of str representing the fields output on the default stream.
class streamparse.ShellSpout(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>, rdb_signal=u'SIGUSR1', serializer=u'json')[source]
classmethod spec(name=None, command=None, script=None, par=None, config=None, outputs=None)[source]

Create a ShellSpoutSpec for a non-Java, non-Python Spout.

If you want to create a spec for a Python Spout, use streamparse.Spout.spec().

This spec represents this Spout in a Topology.

Parameters:
  • name (str) – Name of this Spout. Defaults to name of Topology attribute this is assigned to.
  • command (str) – Path to the command that Storm will execute.
  • script (str) – Arguments to command. Multiple arguments should just be separated by spaces.
  • par (int) – Parallelism hint for this Spout. For shell Components, this works out to be the number of processes running it in the topology (across all machines). See Parallelism and Workers.
  • config (dict) – Component-specific config settings to pass to Storm.
  • outputs

    Outputs this ShellSpout will produce. Acceptable forms are:

    1. A list of Stream objects describing the fields output on each stream.
    2. A list of str representing the fields output on the default stream.