API

Tuples

class streamparse.storm.component.Tuple(id, component, stream, task, values)

Storm’s primitive data type passed around via streams.

Variables:
  • id – the ID of the tuple.
  • component – component that the tuple was generated from.
  • stream – the stream that the tuple was emitted into.
  • task – the task the tuple was generated from.
  • values – the payload of the tuple where data is stored.

You should never have to instantiate a streamparse.storm.component.Tuple yourself; streamparse constructs it for you before, for example, calling a streamparse.storm.bolt.Bolt's process() method.

None of the emit methods for bolts or spouts require that you pass a streamparse.storm.component.Tuple instance.
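
For illustration, here is a minimal sketch of reading a Tuple's attributes inside a bolt's process() method (the EchoBolt name is made up for this example):

from streamparse.bolt import Bolt

class EchoBolt(Bolt):

    def process(self, tup):
        # tup is a streamparse.storm.component.Tuple built by streamparse
        self.log("tuple %s from component %s on stream %s"
                 % (tup.id, tup.component, tup.stream))
        # values holds the actual payload; re-emit it unchanged
        self.emit(tup.values)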

Components

Both streamparse.storm.bolt.Bolt and streamparse.storm.spout.Spout inherit from a common base-class, streamparse.storm.component.Component. It handles the basic Multi-Lang IPC between Storm and Python.

class streamparse.storm.component.Component(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>)[source]

Base class for spouts and bolts; contains methods for logging messages back to the Storm worker process.

Variables:
  • input_stream – The file-like object to use to retrieve commands from Storm. Defaults to sys.stdin.
  • output_stream – The file-like object to send messages to Storm with. Defaults to sys.stdout.
  • topology_name – The name of the topology sent by Storm in the initial handshake.
  • task_id – The numerical task ID for this component, as sent by Storm in the initial handshake.
  • component_name – The name of this component, as sent by Storm in the initial handshake.
  • debug – A bool indicating whether or not Storm is running in debug mode. Specified by the topology.debug Storm setting.
  • storm_conf – A dict containing the configuration values sent by Storm in the initial handshake with this component.
  • context – The context of where this component is in the topology. See the Storm Multi-Lang protocol documentation for details.
  • pid – An int indicating the process ID of this component as retrieved by os.getpid().
  • logger

    A logger to use with this component.

    Note

Using Component.logger combined with the streamparse.storm.component.StormHandler handler is the recommended way to log messages from your component. If you use Component.log instead, the messages will always be sent to Storm, even if they are debug-level messages and you are running in production. Using streamparse.storm.component.StormHandler ensures that your messages are filtered on the Python side, so only the messages you actually want logged are serialized and sent to Storm.
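
A minimal sketch of the recommended pattern, attaching StormHandler to Component.logger like any standard logging handler (the FilteredLoggingBolt name and the INFO threshold are illustrative, and the handler's constructor arguments may differ between streamparse versions):

import logging

from streamparse.bolt import Bolt
from streamparse.storm.component import StormHandler

class FilteredLoggingBolt(Bolt):

    def initialize(self, storm_conf, context):
        # Records below INFO are filtered on the Python side and never
        # serialized or sent to Storm.
        handler = StormHandler()  # constructor arguments may vary by version
        handler.setLevel(logging.INFO)
        self.logger.addHandler(handler)

    def process(self, tup):
        self.logger.debug("dropped before reaching Storm")
        self.logger.info("serialized and sent to Storm")
        self.emit(tup.values)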

emit(tup, tup_id=None, stream=None, anchors=None, direct_task=None, need_task_ids=True)[source]

Emit a new tuple to a stream.

Parameters:
  • tup (list or streamparse.storm.component.Tuple) – the tuple payload to send to Storm; it should contain only JSON-serializable data.
  • tup_id (str) – the ID for the tuple. If omitted by a streamparse.storm.spout.Spout, this emit will be unreliable.
  • stream (str) – the ID of the stream to emit this tuple to. Specify None to emit to default stream.
  • anchors (list) – IDs of the tuples (or streamparse.storm.component.Tuple instances) to which the emitted tuples should be anchored. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s). This is only used by streamparse.storm.bolt.Bolt.
  • direct_task (int) – the task to send the tuple to.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).
Returns:

a list of task IDs that the tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task]. If you specify need_task_ids=False, this function will return None.
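
For example, the return value can be inspected inside a bolt when task IDs matter, and skipped when they do not (a sketch; the stream names and RoutingBolt name are hypothetical):

from streamparse.bolt import Bolt

class RoutingBolt(Bolt):

    def process(self, tup):
        # Returns the list of task IDs the tuple was sent to...
        task_ids = self.emit(tup.values, stream="tracked")
        self.log("emitted to tasks %s" % task_ids)
        # ...and None when the task IDs are not requested.
        self.emit(tup.values, stream="untracked", need_task_ids=False)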

log(message, level=None)[source]

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger with a streamparse.storm.component.StormHandler, because the filtering will happen on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).
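
A short example of passing an explicit level (the ValidatingBolt name and the empty-tuple check are illustrative):

from streamparse.bolt import Bolt

class ValidatingBolt(Bolt):

    def process(self, tup):
        if not tup.values:
            # Sent to Storm regardless of level; prefer Component.logger
            # with StormHandler when Python-side filtering is needed.
            self.log("received an empty tuple: %s" % tup.id, level="warn")
            return
        self.emit(tup.values)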

raise_exception(exception, tup=None)[source]

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()[source]

Read and process an initial handshake message from Storm.

read_message()[source]

Read a message from Storm, reconstruct newlines appropriately.

All of Storm’s messages (for either Bolts or Spouts) should be of the form:

'<command or task_id from prior emit>\nend\n'

Command example, an incoming tuple to a bolt:

'{ "id": "-6955786537413359385",  "comp": "1", "stream": "1", "task": 9, "tuple": ["snow white and the seven dwarfs", "field2", 3]}\nend\n'

Command example for a Spout to emit its next tuple:

'{"command": "next"}\nend\n'

Example, the task IDs a prior emit was sent to:

'[12, 22, 24]\nend\n'

The edge case where we read '' from input_stream, indicating EOF, usually means that communication with the supervisor has been severed.
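
As a simplified illustration of this wire format (not the actual read_message() implementation, which also reassembles payloads spanning multiple lines), a message body can be split on the end sentinel and JSON-decoded:

import json

# One complete Multi-Lang message as it appears on the wire.
raw = '{"command": "next"}\nend\n'

# Strip the trailing 'end' sentinel and decode the JSON body.
body, _, _ = raw.partition('\nend\n')
message = json.loads(body)
assert message == {"command": "next"}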

run()[source]

Main run loop for all components.

Performs the initial handshake with Storm and reads tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)[source]

Send a message to Storm via stdout.

Spouts

Spouts are data sources for topologies; they can read from any data source and emit tuples into streams.

class streamparse.storm.spout.Spout(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>)[source]

Bases: streamparse.storm.component.Component

Base class for all streamparse spouts.

For more information on spouts, consult Storm’s Concepts documentation.

ack(tup_id)[source]

Called when a tuple has been fully acknowledged in the topology.

Parameters:tup_id (str) – the ID of the tuple that has been fully acknowledged in the topology.
emit(tup, tup_id=None, stream=None, direct_task=None, need_task_ids=True)[source]

Emit a spout tuple message.

Parameters:
  • tup (list or tuple) – the tuple to send to Storm; it should contain only JSON-serializable data.
  • tup_id (str) – the ID for the tuple. Leave this blank for an unreliable emit.
  • stream (str) – ID of the stream this tuple should be emitted to. Leave empty to emit to the default stream.
  • direct_task (int) – the task to send the tuple to if performing a direct emit.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).
Returns:

a list of task IDs that the tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task]. If you specify need_task_ids=False, this function will return None.
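
For example, passing a tup_id makes the emit reliable, so Storm will later call ack() or fail() on the spout with that same ID (a sketch; the SentenceSpout name and hard-coded sentences are illustrative):

from itertools import count

from streamparse.spout import Spout

class SentenceSpout(Spout):

    sentences = ["the cow jumped over the moon",
                 "an apple a day keeps the doctor away"]

    def initialize(self, storm_conf, context):
        self._ids = count()

    def next_tuple(self):
        tup_id = next(self._ids)
        sentence = self.sentences[tup_id % len(self.sentences)]
        # Passing tup_id makes this a reliable emit.
        self.emit([sentence], tup_id=str(tup_id))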

emit_many(tuples, stream=None, tup_ids=None, direct_task=None, need_task_ids=True)[source]

Emit multiple tuples.

Parameters:
  • tuples (list) – a list of multiple tuple payloads to send to Storm. All tuples should contain only JSON-serializable data.
  • stream (str) – the ID of the stream to emit these tuples to. Specify None to emit to the default stream.
  • tup_ids (list) – IDs for each of the tuples in the list. Omit these for an unreliable emit.
  • direct_task (int) – indicates the task to send the tuple to.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).

Deprecated since version 2.0.0: Just call Spout.emit() repeatedly instead.

fail(tup_id)[source]

Called when a tuple fails in the topology.

A Spout can choose to emit the tuple again or ignore the fail. The default is to ignore.

Parameters:tup_id (str) – the ID of the tuple that has failed in the topology, either due to a bolt calling fail() on it or the tuple timing out.
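
One common pattern is to keep emitted payloads in a dict keyed by tuple ID, drop them on ack() and re-emit them on fail() (a sketch; the _pending attribute is hypothetical bookkeeping, not part of the Spout API):

from itertools import count

from streamparse.spout import Spout

class RetryingSpout(Spout):

    def initialize(self, storm_conf, context):
        self._ids = count()
        self._pending = {}  # tup_id -> payload; hypothetical bookkeeping

    def next_tuple(self):
        tup_id = str(next(self._ids))
        payload = ["event-%s" % tup_id]
        self._pending[tup_id] = payload
        self.emit(payload, tup_id=tup_id)

    def ack(self, tup_id):
        # The tuple was fully processed; stop tracking it.
        self._pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Re-emit the original payload with the same ID; drop it here
        # instead if you want the default ignore-on-fail behaviour.
        payload = self._pending.get(tup_id)
        if payload is not None:
            self.emit(payload, tup_id=tup_id)
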
initialize(storm_conf, context)[source]

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this Spout. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology, such as task IDs, inputs, and outputs.
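
For instance, initialize() is a natural place to read configuration and note where the component sits in the topology (a sketch; the 'myapp.source.url' key is a hypothetical topology setting, and the context keys follow the Multi-Lang handshake format):

from streamparse.spout import Spout

class ConfiguredSpout(Spout):

    def initialize(self, storm_conf, context):
        # Topology-level settings merged with the worker's cluster config.
        self.source_url = storm_conf.get("myapp.source.url")  # hypothetical key
        # The handshake context describes this component's place in the topology.
        self.log("task %s will read from %s"
                 % (context.get("taskid"), self.source_url))
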
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger with a streamparse.storm.component.StormHandler, because the filtering will happen on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

next_tuple()[source]

Implement this function to emit tuples as necessary.

This function should not block, or Storm will think the spout is dead. Instead, let it return; streamparse will send a no-op to Storm, which lets it know the spout is still functioning.
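
A non-blocking pattern is to poll an in-memory queue and return immediately when nothing is available (a sketch; whatever fills the queue, e.g. a background thread, is not shown):

from queue import Queue, Empty  # on Python 2 this module is named Queue

from streamparse.spout import Spout

class QueueSpout(Spout):

    def initialize(self, storm_conf, context):
        self._queue = Queue()  # filled elsewhere; not shown

    def next_tuple(self):
        try:
            payload = self._queue.get_nowait()
        except Empty:
            # Return without emitting; streamparse sends the no-op for us.
            return
        self.emit([payload])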

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm, reconstruct newlines appropriately.

All of Storm’s messages (for either Bolts or Spouts) should be of the form:

'<command or task_id from prior emit>\nend\n'

Command example, an incoming tuple to a bolt:

'{ "id": "-6955786537413359385",  "comp": "1", "stream": "1", "task": 9, "tuple": ["snow white and the seven dwarfs", "field2", 3]}\nend\n'

Command example for a Spout to emit its next tuple:

'{"command": "next"}\nend\n'

Example, the task IDs a prior emit was sent to:

'[12, 22, 24]\nend\n'

The edge case where we read '' from input_stream, indicating EOF, usually means that communication with the supervisor has been severed.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

Bolts

class streamparse.storm.bolt.Bolt(input_stream=<open file '<stdin>', mode 'r'>, output_stream=<open file '<stdout>', mode 'w'>)[source]

Bases: streamparse.storm.component.Component

The base class for all streamparse bolts.

For more information on bolts, consult Storm’s Concepts documentation.

Variables:
  • auto_anchor – A bool indicating whether or not the bolt should automatically anchor emits to the incoming tuple ID. Tuple anchoring is how Storm provides reliability; you can read more about tuple anchoring in Storm’s docs. Default is True.
  • auto_ack – A bool indicating whether or not the bolt should automatically acknowledge tuples after process() is called. Default is True.
  • auto_fail – A bool indicating whether or not the bolt should automatically fail tuples when an exception occurs when the process() method is called. Default is True.

Example:

from streamparse.bolt import Bolt

class SentenceSplitterBolt(Bolt):

    def process(self, tup):
        sentence = tup.values[0]
        for word in sentence.split(" "):
            self.emit([word])
ack(tup)[source]

Indicate that processing of a tuple has succeeded.

Parameters:tup (str or streamparse.storm.component.Tuple) – the tuple to acknowledge.
emit(tup, stream=None, anchors=None, direct_task=None, need_task_ids=True)[source]

Emit a new tuple to a stream.

Parameters:
  • tup (list or streamparse.storm.component.Tuple) – the tuple payload to send to Storm; it should contain only JSON-serializable data.
  • stream (str) – the ID of the stream to emit this tuple to. Specify None to emit to default stream.
  • anchors (list) – IDs of the tuples (or streamparse.storm.component.Tuple instances) to which the emitted tuples should be anchored. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s).
  • direct_task (int) – the task to send the tuple to.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).
Returns:

a list of task IDs that the tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task]. If you specify need_task_ids=False, this function will return None.
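
For example, with auto_anchor disabled, anchors can be passed explicitly so each emitted word is anchored to the incoming sentence tuple (a sketch; the ManualAnchorBolt name is made up):

from streamparse.bolt import Bolt

class ManualAnchorBolt(Bolt):

    auto_anchor = False  # take over anchoring manually
    auto_ack = False     # ack only after all words have been emitted

    def process(self, tup):
        for word in tup.values[0].split():
            # Anchoring lets Storm replay the sentence if a word fails
            # downstream.
            self.emit([word], anchors=[tup])
        self.ack(tup)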

emit_many(tuples, stream=None, anchors=None, direct_task=None, need_task_ids=True)[source]

Emit multiple tuples.

Parameters:
  • tuples (list) – a list of multiple tuple payloads to send to Storm. All tuples should contain only JSON-serializable data.
  • stream (str) – the ID of the stream to emit these tuples to. Specify None to emit to the default stream.
  • anchors (list) – IDs of the tuples (or streamparse.storm.component.Tuple instances) to which the emitted tuples should be anchored. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s).
  • direct_task (int) – indicates the task to send the tuple to.
  • need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).

Deprecated since version 2.0.0: Just call Bolt.emit() repeatedly instead.

fail(tup)[source]

Indicate that processing of a tuple has failed.

Parameters:tup (str or streamparse.storm.component.Tuple) – the tuple to fail (its id if str).
initialize(storm_conf, context)[source]

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this Bolt. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology, such as task IDs, inputs, and outputs.
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger with a streamparse.storm.component.StormHandler, because the filtering will happen on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

process(tup)[source]

Process a single streamparse.storm.component.Tuple of input.

This should be overridden by subclasses. streamparse.storm.component.Tuple objects contain metadata about which component, stream, and task they came from. The actual values of the tuple can be accessed via tup.values.

Parameters:tup (streamparse.storm.component.Tuple) – the tuple to be processed.
process_tick(tup)[source]

Process special ‘tick tuples’ which allow time-based behaviour to be included in bolts.

Default behaviour is to ignore tick tuples. This should be overridden by subclasses that wish to react to timer events via tick tuples.

Tick tuples will be sent to all bolts in a topology when the Storm configuration option ‘topology.tick.tuple.freq.secs’ is set to an integer value (the number of seconds between ticks).

Parameters:tup (streamparse.storm.component.Tuple) – the tuple to be processed.
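
For example, a bolt can accumulate state in process() and flush it whenever a tick tuple arrives (a sketch; assumes topology.tick.tuple.freq.secs is set for the topology, and the PeriodicCountBolt name is made up):

from collections import Counter

from streamparse.bolt import Bolt

class PeriodicCountBolt(Bolt):

    def initialize(self, storm_conf, context):
        self._counts = Counter()

    def process(self, tup):
        self._counts[tup.values[0]] += 1

    def process_tick(self, tup):
        # Called once per tick tuple; emit and reset the running counts.
        for word, total in self._counts.items():
            self.emit([word, total])
        self._counts.clear()
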
raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm, reconstruct newlines appropriately.

All of Storm’s messages (for either Bolts or Spouts) should be of the form:

'<command or task_id from prior emit>\nend\n'

Command example, an incoming tuple to a bolt:

'{ "id": "-6955786537413359385",  "comp": "1", "stream": "1", "task": 9, "tuple": ["snow white and the seven dwarfs", "field2", 3]}\nend\n'

Command example for a Spout to emit its next tuple:

'{"command": "next"}\nend\n'

Example, the task IDs a prior emit was sent to:

'[12, 22, 24]\nend\n'

The edge case where we read '' from input_stream, indicating EOF, usually means that communication with the supervisor has been severed.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.

class streamparse.storm.bolt.BatchingBolt(*args, **kwargs)[source]

Bases: streamparse.storm.bolt.Bolt

A bolt which batches tuples for processing.

Batching tuples is unexpectedly complex to do correctly. The main problem is that all bolts are single-threaded. The difficulty comes when the topology is shutting down, because Storm stops feeding the bolt tuples. If the bolt is blocked waiting on stdin, then it can’t process any waiting tuples, or even ack ones that were asynchronously written to a data store.

This bolt helps with that by grouping tuples received between tick tuples into batches.

To use this class, you must implement process_batch. group_key can be optionally implemented so that tuples are grouped before process_batch is even called.

You must also set topology.tick.tuple.freq.secs to how frequently (in seconds) you would like ticks to be sent. If you want ticks_between_batches to work the same way secs_between_batches worked in older versions of streamparse, just set topology.tick.tuple.freq.secs to 1.

Variables:
  • auto_anchor – A bool indicating whether or not the bolt should automatically anchor emits to the incoming tuple ID. Tuple anchoring is how Storm provides reliability; you can read more about tuple anchoring in Storm’s docs. Default is True.
  • auto_ack – A bool indicating whether or not the bolt should automatically acknowledge tuples after process_batch() is called. Default is True.
  • auto_fail – A bool indicating whether or not the bolt should automatically fail tuples when an exception occurs when the process_batch() method is called. Default is True.
  • ticks_between_batches – The number of tick tuples to wait before processing a batch.

Example:

from streamparse.bolt import BatchingBolt

class WordCounterBolt(BatchingBolt):

    ticks_between_batches = 5

    def group_key(self, tup):
        word = tup.values[0]
        return word  # collect batches of words

    def process_batch(self, key, tups):
        # emit the count of words we had per 5s batch
        self.emit([key, len(tups)])
ack(tup)

Indicate that processing of a tuple has succeeded.

Parameters:tup (str or streamparse.storm.component.Tuple) – the tuple to acknowledge.
emit(tup, **kwargs)[source]

Modified emit that will not return task IDs after emitting.

See streamparse.storm.bolt.Bolt for more information.

Returns:None.
emit_many(tups, **kwargs)[source]

Modified emit_many that will not return task IDs after emitting.

See streamparse.storm.bolt.Bolt for more information.

Returns:None.

Deprecated since version 2.0.0: Just call BatchingBolt.emit() repeatedly instead.

fail(tup)

Indicate that processing of a tuple has failed.

Parameters:tup (str or streamparse.storm.component.Tuple) – the tuple to fail (its id if str).
group_key(tup)[source]

Return the group key used to group tuples within a batch.

By default, returns None, which puts all tuples in a single batch, effectively resulting in purely time-based batching. Override this to create multiple batches based on a key.

Parameters:tup (streamparse.storm.component.Tuple) – the tuple used to extract a group key.
Returns:Any hashable value.
initialize(storm_conf, context)

Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.

Parameters:
  • storm_conf (dict) – the Storm configuration for this Bolt. This is the configuration provided to the topology, merged in with cluster configuration on the worker node.
  • context (dict) – information about the component’s place within the topology, such as task IDs, inputs, and outputs.
log(message, level=None)

Log a message to Storm, optionally providing a logging level.

Parameters:
  • message (str) – the log message to send to Storm.
  • level (str) – the logging level that Storm should use when writing the message. Can be one of: trace, debug, info, warn, or error (default: info).

Warning

This will send your message to Storm regardless of what level you specify. In almost all cases, you are better off using Component.logger with a streamparse.storm.component.StormHandler, because the filtering will happen on the Python side (instead of on the Java side after taking the time to serialize your message and send it to Storm).

process(tup)[source]

Group non-tick tuples into batches by group_key.

Warning

This method should not be overridden. If you want to tweak how tuples are grouped into batches, override group_key.

process_batch(key, tups)[source]

Process a batch of tuples. Should be overridden by subclasses.

Parameters:
  • key – the group key for this batch, as returned by group_key() (None by default).
  • tups (list) – a list of streamparse.storm.component.Tuple instances that make up the batch.
process_tick(tick_tup)[source]

Increment the tick counter, and call process_batch() for all current batches if the tick counter exceeds ticks_between_batches.

See streamparse.storm.bolt.Bolt for more information.

Warning

This method should not be overridden. If you want to tweak how tuples are grouped into batches, override group_key.

raise_exception(exception, tup=None)

Report an exception back to Storm via logging.

Parameters:
  • exception – a Python exception.
  • tup – a Tuple object.
read_handshake()

Read and process an initial handshake message from Storm.

read_message()

Read a message from Storm, reconstruct newlines appropriately.

All of Storm’s messages (for either Bolts or Spouts) should be of the form:

'<command or task_id from prior emit>\nend\n'

Command example, an incoming tuple to a bolt:

'{ "id": "-6955786537413359385",  "comp": "1", "stream": "1", "task": 9, "tuple": ["snow white and the seven dwarfs", "field2", 3]}\nend\n'

Command example for a Spout to emit its next tuple:

'{"command": "next"}\nend\n'

Example, the task IDs a prior emit was sent to:

'[12, 22, 24]\nend\n'

The edge case where we read '' from input_stream, indicating EOF, usually means that communication with the supervisor has been severed.

run()

Main run loop for all components.

Performs the initial handshake with Storm and reads tuples, handing them off to subclasses. Any exceptions are caught and logged back to Storm prior to the Python process exiting.

Warning

Subclasses should not override this method.

send_message(message)

Send a message to Storm via stdout.