Metadata-Version: 1.1
Name: amqpipe
Version: 1.2
Summary: Twisted based pipeline framework for AMQP
Home-page: https://github.com/Fatal1ty/amqpipe
Author: Alexander Tikhonov
Author-email: random.gauss@gmail.com
License: MIT
Description: AMQPipe
        =======
        
        .. image:: https://travis-ci.org/Fatal1ty/amqpipe.svg?branch=master
            :target: https://travis-ci.org/Fatal1ty/amqpipe
        
        .. image:: https://requires.io/github/Fatal1ty/amqpipe/requirements.svg?branch=master
            :target: https://requires.io/github/Fatal1ty/amqpipe/requirements/?branch=master
            :alt: Requirements Status
        
        .. image:: https://img.shields.io/pypi/v/amqpipe.svg
            :target: https://pypi.python.org/pypi/amqpipe
        
        .. image:: https://img.shields.io/pypi/pyversions/amqpipe.svg
            :target: https://pypi.python.org/pypi/amqpipe/
        
        .. image:: https://img.shields.io/badge/license-MIT-blue.svg
            :target: https://raw.githubusercontent.com/Fatal1ty/amqpipe/master/LICENSE
        
        Twisted-based pipeline framework for AMQP. It allows you to create fast
        asynchronous services that follow a simple pattern:
        
        -  get a message from a queue
        -  do something with the message
        -  publish the result
        
        Installation
        ------------
        
        Install via pip:
        
        ::
        
                pip install amqpipe
        
        Basic usage
        -----------
        
        The minimal module based on AMQPipe is:
        
        .. code:: python
        
            from amqpipe import AMQPipe
        
            pipe = AMQPipe()
            pipe.run()
        
        It simply gets all messages from one RabbitMQ queue and publishes them
        to another RabbitMQ exchange.
        
        Now define an action to apply to each message:
        
        .. code:: python
        
            import hashlib
            from amqpipe import AMQPipe
        
            def action(message):
                return hashlib.md5(message).hexdigest()
        
            pipe = AMQPipe(action=action)
            pipe.run()
        
        It will publish the md5 checksum of every message as the result.
        
        If messages in the input queue are in a predefined format, you can
        define a converter function:
        
        .. code:: python
        
            import hashlib
            from amqpipe import AMQPipe
        
            def converter(message):
                return message['text']
        
            def action(text):
                return hashlib.md5(text).hexdigest()
        
            pipe = AMQPipe(converter=converter, action=action)
            pipe.run()
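        
        Because the converter and the action are ordinary functions, they can be
        tried out without a broker. The sketch below is only an illustration: it
        assumes the converter receives a dict-like message whose ``text`` value
        is a byte string, which this README does not spell out, and the sample
        message is made up.
        
        .. code:: python
        
            import hashlib
        
            def converter(message):
                # Pull the payload out of the (assumed) dict-like message.
                return message['text']
        
            def action(text):
                # md5 needs bytes on Python 3; a plain str also works on Python 2.
                return hashlib.md5(text).hexdigest()
        
            sample = {'text': b'hello'}  # hypothetical message
            checksum = action(converter(sample))
            print(checksum)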
        
        You can define service-specific arguments:
        
        .. code:: python
        
            import hashlib
            from amqpipe import AMQPipe
        
            class Processor:
                def set_field(self, field):
                    self.field = field
        
            processor = Processor()
        
            def init(args):
                processor.set_field(args.field)
        
            def converter(message):
                return message.get(processor.field)
        
            def action(text):
                return hashlib.md5(text).hexdigest()
        
            pipe = AMQPipe(converter, action, init)
            pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
            pipe.run()
        
        You can connect to a database in the ``init`` function or do any other
        initialization there.
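        
        The ``pipe.parser.add_argument`` call in the example above has the same
        shape as ``argparse.ArgumentParser.add_argument``. Assuming the parser
        is, or behaves like, a standard ``argparse`` parser (an assumption, not
        something stated here), the way a command-line value reaches ``init``
        and then the converter can be sketched without AMQPipe itself:
        
        .. code:: python
        
            import argparse
        
            class Processor:
                def set_field(self, field):
                    self.field = field
        
            processor = Processor()
        
            def init(args):
                # args is the parsed namespace; copy what the service needs.
                processor.set_field(args.field)
        
            def converter(message):
                return message.get(processor.field)
        
            # Stand-in for pipe.parser (hypothetical; plain argparse here).
            parser = argparse.ArgumentParser()
            parser.add_argument('--field', default='text')
        
            init(parser.parse_args(['--field', 'body']))
            value = converter({'text': 'ignored', 'body': 'payload'})
            print(value)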
        
        If your action returns a Deferred, the result is published to RabbitMQ
        once that Deferred has fired:
        
        .. code:: python
        
            import logging
            from twisted.internet import defer
            from amqpipe import AMQPipe
        
            logger = logging.getLogger(__name__)
        
            class Processor:
                def set_field(self, field):
                    self.field = field
        
            processor = Processor()
        
            def init(args):
                connect_to_db()
                ...
        
            def converter(message):
                return message.get(processor.field)
        
            @defer.inlineCallbacks
            def action(text):
                result = yield db_query(text)
                logger.info('Get from db: %s', result)
                defer.returnValue(result)
        
            pipe = AMQPipe(converter, action, init)
            pipe.parser.add_argument('--field', default='text', help='Field name for retrieving message value')
            pipe.run()
        
        The ``init`` function may return a Deferred too.
Platform: all
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Console
Classifier: Framework :: Twisted
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: POSIX
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System
Classifier: Topic :: System :: Software Distribution
