Source code for streamparse.dsl.topology

"""
Topology base class
"""
from __future__ import absolute_import

import simplejson as json
from pystorm.component import Component
from six import add_metaclass, iteritems, itervalues, string_types
from thriftpy.transport import TMemoryBuffer
from thriftpy.protocol import TBinaryProtocol

from ..thrift import Bolt, SpoutSpec, StormTopology
from .bolt import JavaBoltSpec, ShellBoltSpec
from .component import ComponentSpec, ShellComponentSpec
from .spout import JavaSpoutSpec, ShellSpoutSpec
from .util import to_python_arg_list


class TopologyType(type):
    """Class to define a Storm topology in a Python DSL."""

    def __new__(mcs, classname, bases, class_dict):
        bolt_specs = {}
        spout_specs = {}
        # Copy ComponentSpec items out of class_dict
        specs = TopologyType.class_dict_to_specs(class_dict)
        # Perform checks
        for spec in itervalues(specs):
            if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)):
                TopologyType.add_bolt_spec(spec, bolt_specs)
            elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)):
                TopologyType.add_spout_spec(spec, spout_specs)
            else:
                raise TypeError('Specifications should either be bolts or '
                                'spouts.  Given: {!r}'.format(spec))
            TopologyType.clean_spec_inputs(spec, specs)
        if classname != 'Topology' and not spout_specs:
            raise ValueError('A Topology requires at least one Spout')
        if 'config' in class_dict:
            config_dict = class_dict['config']
            if not isinstance(config_dict, dict):
                raise TypeError('Topology config must be a dictionary. Given: '
                                '{!r}'.format(config_dict))
        else:
            class_dict['config'] = {}
        class_dict['thrift_bolts'] = bolt_specs
        class_dict['thrift_spouts'] = spout_specs
        class_dict['specs'] = list(specs.values())
        class_dict['thrift_topology'] = StormTopology(spouts=spout_specs,
                                                      bolts=bolt_specs,
                                                      state_spouts={})
        return type.__new__(mcs, classname, bases, class_dict)

    @classmethod
    def class_dict_to_specs(mcs, class_dict):
        """Extract valid `ComponentSpec` entries from `Topology.__dict__`."""
        specs = {}
        # Set spec names first
        for name, spec in iteritems(class_dict):
            if isinstance(spec, ComponentSpec):
                # Use the variable name as the specification name.
                if spec.name is None:
                    spec.name = name
                if spec.name in specs:
                    raise ValueError("Duplicate component name: {}"
                                     .format(spec.name))
                else:
                    specs[spec.name] = spec
            elif isinstance(spec, Component):
                raise TypeError('Topology classes should have ComponentSpec '
                                'attributes.  Did you forget to call the spec '
                                'class method for your component?  Given: {!r}'
                                .format(spec))
        return specs

    @classmethod
    def add_bolt_spec(mcs, spec, bolt_specs):
        """Add valid Bolt specs to `bolt_specs`; raise exceptions for others."""
        if not spec.inputs:
            cls_name = spec.component_cls.__name__
            raise ValueError('{} "{}" requires at least one input, because it '
                             'is a Bolt.'.format(cls_name, spec.name))
        bolt_specs[spec.name] = Bolt(bolt_object=spec.component_object,
                                     common=spec.common)

    @classmethod
    def add_spout_spec(mcs, spec, spout_specs):
        """Add valid Spout specs to `spout_specs`; raise exceptions for others.
        """
        if not spec.outputs:
            cls_name = spec.component_cls.__name__
            raise ValueError('{} "{}" requires at least one output, because it '
                             'is a Spout'.format(cls_name, spec.name))
        spout_specs[spec.name] = SpoutSpec(spout_object=spec.component_object,
                                           common=spec.common)

    @classmethod
    def clean_spec_inputs(mcs, spec, specs):
        """Convert `spec.inputs` to a dict mapping from stream IDs to groupings.
        """
        if spec.inputs is None:
            spec.inputs = {}
        for stream_id, grouping in list(iteritems(spec.inputs)):
            if isinstance(stream_id.componentId, ComponentSpec):
                # Have to reinsert key after fix because hash changes
                del spec.inputs[stream_id]
                stream_id.componentId = stream_id.componentId.name
                spec.inputs[stream_id] = grouping
            # This should never happen, but it's worth checking for
            elif stream_id.componentId is None:
                raise TypeError('GlobalStreamId.componentId cannot be None.')
            # Check for invalid fields grouping
            stream_comp = specs[stream_id.componentId]
            valid_fields = set(stream_comp.outputs[stream_id.streamId]
                               .output_fields)
            if grouping.fields is not None:
                for field in grouping.fields:
                    if field not in valid_fields:
                        raise ValueError('Field {!r} specified in grouping is '
                                         'not a valid output field for the {!r}'
                                         ' {!r} stream.'.format(field,
                                                                stream_comp.name,
                                                                stream_id.streamId))

    def __repr__(cls):
        """:returns: A string representation of the topology"""
        return repr(getattr(cls, '_topology', None))


