Python API: Consuming Messages

The other side of the Python API: Emitting Messages document is consuming messages.

Note

Messages that you consume come with a topic and a body (dict). The content of a message can be useful! For instance, messages from FAS that come when a user edits their profile can tell you who made the change and what fields were changed. fedmsg was designed with security and message validation in mind, but its still so new that you shouldn’t trust it. When building consumers, you should always verify information with existing webapps before acting on messages.

  • nirik> trust, but verify. or... don’t trust, and verify. ;)

Note

This document is on how to consume messages. But if you want to know what messages there are, you might check out List of Message Topics.

“Naive” Consuming

The most straightforward way, programmatically, to consume messages is to use fedmsg.tail_messages(). It is a generator that yields 4-tuples of the form (name, endpoint, topic, message):

>>> import fedmsg

>>> # Read in the config from /etc/fedmsg.d/
>>> config = fedmsg.config.load_config([], None)

>>> # Disable a warning about not sending.  We know.  We only want to tail.
>>> config['mute'] = True

>>> # Disable timing out so that we can tail forever.  This is deprecated
>>> # and will disappear in future versions.
>>> config['timeout'] = 0

>>> for name, endpoint, topic, msg in fedmsg.tail_messages(**config):
...     print topic, msg  # or use fedmsg.encoding.pretty_dumps(msg)

The API is easy to use and should hopefully make your scripts easy to understand and maintain.

For production services, you will want to use the hub-consumer approach described further below.

Note that the fedmsg.tail_messages() used to be quite inefficient; it spun in a sleep, listen, yield loop that was quite costly in IO and CPU terms. Typically, a script that used fedmsg.tail_messages() would consume 100% of a CPU. That has since be resolved by introducing the use of a zmq.Poller.

Note

The fedmsg-tail command described in Commands uses fedmsg.tail_messages() to “tail” the bus.

“Naive” API

fedmsg.tail_messages(*args, **kw)

Tail messages on the bus.

Generator that yields tuples of the form: (name, endpoint, topic, message)

The Hub-Consumer Approach

In contrast to the “naive” approach above, a more efficient way of consuming events can be accomplished by way of the fedmsg-hub. The drawback is that programming it is a sort of indirect and declarative; it can be confusing at first.

To consume messages and do with them what you’d like, you need to:

  • Write a class which extends fedmsg.consumers.FedmsgConsumer
  • Override certain properties and methods of that class. Namely,
    • topic – A string used soley for constraining what messages make their way to the consumer; the consumer can send messages on any topic. You may use ‘splats’ (‘*’) in the topic and subscribe to 'org.fedoraproject.stg.koji.*' to get all of the messages from koji in the staging environment.
    • config_key – A string used to declare a configuration entry that must be set to True for your consumer to be activated by the fedmsg-hub.
    • consume – A method that accepts a dict (the message) and contains code that “does what you would like to do”.
    • replay_name – (optional) The name of the replay endpoint where the system should query playback in case of missing messages. It must match a service key in replay_endpoints.
  • Register your class on the moksha.consumer python entry-point.

A simple example

Luke Macken wrote a simple example of a koji consumer. It’s a good place to start if you’re writing your own consumer.

An Example From “busmon”

In the busmon app, all messages from the hub are processed to be formatted and displayed on a client’s browser. We mark them up with a pretty-print format and use pygments to colorize them.

In the example below, the MessageColorizer consumer simply subscribes to ‘*’; it will receive every message that hits it’s local fedmsg-hub.

The config_key = 'busmon.consumers.enabled' line means that a 'busmon.consumers.enabled': True entry must appear in the fedmsg config for the consumer to be enabled.

Here’s the full example from busmon, it consumes messages from every topic, formats them in pretty colored HTML and then re-sends them out on a new topic:

import pygments.lexers
import pygments.formatters

import fedmsg
import fedmsg.encoding
import fedmsg.consumers

