Source code for ceilometer.collector

#
# Copyright 2012-2013 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import socket

import msgpack
import oslo.messaging
from oslo_config import cfg
from oslo_utils import netutils
from oslo_utils import timeutils
from oslo_utils import units

from ceilometer import dispatcher
from ceilometer.event.storage import models
from ceilometer import messaging
from ceilometer.i18n import _, _LE
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import utils

# Options owned by this module; they are registered below under the
# "collector" configuration group.
OPTS = [
    # Address/port pair used by CollectorService.start_udp() when binding
    # its datagram socket. An empty address disables the UDP listener.
    cfg.StrOpt('udp_address',
               default='0.0.0.0',
               help='Address to which the UDP socket is bound. Set to '
               'an empty string to disable.'),
    cfg.IntOpt('udp_port',
               default=4952,
               help='Port to which the UDP socket is bound.'),
    # Requeue switches: each one is passed as ``allow_requeue`` to the
    # matching notification listener and is also read by the matching
    # endpoint class (SampleEndpoint / EventEndpoint).
    cfg.BoolOpt('requeue_sample_on_dispatcher_error',
                default=False,
                help='Requeue the sample on the collector sample queue '
                'when the collector fails to dispatch it. This is only valid '
                'if the sample come from the notifier publisher.'),
    cfg.BoolOpt('requeue_event_on_dispatcher_error',
                default=False,
                help='Requeue the event on the collector event queue '
                'when the collector fails to dispatch it.'),
]

cfg.CONF.register_opts(OPTS, group="collector")
# Options declared by other modules that this module reads from cfg.CONF:
# the RPC and notifier metering topics, the notifier event topic, and the
# notification agent's ``store_events`` switch.
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('event_topic', 'ceilometer.publisher.messaging',
                    group='publisher_notifier')
cfg.CONF.import_opt('store_events', 'ceilometer.notification',
                    group='notification')


# Module-level logger shared by the service and the endpoints below.
LOG = log.getLogger(__name__)


class CollectorService(os_service.Service):
    """Listener for the collector service.

    Receives metering data over UDP (msgpack-encoded datagrams), over RPC
    and over the notification bus, and hands everything to the configured
    dispatchers through ``self.dispatcher_manager``.
    """

    def start(self):
        """Bind the UDP socket and handle incoming data.

        Loads the dispatcher manager, optionally spawns the UDP receive
        loop, and, when a messaging transport is available, starts the RPC
        server, the sample notification listener and (if
        ``notification.store_events`` is set) the event notification
        listener.
        """
        # ensure dispatcher is configured before starting other services
        self.dispatcher_manager = dispatcher.load_dispatcher_manager()
        self.rpc_server = None
        self.sample_listener = None
        self.event_listener = None
        super(CollectorService, self).start()

        if cfg.CONF.collector.udp_address:
            self.tg.add_thread(self.start_udp)

        transport = messaging.get_transport(optional=True)
        if transport:
            self.rpc_server = messaging.get_rpc_server(
                transport, cfg.CONF.publisher_rpc.metering_topic, self)

            sample_target = oslo.messaging.Target(
                topic=cfg.CONF.publisher_notifier.metering_topic)
            self.sample_listener = messaging.get_notification_listener(
                transport, [sample_target],
                [SampleEndpoint(self.dispatcher_manager)],
                allow_requeue=(cfg.CONF.collector.
                               requeue_sample_on_dispatcher_error))

            if cfg.CONF.notification.store_events:
                event_target = oslo.messaging.Target(
                    topic=cfg.CONF.publisher_notifier.event_topic)
                self.event_listener = messaging.get_notification_listener(
                    transport, [event_target],
                    [EventEndpoint(self.dispatcher_manager)],
                    allow_requeue=(cfg.CONF.collector.
                                   requeue_event_on_dispatcher_error))
                self.event_listener.start()

            self.rpc_server.start()
            self.sample_listener.start()

            if not cfg.CONF.collector.udp_address:
                # Add a dummy thread to have wait() working
                # (604800 seconds is one week).
                self.tg.add_timer(604800, lambda: None)

    def start_udp(self):
        """Receive msgpack-encoded samples over UDP until stopped.

        Binds a datagram socket to ``collector.udp_address`` /
        ``collector.udp_port`` (IPv6 when the address is a valid IPv6
        literal) and loops while ``self.udp_run`` is true. Datagrams that
        cannot be decoded are logged and skipped; dispatcher failures are
        logged and do not stop the loop.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(cfg.CONF.collector.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        # Always release the descriptor: when the loop ends, when bind()
        # fails, or when the thread running this method is killed.
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((cfg.CONF.collector.udp_address,
                      cfg.CONF.collector.udp_port))

            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    LOG.warn(_("UDP: Cannot decode data sent by %s"), source)
                else:
                    try:
                        LOG.debug(_("UDP: Storing %s"), sample)
                        self.dispatcher_manager.map_method(
                            'record_metering_data', sample)
                    except Exception:
                        LOG.exception(_("UDP: Unable to store meter"))
        finally:
            udp.close()

    def stop(self):
        """Stop the UDP loop, the RPC server and the listeners."""
        # Only takes effect once the receive loop re-checks the flag.
        self.udp_run = False
        if self.rpc_server:
            self.rpc_server.stop()
        if self.sample_listener:
            utils.kill_listeners([self.sample_listener])
        if self.event_listener:
            utils.kill_listeners([self.event_listener])
        super(CollectorService, self).stop()

    def record_metering_data(self, context, data):
        """RPC endpoint for messages we send to ourselves.

        When the notification messages are re-published through the
        RPC publisher, this method receives them for processing.

        :param context: request context supplied by the RPC layer; unused.
        :param data: metering data forwarded to every dispatcher.
        """
        self.dispatcher_manager.map_method('record_metering_data', data=data)
class CollectorEndpoint(object):
    """Base notification endpoint that relays payloads to the dispatchers.

    The dispatcher method to invoke is read from ``self.method`` and the
    label used in the failure log message from ``self.ep_type``; this
    class does not define either attribute itself.
    """

    def __init__(self, dispatcher_manager, requeue_on_error):
        self.requeue_on_error = requeue_on_error
        self.dispatcher_manager = dispatcher_manager

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Handle one notification delivered on the message bus.

        The payload is passed to ``self.method`` on every dispatcher. If
        dispatching raises and requeueing is enabled, the failure is
        logged and a REQUEUE result is returned; otherwise the exception
        propagates to the caller.
        """
        try:
            self.dispatcher_manager.map_method(self.method, payload)
        except Exception:
            # Guard first: without requeue the error is simply re-raised.
            if not self.requeue_on_error:
                raise
            LOG.exception(_LE("Dispatcher failed to handle the %s, "
                              "requeue it."), self.ep_type)
            return oslo.messaging.NotificationResult.REQUEUE