@add_metaclass(TopologyType)
[docs]class Topology(object): """Class to define a Storm topology in a Python DSL.""" @classmethod def write(cls, stream): """Write the topology to a stream or file. Typically used to write to Nimbus. .. note:: This will not save the `specs` attribute, as that is not part of the Thrift output. """ def write_it(stream): transport_out = TMemoryBuffer() protocol_out = TBinaryProtocol(transport_out) cls._topology.write(protocol_out) transport_bytes = transport_out.getvalue() stream.write(transport_bytes) if isinstance(stream, string_types): with open(stream, 'wb') as output_file: write_it(output_file) else: write_it(stream) @classmethod def read(cls, stream): """Read a topology from a stream or file. .. note:: This will not properly reconstruct the `specs` attribute, as that is not included in the Thrift output. """ def read_it(stream): stream_bytes = stream.read() transport_in = TMemoryBuffer(stream_bytes) protocol_in = TBinaryProtocol(transport_in) topology = StormTopology() topology.read(protocol_in) cls._topology = topology cls.thrift_bolts = topology.bolts cls.thrift_spouts = topology.spouts # Can't reconstruct Python specs from Thrift. cls.specs = [] if isinstance(stream, string_types): with open(stream, 'rb') as input_file: return read_it(input_file) else: return read_it(stream) @staticmethod def _spec_to_flux_dict(spec): """Convert a ComponentSpec into a dict as expected by Flux""" flux_dict = {'id': spec.name, 'constructorArgs': []} if isinstance(spec, ShellComponentSpec): if isinstance(spec, ShellBoltSpec): flux_dict['className'] = 'org.apache.storm.flux.wrappers.bolts.FluxShellBolt' else: flux_dict['className'] = 'org.apache.storm.flux.wrappers.spouts.FluxShellSpout' shell_object = spec.component_object.shell flux_dict['constructorArgs'].append([shell_object.execution_command, shell_object.script]) if not spec.outputs: flux_dict['constructorArgs'].append(['NONE_BUT_FLUX_WANTS_SOMETHING_HERE']) for output_stream in spec.outputs.keys(): if output_stream == 'default': output_fields = spec.outputs['default'].output_fields flux_dict['constructorArgs'].append(output_fields) else: if 'configMethods' not in flux_dict: flux_dict['configMethods'] = [] flux_dict['configMethods'].append({ 'name': 'setNamedStream', 'args': [ output_stream, spec.outputs[output_stream].output_fields ] }) flux_dict['parallelism'] = spec.par if spec.config: component_config = spec.config if not isinstance(component_config, dict): component_config = json.loads(component_config) flux_dict.setdefault('configMethods', []) for key, value in component_config.items(): flux_dict['configMethods'].append({ 'name': 'addComponentConfig', 'args': [key, value] }) else: if spec.component_object.serialized_java is not None: raise TypeError('Flux does not support specifying serialized ' 'Java objects. Given: {!r}'.format(spec)) java_object = spec.component_object.java_object flux_dict['className'] = java_object.full_class_name # Convert JavaObjectArg instances into basic data types flux_dict['constructorArgs'] = to_python_arg_list(java_object.args_list) return flux_dict @staticmethod def _stream_to_flux_dict(spec, global_stream, grouping): """Convert a GlobalStreamId into a dict as expected by Flux""" flux_dict = {'from': global_stream.componentId, 'to': spec.name} grouping_dict = {'streamId': global_stream.streamId} for key, val in grouping.__dict__.items(): if val is not None: grouping_dict['type'] = key.upper() if key == 'fields': if val: grouping_dict['args'] = val else: grouping_dict['type'] = 'GLOBAL' elif key == 'custom_object': grouping_dict['type'] = 'CUSTOM' class_dict = {'className': val.full_class_name, 'args': to_python_arg_list(val.arg_list)} grouping_dict['customClass'] = class_dict flux_dict['grouping'] = grouping_dict return flux_dict @classmethod def to_flux_dict(cls, name): """Convert topology to dict that can written out as Flux YAML file.""" flux_dict = {'name': name, 'bolts': [], 'spouts': [], 'streams': []} for spec in cls.specs: if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)): flux_dict['bolts'].append(cls._spec_to_flux_dict(spec)) for global_stream, grouping in spec.inputs.items(): stream_dict = cls._stream_to_flux_dict(spec, global_stream, grouping) flux_dict['streams'].append(stream_dict) elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)): flux_dict['spouts'].append(cls._spec_to_flux_dict(spec)) else: raise TypeError('Specifications should either be bolts or ' 'spouts. Given: {!r}'.format(spec)) flux_dict = {key: val for key, val in flux_dict.items() if val} return flux_dict