class MessageColorizer(fedmsg.consumers.FedmsgConsumer):
    topic = "*"
    jsonify = False
    config_key = 'busmon.consumers.enabled'

    def consume(self, message):
        destination_topic = "colorized-messages"

        # Just so we don't create an infinite feedback loop.
        if self.destination_topic in message.topic:
            return

        # Format the incoming message
        code = pygments.highlight(
            fedmsg.encoding.pretty_dumps(fedmsg.encoding.loads(message.body)),
            pygments.lexers.JavascriptLexer(),
            pygments.formatters.HtmlFormatter(full=False)
        ).strip()

        # Ship it!
        fedmsg.publish(
            topic=self.destination_topic,
            msg=code,
        )

Now, just defining a consumer isn’t enough to have it picked up by the fedmsg-hub when it runs. You must also declare the consumer as an entry-point in your app’s setup.py, like this:

setup(
    ...
    entry_points={
        'moksha.consumer': (
            'colorizer = busmon.consumers:MessageColorizer',
        ),
    },
)

At initialization, fedmsg-hub looks for all the objects registered on the moksha.consumer entry point and loads them

FedmsgConsumer API

class fedmsg.consumers.FedmsgConsumer(hub)

Base class for fedmsg consumers.

The fedmsg consumption API is really just a thin wrapper over moksha. Moksha expects consumers to:

  • Declare themselves on the moksha.consumers python entry-point.
  • Declare a consume(...) method.
  • Specify a topic.

All this class does in addition to moksha is:

  • Provide a mechanism for disabling/enabling consumers by configuration in a consistent way (namely, by use of config_key).

    If you set validate_signatures = False on your consumer, it will be exempt from global validation rules. Messages will not be checked for authenticity before being handed off to your consume method. This is handy if you’re developing or building a special-case consumer. The consumer used by fedmsg-relay (described in Commands) sets validate_signatures = False so that it can transparently forward along everything and let the terminal endpoints decide whether or not to consume particular messages.

  • Provide a mechanism for automatically validating fedmsg messages with fedmsg.crypto.

  • Provide a mechanism to play back messages that haven’t been received by the hub even though emitted. To make use of this feature, you have to set replay_name to some string corresponding to an endpoint in the replay_endpoints dict in the configuration.

    You must set config_key to some string. A config value by that name must be True in the config parsed by fedmsg.config in order for the consumer to be activated.

DIY - Listening with Raw zeromq

So you want to receive messages without using any fedmsg libs? (say you’re on some ancient system where moksha and twisted won’t fly) If you can get python-zmq built, you’re in good shape. Use the following example script as a starting point for whatever you want to build:

#!/usr/bin/env python

import json
import pprint
import zmq


def listen_and_print():
    # You can listen to stg at "tcp://stg.fedoraproject.org:9940"
    endpoint = "tcp://hub.fedoraproject.org:9940"
    # Set this to something like org.fedoraproject.prod.compose
    topic = 'org.fedoraproject.prod.'

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    s.connect(endpoint)

    s.setsockopt(zmq.SUBSCRIBE, topic)

    poller = zmq.Poller()
    poller.register(s, zmq.POLLIN)

    while True:
        evts = poller.poll()  # This blocks until a message arrives
        topic, msg = s.recv_multipart()
        print topic, pprint.pformat(json.loads(msg))

if __name__ == "__main__":
    listen_and_print()

Just bear in mind that you don’t reap any of the benefits of fedmsg.crypto or fedmsg.meta.

Note

In the example above, the topic is just 'org.fedoraproject.prod.' and not 'org.fedoraproject.prod.*'. The * that you see elsewhere is a Moksha convention and it is actually just stripped from the topic.

Why? The * has meaning in AMQP, but not zeromq. The Moksha project (which underlies fedmsg) aims to be an abstraction layer over zeromq, AMQP, and STOMP and contains some code that allows use of the * for zeromq, in order to make it look more like AMQP or STOMP (superficially). fedmsg (being built on Moksha) inherits this behavior even though it only uses the zeromq backend. See these comments for some discussion.

fedmsg

Previous topic

Python API: Emitting Messages

Next topic

Message Encoding – JSON

Edit this document

Go to Python API: Consuming Messages on GitHub.

Use the web interface to fork the repo, edit the file, and send a pull request.

Your changes will be queued for review under project's Pull requests tab on Github.

Fork me on GitHub