API
Tuples
class streamparse.ipc.Tuple(id, component, stream, task, values)
Storm’s primitive data type passed around via streams.
Variables:
- id – the ID of the tuple.
- component – component that the tuple was generated from.
- stream – the stream that the tuple was emitted into.
- task – the task the tuple was generated from.
- values – the payload of the tuple where data is stored.
You should never have to instantiate a streamparse.ipc.Tuple yourself; streamparse does this for you before, for example, calling a streamparse.bolt.Bolt’s process() method.
None of the emit methods for bolts or spouts require you to pass a streamparse.ipc.Tuple instance.
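Since process() receives an already-built tuple, the five fields above are all a bolt reads from it. A minimal sketch, assuming the tuple behaves like a namedtuple with exactly those field names. A local stand-in is defined so the snippet runs without streamparse; building one by hand like this is mainly useful in unit tests, and the component name "sentence-spout" is hypothetical.

```python
from collections import namedtuple

# Local stand-in mirroring the five documented fields of streamparse.ipc.Tuple
# (assumption: the real class is namedtuple-like with the same field names).
Tuple = namedtuple("Tuple", ["id", "component", "stream", "task", "values"])


def describe(tup):
    """Summarize where a tuple came from and what it carries."""
    return "{0} from {1}/{2} (task {3}): {4!r}".format(
        tup.id, tup.component, tup.stream, tup.task, tup.values)


tup = Tuple(id="1234", component="sentence-spout", stream="default",
            task=3, values=["the quick brown fox"])
print(describe(tup))
```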
Spouts
Spouts are the data sources of a topology: they can read from any data source and emit tuples into streams.
class streamparse.spout.Spout
Base class for all streamparse spouts.
For more information on spouts, consult Storm’s Concepts documentation.
ack(tup_id)
Called when a tuple emitted by this spout has been fully acknowledged in the topology.
Parameters: tup_id (str) – the ID of the tuple that has been fully acknowledged in the topology.
emit(tup, tup_id=None, stream=None, direct_task=None, need_task_ids=None)
Emit a spout tuple message.
Parameters:
- tup (list) – the tuple to send to Storm. Should contain only JSON-serializable data.
- tup_id (str) – the ID for the tuple. Leave this as None for an unreliable emit.
- stream (str) – ID of the stream this tuple should be emitted to. Leave empty to emit to the default stream.
- direct_task (int) – the task to send the tuple to if performing a direct emit.
- need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).
Returns: a list of task IDs that the tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task]. If you specify need_task_ids=False, this function will return None.
emit_many(tuples, stream=None, anchors=None, direct_task=None, need_task_ids=None)
Emit multiple tuples.
Parameters:
- tuples (list) – a list containing lists of tuple payload data to send to Storm. All tuples should contain only JSON-serializable data.
- stream (str) – the ID of the stream to emit these tuples to. Specify None to emit to the default stream.
- anchors (list) – IDs of the tuples (or streamparse.ipc.Tuple instances) which the emitted tuples should be anchored to. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s).
- direct_task (int) – indicates the task to send the tuples to.
- need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuples were emitted to (default: True).
fail(tup_id)
Called when a tuple fails in the topology.
A spout can choose to emit the tuple again or ignore the failure. By default, failures are ignored.
Parameters: tup_id (str) – the ID of the tuple that has failed in the topology, either because a bolt called fail() or because the tuple timed out.
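The ack/fail contract above is what makes a reliable spout work: emit with a tup_id, remember the payload until ack() arrives, and decide what to do in fail(). A minimal sketch that replays failed tuples instead of ignoring them; ReplayingSpout, the in-memory pending dict and the sample sentences are illustrative assumptions, and the stub base class is only used when streamparse cannot be imported.

```python
import itertools

try:
    from streamparse.spout import Spout
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Spout(object):
        def emit(self, tup, tup_id=None, stream=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class ReplayingSpout(Spout):
    """Emits sentences reliably and re-emits any that fail downstream."""

    def initialize(self, storm_conf, context):
        self.sentences = itertools.cycle(["a b c", "d e f"])
        self.ids = itertools.count()
        self.pending = {}  # tup_id -> payload, kept until acked

    def next_tuple(self):
        tup_id = str(next(self.ids))
        payload = [next(self.sentences)]
        self.pending[tup_id] = payload
        # Passing tup_id makes this a reliable emit; need_task_ids=False
        # means emit() returns None instead of a list of task IDs.
        self.emit(payload, tup_id=tup_id, need_task_ids=False)

    def ack(self, tup_id):
        # Fully processed downstream: forget the payload.
        self.pending.pop(tup_id, None)

    def fail(self, tup_id):
        # Replay with the same ID rather than dropping the tuple.
        payload = self.pending.get(tup_id)
        if payload is not None:
            self.emit(payload, tup_id=tup_id, need_task_ids=False)
```

The pending dict grows with every unacknowledged tuple, so a production spout would usually cap it or persist it; that is left out here.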
initialize(storm_conf, context)
Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.
Parameters:
log(message, level=None)
Log a message to Storm, optionally providing a logging level.
Parameters:
next_tuple()
Implement this method to emit tuples as necessary.
This method should not block, or Storm will think the spout is dead. Instead, let it return; streamparse will then send a noop to Storm, which lets it know the spout is still functioning.
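One way to keep next_tuple() from blocking is to poll a buffer and simply return when it is empty. A minimal sketch, assuming data arrives in an in-process queue.Queue that something else fills (for example a consumer thread started in initialize(); that producer is not shown). QueueSpout and the buffer attribute are hypothetical names, and the stub base class is only used when streamparse cannot be imported.

```python
import queue

try:
    from streamparse.spout import Spout
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Spout(object):
        def emit(self, tup, tup_id=None, stream=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class QueueSpout(Spout):
    """Emits whatever is buffered, without ever blocking in next_tuple()."""

    def initialize(self, storm_conf, context):
        # Filled by some producer (not shown), e.g. a consumer thread.
        self.buffer = queue.Queue()

    def next_tuple(self):
        try:
            item = self.buffer.get_nowait()
        except queue.Empty:
            # Nothing to send: return immediately so streamparse can tell
            # Storm the spout is still alive.
            return
        self.emit([item])
```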
Bolts
class streamparse.bolt.Bolt
Bases: streamparse.base.Component
The base class for all streamparse bolts.
For more information on bolts, consult Storm’s Concepts documentation.
Example:

    from streamparse.bolt import Bolt


    class SentenceSplitterBolt(Bolt):

        def process(self, tup):
            sentence = tup.values[0]
            for word in sentence.split(" "):
                self.emit([word])
ack(tup)
Indicate that processing of a tuple has succeeded.
Parameters: tup (str or Tuple) – the tuple to acknowledge (or its ID, if a str).
auto_ack = True
A bool indicating whether or not the bolt should automatically acknowledge tuples after process() is called. Default is True.
auto_anchor = True
A bool indicating whether or not the bolt should automatically anchor emits to the incoming tuple ID. Tuple anchoring is how Storm provides reliability; you can read more about tuple anchoring in Storm’s docs. Default is True.
auto_fail = True
A bool indicating whether or not the bolt should automatically fail tuples when an exception is raised in the process() method. Default is True.
emit(tup, stream=None, anchors=None, direct_task=None, need_task_ids=None)
Emit a new tuple to a stream.
Parameters:
- tup (list) – the tuple payload to send to Storm. Should contain only JSON-serializable data.
- stream (str) – the ID of the stream to emit this tuple to. Specify None to emit to the default stream.
- anchors (list) – IDs of the tuples (or streamparse.ipc.Tuple instances) which the emitted tuple should be anchored to. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s).
- direct_task (int) – the task to send the tuple to.
- need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuple was emitted to (default: True).
Returns: a list of task IDs that the tuple was sent to. Note that when specifying direct_task, this will be equal to [direct_task]. If you specify need_task_ids=False, this function will return None.
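When a bolt emits to more than one stream, or anchors explicitly instead of relying on auto_anchor, it passes stream and anchors itself. A minimal sketch; ValidatingBolt and the stream names "valid" and "invalid" are hypothetical, and those streams would also have to be declared for the bolt in the topology definition, which is not shown. The stub base class is only used when streamparse cannot be imported.

```python
try:
    from streamparse.bolt import Bolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Bolt(object):
        def emit(self, tup, stream=None, anchors=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class ValidatingBolt(Bolt):
    """Routes numeric strings to one stream and everything else to another."""

    def process(self, tup):
        value = tup.values[0]
        if value.isdigit():
            # Anchor explicitly to the incoming tuple; need_task_ids=False
            # means emit() returns None instead of task IDs.
            self.emit([int(value)], stream="valid", anchors=[tup],
                      need_task_ids=False)
        else:
            self.emit([value], stream="invalid", anchors=[tup],
                      need_task_ids=False)
```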
emit_many(tuples, stream=None, anchors=None, direct_task=None, need_task_ids=None)
Emit multiple tuples.
Parameters:
- tuples (list) – a list containing lists of tuple payload data to send to Storm. All tuples should contain only JSON-serializable data.
- stream (str) – the ID of the stream to emit these tuples to. Specify None to emit to the default stream.
- anchors (list) – IDs of the tuples (or streamparse.ipc.Tuple instances) which the emitted tuples should be anchored to. If auto_anchor is set to True and you have not specified anchors, anchors will be set to the incoming/most recent tuple ID(s).
- direct_task (int) – indicates the task to send the tuples to.
- need_task_ids (bool) – indicate whether or not you’d like the task IDs the tuples were emitted to (default: True).
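Because emit_many() takes a list of payload lists, a bolt that fans one input out into several tuples can send them in a single call. A minimal sketch of the SentenceSplitterBolt example rewritten that way; BulkSplitterBolt is a hypothetical name, it splits on any whitespace rather than a single space, and the stub base class is only used when streamparse cannot be imported.

```python
try:
    from streamparse.bolt import Bolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Bolt(object):
        def emit_many(self, tuples, stream=None, anchors=None,
                      direct_task=None, need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class BulkSplitterBolt(Bolt):
    """Splits a sentence and emits every word in one emit_many() call."""

    def process(self, tup):
        words = tup.values[0].split()
        # One payload list per output tuple; with auto_anchor left at True
        # and no anchors given, they are anchored to the incoming tuple.
        self.emit_many([[word] for word in words], need_task_ids=False)
```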
fail(tup)
Indicate that processing of a tuple has failed.
Parameters: tup (str or Tuple) – the tuple to fail (or its ID, if a str).
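Setting auto_ack and auto_fail to False hands tuple bookkeeping to the bolt, which then calls ack() or fail() itself. A minimal sketch; ParsingBolt and its rule for what counts as a failure (a payload that is not an integer) are hypothetical, and the stub base class is only used when streamparse cannot be imported.

```python
try:
    from streamparse.bolt import Bolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Bolt(object):
        def ack(self, tup):
            raise NotImplementedError("supplied by streamparse at runtime")

        def fail(self, tup):
            raise NotImplementedError("supplied by streamparse at runtime")

        def emit(self, tup, stream=None, anchors=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class ParsingBolt(Bolt):
    """Acks tuples it can parse and fails the rest explicitly."""

    auto_ack = False   # this bolt calls self.ack() itself
    auto_fail = False  # this bolt calls self.fail() itself

    def process(self, tup):
        try:
            number = int(tup.values[0])
        except ValueError:
            # Failing lets the originating spout decide whether to replay.
            self.fail(tup)
            return
        self.emit([number * 2], need_task_ids=False)
        self.ack(tup)
```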
initialize(storm_conf, context)
Called immediately after the initial handshake with Storm and before the main run loop. A good place to initialize connections to data sources.
Parameters:
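Since initialize() runs once before the first call to process(), it is also a natural place to set up per-bolt state. A minimal sketch of a running word counter whose Counter is created there; WordCountBolt is a hypothetical name, the counts live only in memory (so they are lost if the process restarts), and the stub base class is only used when streamparse cannot be imported.

```python
from collections import Counter

try:
    from streamparse.bolt import Bolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Bolt(object):
        def emit(self, tup, stream=None, anchors=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class WordCountBolt(Bolt):
    """Keeps a running count per word and emits the updated total."""

    def initialize(self, storm_conf, context):
        # Runs once, before the first call to process().
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        self.emit([word, self.counts[word]], need_task_ids=False)
```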
log(message, level=None)
Log a message to Storm, optionally providing a logging level.
Parameters:
process(tup)
Process a single streamparse.ipc.Tuple of input. This should be overridden by subclasses.
streamparse.ipc.Tuple objects contain metadata about which component, stream and task they came from. The actual values of the tuple can be accessed through the tup.values attribute.
Parameters: tup (streamparse.ipc.Tuple) – the tuple to be processed.
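Because each tuple carries the component it came from, a bolt subscribed to several inputs can branch on that metadata inside process(). A minimal sketch; EventTaggerBolt and the component names "clicks-spout" and "views-spout" are hypothetical stand-ins for whatever the topology actually wires up, and the stub base class is only used when streamparse cannot be imported.

```python
try:
    from streamparse.bolt import Bolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class Bolt(object):
        def emit(self, tup, stream=None, anchors=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


# Hypothetical mapping from upstream component name to an event kind.
SOURCES = {"clicks-spout": "click", "views-spout": "view"}


class EventTaggerBolt(Bolt):
    """Prefixes each payload with the kind of source it came from."""

    def process(self, tup):
        kind = SOURCES.get(tup.component, "unknown")
        self.emit([kind] + list(tup.values), need_task_ids=False)
```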
class streamparse.bolt.BatchingBolt
Bases: streamparse.bolt.Bolt
A bolt which batches tuples for processing.
Batching tuples is unexpectedly complex to do correctly. The main problem is that all bolts are single-threaded. The difficulty comes when the topology is shutting down, because Storm stops feeding the bolt tuples. If the bolt is blocked waiting on stdin, then it can’t process any waiting tuples, or even ack ones that were asynchronously written to a data store.
This bolt helps with that by grouping tuples based on a time interval and then processing them on a worker thread.
To use this class, you must implement process_batch. group_key can optionally be implemented so that tuples are grouped before process_batch is even called.
Example:

    from streamparse.bolt import BatchingBolt


    class WordCounterBolt(BatchingBolt):

        secs_between_batches = 5

        def group_key(self, tup):
            word = tup.values[0]
            return word  # collect batches of words

        def process_batch(self, key, tups):
            # emit the count of words we had per 5s batch
            self.emit([key, len(tups)])
auto_ack = True
A bool indicating whether or not the bolt should automatically acknowledge tuples after process_batch() is called. Default is True.
auto_anchor = True
A bool indicating whether or not the bolt should automatically anchor emits to the incoming tuple ID. Tuple anchoring is how Storm provides reliability; you can read more about tuple anchoring in Storm’s docs. Default is True.
auto_fail = True
A bool indicating whether or not the bolt should automatically fail tuples when an exception is raised in the process_batch() method. Default is True.
group_key(tup)
Return the group key used to group tuples within a batch.
By default, returns None, which puts all tuples in a single batch, effectively giving purely time-based batching. Override this to create multiple batches based on a key.
Parameters: tup (Tuple) – the tuple used to extract a group key.
Returns: any hashable value.
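The effect of group_key can be pictured as bucketing each interval’s tuples by the returned key, with process_batch then called once per bucket. The helper below is a plain-Python illustration of that grouping, not streamparse’s actual batching code; group_into_batches, the Tuple stand-in and the sample tuples are hypothetical.

```python
from collections import defaultdict, namedtuple

# Stand-in with the five documented Tuple fields.
Tuple = namedtuple("Tuple", ["id", "component", "stream", "task", "values"])


def group_into_batches(tuples, group_key):
    """Bucket tuples by group_key(tup), keeping arrival order within a key."""
    batches = defaultdict(list)
    for tup in tuples:
        batches[group_key(tup)].append(tup)
    return dict(batches)


incoming = [Tuple(str(i), "splitter", "default", 1, [w])
            for i, w in enumerate(["a", "b", "a"])]

# Default behaviour: every tuple lands in the single None batch.
by_default = group_into_batches(incoming, lambda tup: None)
# Keyed by word, as in the WordCounterBolt example.
by_word = group_into_batches(incoming, lambda tup: tup.values[0])
```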
process_batch(key, tups)
Process a batch of tuples. Should be overridden by subclasses.
Parameters:
- key (hashable) – the group key shared by the tuples in this batch.
- tups (list) – a list of streamparse.ipc.Tuples for the group.
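A typical process_batch writes the whole group in one bulk operation, which is the point of batching. A minimal sketch, assuming a hypothetical sink (an in-memory list standing in for a data store client) and payloads shaped as [table, row]; BulkWriterBolt is a hypothetical name, and the stub base class is only used when streamparse cannot be imported. With auto_ack left at True the tuples are acknowledged after process_batch() returns; if it raises, auto_fail fails them instead.

```python
try:
    from streamparse.bolt import BatchingBolt
except ImportError:
    # Hypothetical stand-in so the sketch runs without streamparse installed.
    class BatchingBolt(object):
        secs_between_batches = 2

        def initialize(self, storm_conf, context):
            pass

        def group_key(self, tup):
            return None

        def emit(self, tup, stream=None, anchors=None, direct_task=None,
                 need_task_ids=None):
            raise NotImplementedError("supplied by streamparse at runtime")


class BulkWriterBolt(BatchingBolt):
    """Writes each batch in a single call, then reports its size."""

    secs_between_batches = 0.5  # fractional intervals are allowed

    def initialize(self, storm_conf, context):
        super(BulkWriterBolt, self).initialize(storm_conf, context)
        # Hypothetical sink; a real bolt would hold a database client here.
        self.sink = []

    def group_key(self, tup):
        return tup.values[0]  # e.g. a table or partition name

    def process_batch(self, key, tups):
        rows = [tup.values[1] for tup in tups]
        self.sink.append((key, rows))  # one bulk write per batch
        self.emit([key, len(rows)], need_task_ids=False)
```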
secs_between_batches = 2
The time (in seconds) between calls to process_batch(). Note that if there are no tuples in any batch, the BatchingBolt will simply continue to sleep. This value can be fractional to specify greater precision (e.g. 2.5).