Source code for

Streams and Groupings
from __future__ import absolute_import

from six import iteritems, string_types

from ..thrift import storm_thrift
from .util import to_java_arg
from storm_thrift import NullStruct

[docs]class Stream(storm_thrift.StreamInfo): """ A Storm output stream """ def __init__(self, fields=None, name='default', direct=False): """ :param fields: Field names for this stream. :type fields: `list` or `tuple` of `str` :param name: Name of stream. Defaults to ``default``. :type name: `str` :param direct: Whether or not this stream is direct. Default is `False`. See :attr:``. :type direct: `bool` """ if fields is None: fields = [] elif isinstance(fields, (list, tuple)): fields = list(fields) for field in fields: if not isinstance(field, string_types): raise TypeError('All field names must be strings; given: ' '{!r}'.format(field)) else: raise TypeError('Stream fields must be a list, tuple, or None; ' 'given: {!r}'.format(fields)) self.fields = fields if isinstance(name, string_types): = name else: raise TypeError('Stream name must be a string; given: {!r}' .format(name)) if isinstance(direct, bool): = direct else: raise TypeError('"direct" must be either True or False; given: {!r}' .format(direct))
class _Grouping(storm_thrift.Grouping): """ Version of `storm_thrift.Grouping` that has better __str__. """ def __repr__(self): for name, val in iteritems(vars(self)): if not name.startswith('_') and val is not None: if isinstance(val, NullStruct): return '{}'.format(name.upper()) else: return '{}({!r})'.format(name, val)
[docs]class Grouping(object): """ A Grouping describes how Tuples should be distributed to the tasks of a Bolt listening on a particular stream. When no Grouping is specified, it defaults to `SHUFFLE` for normal streams, and `DIRECT` for direct streams. :ivar SHUFFLE: Tuples are randomly distributed across the Bolt's tasks in a way such that each Bolt is guaranteed to get an equal number of Tuples. :ivar GLOBAL: The entire stream goes to a single one of the Bolt's tasks. Specifically, it goes to the task with the lowest id. :ivar DIRECT: This is a special kind of grouping. A stream grouped this way means that the producer of the Tuple decides which task of the consumer will receive this Tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using the the `direct_task` parameter to the :meth:`streamparse.Bolt.emit` and :meth:`streamparse.Spout.emit` methods. :ivar ALL: The stream is replicated across all the Bolt's tasks. Use this grouping with care. :ivar NONE: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down Bolts with none groupings to execute in the same thread as the Bolt or Spout they subscribe from (when possible). :ivar LOCAL_OR_SHUFFLE: If the target Bolt has one or more tasks in the same worker process, Tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping. """ __slots__ = () SHUFFLE = _Grouping(shuffle=NullStruct()) GLOBAL = _Grouping(fields=[]) DIRECT = _Grouping(direct=NullStruct()) ALL = _Grouping(all=NullStruct()) NONE = _Grouping(none=NullStruct()) LOCAL_OR_SHUFFLE = _Grouping(local_or_shuffle=NullStruct()) @classmethod
[docs] def fields(cls, *fields): """The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the `user-id` field, Tuples with the same `user-id` will always go to the same task, but Tuples with different `user-id`'s may go to different tasks.""" if len(fields) == 1 and isinstance(fields[0], list): fields = fields[0] else: fields = list(fields) if not fields: raise ValueError('List cannot be empty for fields grouping') return _Grouping(fields=fields)
[docs] def custom_object(cls, java_class_name, arg_list): """Tuples will be assigned to tasks by the given Java class.""" java_object = storm_thrift.JavaObject(full_class_name=java_class_name, arg_list=[to_java_arg(arg) for arg in arg_list]) return _Grouping(custom_object=java_object)
[docs] def custom_serialized(cls, java_serialized): """Tuples will be assigned to tasks by the given Java serialized class. """ if not isinstance(java_serialized, bytes): return TypeError('Argument to custom_serialized must be a ' 'serialized Java class as bytes. Given: {!r}' .format(java_serialized)) return _Grouping(custom_serialized=java_serialized)