class SampleEndpoint(CollectorEndpoint):
    """Endpoint that sends sample payloads to ``record_metering_data``."""

    method = 'record_metering_data'
    ep_type = 'sample'

    def __init__(self, dispatcher_manager):
        # Requeue behaviour comes from the [collector] configuration group.
        requeue = cfg.CONF.collector.requeue_sample_on_dispatcher_error
        super(SampleEndpoint, self).__init__(dispatcher_manager, requeue)
class EventEndpoint(CollectorEndpoint):
    """Endpoint that rebuilds event models and sends them to
    ``record_events``.
    """

    method = 'record_events'
    ep_type = 'event'

    def __init__(self, dispatcher_manager):
        # Requeue behaviour comes from the [collector] configuration group.
        requeue = cfg.CONF.collector.requeue_event_on_dispatcher_error
        super(EventEndpoint, self).__init__(dispatcher_manager, requeue)

    @staticmethod
    def _build_event(ev):
        """Turn one serialized event mapping into a ``models.Event``.

        Reads the ``message_id``, ``event_type``, ``generated`` and
        ``traits`` keys, plus the optional ``raw`` key (default ``{}``).
        """
        message_id = ev['message_id']
        kind = ev['event_type']
        generated = timeutils.normalize_time(
            timeutils.parse_isotime(ev['generated']))
        traits = [
            models.Trait(name, dtype,
                         models.Trait.convert_value(dtype, value))
            for name, dtype, value in ev['traits']
        ]
        raw = ev.get('raw', {})
        return models.Event(message_id=message_id,
                            event_type=kind,
                            generated=generated,
                            traits=traits,
                            raw=raw)

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Convert each payload entry to an event and dispatch the batch.

        Entries that fail conversion are logged and dropped; the events
        that remain (possibly none) are handed to the parent ``sample``.
        """
        converted = []
        for serialized in payload:
            try:
                event = self._build_event(serialized)
            except Exception:
                LOG.exception(_LE("Error processing event and it will be "
                                  "dropped: %s"), serialized)
                continue
            converted.append(event)
        return super(EventEndpoint, self).sample(
            ctxt, publisher_id, event_type, converted, metadata)