Source code for streamparse.dsl.topology

Topology base class
from __future__ import absolute_import

from copy import deepcopy

import simplejson as json
from pystorm.component import Component
from six import add_metaclass, iteritems, itervalues, string_types
from thriftpy.transport import TMemoryBuffer
from thriftpy.protocol import TBinaryProtocol

from ..thrift import storm_thrift
from .bolt import JavaBoltSpec, ShellBoltSpec
from .component import ComponentSpec, ShellComponentSpec
from .spout import JavaSpoutSpec, ShellSpoutSpec
from .util import to_python_arg_list

class TopologyType(type):
    """Class to define a Storm topology in a Python DSL."""

    def __new__(mcs, classname, bases, class_dict):
        bolt_specs = {}
        spout_specs = {}
        # Copy ComponentSpec items out of class_dict
        specs = TopologyType.class_dict_to_specs(class_dict)
        # Perform checks
        for spec in itervalues(specs):
            if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)):
                TopologyType.add_bolt_spec(spec, bolt_specs)
            elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)):
                TopologyType.add_spout_spec(spec, spout_specs)
                raise TypeError('Specifications should either be bolts or '
                                'spouts.  Given: {!r}'.format(spec))
            TopologyType.clean_spec_inputs(spec, specs)
        if classname != 'Topology' and not spout_specs:
            raise ValueError('A Topology requires at least one Spout')
        if 'config' in class_dict:
            config_dict = class_dict['config']
            if not isinstance(config_dict, dict):
                raise TypeError('Topology config must be a dictionary. Given: '
            class_dict['config'] = {}
        class_dict['thrift_bolts'] = bolt_specs
        class_dict['thrift_spouts'] = spout_specs
        class_dict['specs'] = list(specs.values())
        class_dict['thrift_topology'] = storm_thrift.StormTopology(spouts=spout_specs,
        return type.__new__(mcs, classname, bases, class_dict)

    def class_dict_to_specs(mcs, class_dict):
        """Extract valid `ComponentSpec` entries from `Topology.__dict__`."""
        specs = {}
        # Set spec names first
        for name, spec in iteritems(class_dict):
            if isinstance(spec, ComponentSpec):
                # Use the variable name as the specification name.
                if is None:
           = name
                if in specs:
                    raise ValueError("Duplicate component name: {}"
                    specs[] = spec
            elif isinstance(spec, Component):
                raise TypeError('Topology classes should have ComponentSpec '
                                'attributes.  Did you forget to call the spec '
                                'class method for your component?  Given: {!r}'
        return specs

    def add_bolt_spec(mcs, spec, bolt_specs):
        """Add valid Bolt specs to `bolt_specs`; raise exceptions for others."""
        if not spec.inputs:
            cls_name = spec.component_cls.__name__
            raise ValueError('{} "{}" requires at least one input, because it '
                             'is a Bolt.'.format(cls_name,
        bolt_specs[] = storm_thrift.Bolt(bolt_object=spec.component_object,

    def add_spout_spec(mcs, spec, spout_specs):
        """Add valid Spout specs to `spout_specs`; raise exceptions for others.
        if not spec.outputs:
            cls_name = spec.component_cls.__name__
            raise ValueError('{} "{}" requires at least one output, because it '
                             'is a Spout'.format(cls_name,
        spout_specs[] = storm_thrift.SpoutSpec(spout_object=spec.component_object,

    def clean_spec_inputs(mcs, spec, specs):
        """Convert `spec.inputs` to a dict mapping from stream IDs to groupings.
        if spec.inputs is None:
            spec.inputs = {}
        for stream_id, grouping in list(iteritems(spec.inputs)):
            if isinstance(stream_id.componentId, ComponentSpec):
                # Have to reinsert key after fix because hash changes
                del spec.inputs[stream_id]
                stream_id.componentId =
                spec.inputs[stream_id] = grouping
            # This should never happen, but it's worth checking for
            elif stream_id.componentId is None:
                raise TypeError('GlobalStreamId.componentId cannot be None.')
            # Check for invalid fields grouping
            stream_comp = specs[stream_id.componentId]
            valid_fields = set(stream_comp.outputs[stream_id.streamId]
            if grouping.fields is not None:
                for field in grouping.fields:
                    if field not in valid_fields:
                        raise ValueError('Field {!r} specified in grouping is '
                                         'not a valid output field for the {!r}'
                                         ' {!r} stream.'.format(field,

    def __repr__(cls):
        """:returns: A string representation of the topology"""
        return repr(getattr(cls, '_topology', None))

[docs]class Topology(object): """Class to define a Storm topology in a Python DSL.""" @classmethod def write(cls, stream): """Write the topology to a stream or file. Typically used to write to Nimbus. .. note:: This will not save the `specs` attribute, as that is not part of the Thrift output. """ def write_it(stream): transport_out = TMemoryBuffer() protocol_out = TBinaryProtocol(transport_out) cls._topology.write(protocol_out) transport_bytes = transport_out.getvalue() stream.write(transport_bytes) if isinstance(stream, string_types): with open(stream, 'wb') as output_file: write_it(output_file) else: write_it(stream) @classmethod def read(cls, stream): """Read a topology from a stream or file. .. note:: This will not properly reconstruct the `specs` attribute, as that is not included in the Thrift output. """ def read_it(stream): stream_bytes = transport_in = TMemoryBuffer(stream_bytes) protocol_in = TBinaryProtocol(transport_in) topology = storm_thrift.StormTopology() cls._topology = topology cls.thrift_bolts = topology.bolts cls.thrift_spouts = topology.spouts # Can't reconstruct Python specs from Thrift. cls.specs = [] if isinstance(stream, string_types): with open(stream, 'rb') as input_file: return read_it(input_file) else: return read_it(stream) @staticmethod def _spec_to_flux_dict(spec): """Convert a ComponentSpec into a dict as expected by Flux""" flux_dict = {'id':, 'constructorArgs': []} if isinstance(spec, ShellComponentSpec): if isinstance(spec, ShellBoltSpec): flux_dict['className'] = 'org.apache.storm.flux.wrappers.bolts.FluxShellBolt' else: flux_dict['className'] = 'org.apache.storm.flux.wrappers.spouts.FluxShellSpout' shell_object = flux_dict['constructorArgs'].append([shell_object.execution_command, shell_object.script]) if not spec.outputs: flux_dict['constructorArgs'].append(['NONE_BUT_FLUX_WANTS_SOMETHING_HERE']) for output_stream in spec.outputs.keys(): if output_stream == 'default': output_fields = spec.outputs['default'].output_fields flux_dict['constructorArgs'].append(output_fields) else: if 'configMethods' not in flux_dict: flux_dict['configMethods'] = [] flux_dict['configMethods'].append({ 'name': 'setNamedStream', 'args': [ output_stream, spec.outputs[output_stream].output_fields ] }) else: if spec.component_object.serialized_java is not None: raise TypeError('Flux does not support specifying serialized ' 'Java objects. Given: {!r}'.format(spec)) java_object = spec.component_object.java_object flux_dict['className'] = java_object.full_class_name # Convert JavaObjectArg instances into basic data types flux_dict['constructorArgs'] = to_python_arg_list(java_object.args_list) return flux_dict @staticmethod def _stream_to_flux_dict(spec, global_stream, grouping): """Convert a GlobalStreamId into a dict as expected by Flux""" flux_dict = {'from': global_stream.componentId, 'to':} grouping_dict = {'streamId': global_stream.streamId} for key, val in grouping.__dict__.items(): if val is not None: grouping_dict['type'] = key.upper() if key == 'fields': if val: grouping_dict['args'] = val else: grouping_dict['type'] = 'GLOBAL' elif key == 'custom_object': grouping_dict['type'] = 'CUSTOM' class_dict = {'className': val.full_class_name, 'args': to_python_arg_list(val.arg_list)} grouping_dict['customClass'] = class_dict flux_dict['grouping'] = grouping_dict return flux_dict @classmethod def to_flux_dict(cls, name): """Convert topology to dict that can written out as Flux YAML file.""" flux_dict = {'name': name, 'bolts': [], 'spouts': [], 'streams': []} for spec in cls.specs: if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)): flux_dict['bolts'].append(cls._spec_to_flux_dict(spec)) for global_stream, grouping in spec.inputs.items(): stream_dict = cls._stream_to_flux_dict(spec, global_stream, grouping) flux_dict['streams'].append(stream_dict) elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)): flux_dict['spouts'].append(cls._spec_to_flux_dict(spec)) else: raise TypeError('Specifications should either be bolts or ' 'spouts. Given: {!r}'.format(spec)) flux_dict = {key: val for key, val in flux_dict.items() if val} return flux_dict