"""
Topology base class
"""
from __future__ import absolute_import
import simplejson as json
from pystorm.component import Component
from six import add_metaclass, iteritems, itervalues, string_types
from thriftpy.transport import TMemoryBuffer
from thriftpy.protocol import TBinaryProtocol
from ..thrift import Bolt, SpoutSpec, StormTopology
from .bolt import JavaBoltSpec, ShellBoltSpec
from .component import ComponentSpec, ShellComponentSpec
from .spout import JavaSpoutSpec, ShellSpoutSpec
from .util import to_python_arg_list
class TopologyType(type):
"""Class to define a Storm topology in a Python DSL."""
def __new__(mcs, classname, bases, class_dict):
bolt_specs = {}
spout_specs = {}
# Copy ComponentSpec items out of class_dict
specs = TopologyType.class_dict_to_specs(class_dict)
# Perform checks
for spec in itervalues(specs):
if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)):
TopologyType.add_bolt_spec(spec, bolt_specs)
elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)):
TopologyType.add_spout_spec(spec, spout_specs)
else:
raise TypeError('Specifications should either be bolts or '
'spouts. Given: {!r}'.format(spec))
TopologyType.clean_spec_inputs(spec, specs)
if classname != 'Topology' and not spout_specs:
raise ValueError('A Topology requires at least one Spout')
if 'config' in class_dict:
config_dict = class_dict['config']
if not isinstance(config_dict, dict):
raise TypeError('Topology config must be a dictionary. Given: '
'{!r}'.format(config_dict))
else:
class_dict['config'] = {}
class_dict['thrift_bolts'] = bolt_specs
class_dict['thrift_spouts'] = spout_specs
class_dict['specs'] = list(specs.values())
class_dict['thrift_topology'] = StormTopology(spouts=spout_specs,
bolts=bolt_specs,
state_spouts={})
return type.__new__(mcs, classname, bases, class_dict)
@classmethod
def class_dict_to_specs(mcs, class_dict):
"""Extract valid `ComponentSpec` entries from `Topology.__dict__`."""
specs = {}
# Set spec names first
for name, spec in iteritems(class_dict):
if isinstance(spec, ComponentSpec):
# Use the variable name as the specification name.
if spec.name is None:
spec.name = name
if spec.name in specs:
raise ValueError("Duplicate component name: {}"
.format(spec.name))
else:
specs[spec.name] = spec
elif isinstance(spec, Component):
raise TypeError('Topology classes should have ComponentSpec '
'attributes. Did you forget to call the spec '
'class method for your component? Given: {!r}'
.format(spec))
return specs
@classmethod
def add_bolt_spec(mcs, spec, bolt_specs):
"""Add valid Bolt specs to `bolt_specs`; raise exceptions for others."""
if not spec.inputs:
cls_name = spec.component_cls.__name__
raise ValueError('{} "{}" requires at least one input, because it '
'is a Bolt.'.format(cls_name, spec.name))
bolt_specs[spec.name] = Bolt(bolt_object=spec.component_object,
common=spec.common)
@classmethod
def add_spout_spec(mcs, spec, spout_specs):
"""Add valid Spout specs to `spout_specs`; raise exceptions for others.
"""
if not spec.outputs:
cls_name = spec.component_cls.__name__
raise ValueError('{} "{}" requires at least one output, because it '
'is a Spout'.format(cls_name, spec.name))
spout_specs[spec.name] = SpoutSpec(spout_object=spec.component_object,
common=spec.common)
@classmethod
def clean_spec_inputs(mcs, spec, specs):
"""Convert `spec.inputs` to a dict mapping from stream IDs to groupings.
"""
if spec.inputs is None:
spec.inputs = {}
for stream_id, grouping in list(iteritems(spec.inputs)):
if isinstance(stream_id.componentId, ComponentSpec):
# Have to reinsert key after fix because hash changes
del spec.inputs[stream_id]
stream_id.componentId = stream_id.componentId.name
spec.inputs[stream_id] = grouping
# This should never happen, but it's worth checking for
elif stream_id.componentId is None:
raise TypeError('GlobalStreamId.componentId cannot be None.')
# Check for invalid fields grouping
stream_comp = specs[stream_id.componentId]
valid_fields = set(stream_comp.outputs[stream_id.streamId]
.output_fields)
if grouping.fields is not None:
for field in grouping.fields:
if field not in valid_fields:
raise ValueError('Field {!r} specified in grouping is '
'not a valid output field for the {!r}'
' {!r} stream.'.format(field,
stream_comp.name,
stream_id.streamId))
def __repr__(cls):
""":returns: A string representation of the topology"""
return repr(getattr(cls, '_topology', None))
@add_metaclass(TopologyType)
[docs]class Topology(object):
"""Class to define a Storm topology in a Python DSL."""
@classmethod
def write(cls, stream):
"""Write the topology to a stream or file.
Typically used to write to Nimbus.
.. note::
This will not save the `specs` attribute, as that is not part of
the Thrift output.
"""
def write_it(stream):
transport_out = TMemoryBuffer()
protocol_out = TBinaryProtocol(transport_out)
cls._topology.write(protocol_out)
transport_bytes = transport_out.getvalue()
stream.write(transport_bytes)
if isinstance(stream, string_types):
with open(stream, 'wb') as output_file:
write_it(output_file)
else:
write_it(stream)
@classmethod
def read(cls, stream):
"""Read a topology from a stream or file.
.. note::
This will not properly reconstruct the `specs` attribute, as that is
not included in the Thrift output.
"""
def read_it(stream):
stream_bytes = stream.read()
transport_in = TMemoryBuffer(stream_bytes)
protocol_in = TBinaryProtocol(transport_in)
topology = StormTopology()
topology.read(protocol_in)
cls._topology = topology
cls.thrift_bolts = topology.bolts
cls.thrift_spouts = topology.spouts
# Can't reconstruct Python specs from Thrift.
cls.specs = []
if isinstance(stream, string_types):
with open(stream, 'rb') as input_file:
return read_it(input_file)
else:
return read_it(stream)
@staticmethod
def _spec_to_flux_dict(spec):
"""Convert a ComponentSpec into a dict as expected by Flux"""
flux_dict = {'id': spec.name,
'constructorArgs': []}
if isinstance(spec, ShellComponentSpec):
if isinstance(spec, ShellBoltSpec):
flux_dict['className'] = 'org.apache.storm.flux.wrappers.bolts.FluxShellBolt'
else:
flux_dict['className'] = 'org.apache.storm.flux.wrappers.spouts.FluxShellSpout'
shell_object = spec.component_object.shell
flux_dict['constructorArgs'].append([shell_object.execution_command,
shell_object.script])
if not spec.outputs:
flux_dict['constructorArgs'].append(['NONE_BUT_FLUX_WANTS_SOMETHING_HERE'])
for output_stream in spec.outputs.keys():
if output_stream == 'default':
output_fields = spec.outputs['default'].output_fields
flux_dict['constructorArgs'].append(output_fields)
else:
if 'configMethods' not in flux_dict:
flux_dict['configMethods'] = []
flux_dict['configMethods'].append({
'name': 'setNamedStream',
'args': [
output_stream,
spec.outputs[output_stream].output_fields
]
})
flux_dict['parallelism'] = spec.par
if spec.config:
component_config = spec.config
if not isinstance(component_config, dict):
component_config = json.loads(component_config)
flux_dict.setdefault('configMethods', [])
for key, value in component_config.items():
flux_dict['configMethods'].append({
'name': 'addComponentConfig',
'args': [key, value]
})
else:
if spec.component_object.serialized_java is not None:
raise TypeError('Flux does not support specifying serialized '
'Java objects. Given: {!r}'.format(spec))
java_object = spec.component_object.java_object
flux_dict['className'] = java_object.full_class_name
# Convert JavaObjectArg instances into basic data types
flux_dict['constructorArgs'] = to_python_arg_list(java_object.args_list)
return flux_dict
@staticmethod
def _stream_to_flux_dict(spec, global_stream, grouping):
"""Convert a GlobalStreamId into a dict as expected by Flux"""
flux_dict = {'from': global_stream.componentId,
'to': spec.name}
grouping_dict = {'streamId': global_stream.streamId}
for key, val in grouping.__dict__.items():
if val is not None:
grouping_dict['type'] = key.upper()
if key == 'fields':
if val:
grouping_dict['args'] = val
else:
grouping_dict['type'] = 'GLOBAL'
elif key == 'custom_object':
grouping_dict['type'] = 'CUSTOM'
class_dict = {'className': val.full_class_name,
'args': to_python_arg_list(val.arg_list)}
grouping_dict['customClass'] = class_dict
flux_dict['grouping'] = grouping_dict
return flux_dict
@classmethod
def to_flux_dict(cls, name):
"""Convert topology to dict that can written out as Flux YAML file."""
flux_dict = {'name': name,
'bolts': [],
'spouts': [],
'streams': []}
for spec in cls.specs:
if isinstance(spec, (JavaBoltSpec, ShellBoltSpec)):
flux_dict['bolts'].append(cls._spec_to_flux_dict(spec))
for global_stream, grouping in spec.inputs.items():
stream_dict = cls._stream_to_flux_dict(spec,
global_stream,
grouping)
flux_dict['streams'].append(stream_dict)
elif isinstance(spec, (JavaSpoutSpec, ShellSpoutSpec)):
flux_dict['spouts'].append(cls._spec_to_flux_dict(spec))
else:
raise TypeError('Specifications should either be bolts or '
'spouts. Given: {!r}'.format(spec))
flux_dict = {key: val for key, val in flux_dict.items() if val}
return flux_dict