# Source code for graphitesend.graphitesend

#!/usr/bin/env python

# gevent is an optional dependency: when it cannot be imported the name is
# bound to False so later code can test `if gevent:` before using it.
try:
    import gevent
except ImportError:
    gevent = False

import functools
import pickle
import socket
import struct
import time
import random

from .formatter import GraphiteStructuredFormatter
from .block_metric import BlockMetric

# Client shared by the module-level helpers (send, send_dict, send_list).
# It is assigned by init() and set back to None by reset().
_module_instance = None

# Hostname GraphiteClient falls back to when no graphite_server is given.
default_graphite_server = 'graphite'

class GraphiteSendException(Exception):
    """Error raised by this module for connection and send failures."""
class GraphiteClient(object):
    """
    Graphite Client that will setup a TCP connection to graphite.

    :param prefix: string added to the start of all metrics
    :type prefix: Default: "systems."
    :param graphite_server: hostname or ip address of graphite server
    :type graphite_server: Default: graphite
    :param graphite_port: TCP port we will connect to
    :type graphite_port: Default: The value of default_port
    :param debug: Toggle debug messages
    :type debug: True or False
    :param group: string added to after system_name and before metric name
    :param system_name: FQDN of the system generating the metrics
    :type system_name: Default: current FQDN
    :param suffix: string added to the end of all metrics
    :param lowercase_metric_names: Toggle the .lower() of all metric names
    :param connect_on_create: Call connect() from the constructor
    :type connect_on_create: True or False
    :param fqdn_squash: Change to host_example_com
    :type fqdn_squash: True or False
    :param dryrun: Toggle if it will really send metrics or just return them
    :type dryrun: True or False
    :param timeout_in_seconds: Number of seconds before a connection is timed
                               out.
    :param asynchronous: Send messages asynchronously via gevent (You have to
                         monkey patch sockets for it to work)
    :param autoreconnect: Try to reconnect (see autoreconnect()) and resend
                          once when a send fails
    :type autoreconnect: True or False
    :param clean_metric_name: Does GraphiteClient needs to clean metric's name
    :type clean_metric_name: True or False
    :param formatter: Callable that will format metric names

    It will then send any metrics that you give it via
    the .send() or .send_dict().

    You can also take advantage of the prefix, group and system_name options
    that allow you to setup default locations where your whisper files will
    be kept.
    eg.
    ( where 'linuxserver' is the name of the localhost)

    .. code-block:: python

      >>> init().prefix
      systems.linuxserver.

      >>> init(system_name='remote_host').prefix
      systems.remote_host.

      >>> init(group='cpu').prefix
      systems.linuxserver.cpu.

      >>> init(prefix='apache').prefix
      apache.

    """

    #: If graphite_port is not given, this port will be used
    default_port = 2003

    #: Default block metric class
    block_metric_cls = BlockMetric

    def __init__(self, graphite_server=None, graphite_port=None, prefix=None,
                 timeout_in_seconds=2, debug=False, group=None,
                 system_name=None, suffix=None, lowercase_metric_names=False,
                 connect_on_create=True, fqdn_squash=False,
                 dryrun=False, asynchronous=False, autoreconnect=False,
                 clean_metric_name=True, formatter=None):
        """
        setup the connection to the graphite server and work out the
        prefix.

        This allows for very simple syntax when sending messages to the
        graphite server.
        """
        if not graphite_port:
            graphite_port = self.default_port

        # If we are not passed a host, then use the graphite server defined
        # in the module.
        if not graphite_server:
            graphite_server = default_graphite_server
        self.addr = (graphite_server, graphite_port)

        # If this is a dry run, then we do not want to configure a connection
        # or try and make the connection once we create the object.
        self.dryrun = dryrun
        if self.dryrun:
            self.addr = None
            connect_on_create = False

        self.timeout_in_seconds = int(timeout_in_seconds)

        # Always define the attribute, so that code which inspects it later
        # sees None instead of failing with AttributeError when the
        # connection was never opened.
        self.socket = None

        # Only connect to the graphite server and port if we tell you too.
        # This is mostly used for testing.
        if connect_on_create:
            self.connect()

        self.debug = debug
        self.lastmessage = None

        self.asynchronous = False
        if asynchronous:
            self.asynchronous = self.enable_asynchronous()

        self._autoreconnect = autoreconnect

        if formatter is not None:
            self.formatter = formatter
        else:
            self.formatter = GraphiteStructuredFormatter(
                prefix=prefix, group=group, system_name=system_name,
                suffix=suffix, lowercase_metric_names=lowercase_metric_names,
                fqdn_squash=fqdn_squash,
                clean_metric_name=clean_metric_name)

    def connect(self):
        """
        Make a TCP connection to the graphite server at self.addr.

        :returns: the connected socket, also stored as self.socket
        :raises GraphiteSendException: on timeout, on address resolution
            failure, or on any other error raised while connecting
        """
        self.socket = socket.socket()
        self.socket.settimeout(self.timeout_in_seconds)
        try:
            self.socket.connect(self.addr)
        except socket.timeout:
            raise GraphiteSendException(
                "Took over %d second(s) to connect to %s" %
                (self.timeout_in_seconds, self.addr))
        except socket.gaierror:
            raise GraphiteSendException(
                "No address associated with hostname %s:%s" % self.addr)
        except Exception as error:
            raise GraphiteSendException(
                "unknown exception while connecting to %s - %s" %
                (self.addr, error)
            )

        return self.socket

    def reconnect(self):
        """Drop the current connection, if any, and open a new one."""
        self.disconnect()
        self.connect()

    def autoreconnect(self, sleep=1, attempt=3, exponential=True, jitter=5):
        """
        Tries to reconnect with some delay:

        exponential=False: up to `attempt` times with `sleep` seconds between
        each try

        exponential=True: up to `attempt` times with exponential growing
        `sleep` and random delay in range 1..`jitter` (exponential backoff)

        :param sleep: time to sleep between two attempts to reconnect
        :type sleep: float or int
        :param attempt: maximal number of attempts; None retries until a
            reconnect succeeds
        :type attempt: int or None
        :param exponential: if set - use exponential backoff logic
        :type exponential: bool
        :param jitter: top value of random delay, sec; values below 1 add no
            random delay
        :type jitter: int
        :returns: True once reconnected, False when all attempts failed
        """
        power = 0
        while attempt is None or attempt > 0:
            try:
                self.reconnect()
                return True
            except GraphiteSendException:
                if exponential:
                    power += 1
                    delay = pow(sleep, power)
                    # random.randint(1, jitter) raises ValueError when
                    # jitter < 1, so only add jitter when the range is valid.
                    if jitter is not None and jitter >= 1:
                        delay += random.randint(1, jitter)
                    time.sleep(delay)
                else:
                    time.sleep(sleep)
                # attempt=None means "retry forever"; there is nothing to
                # count down in that case.
                if attempt is not None:
                    attempt -= 1
        return False

    def clean_metric_name(self, metric_name):
        """
        Make sure the metric is free of control chars, spaces, tabs, etc.

        Delegates to self.formatter.clean_metric_name().
        """
        return self.formatter.clean_metric_name(metric_name)

    def disconnect(self):
        """
        Close the TCP connection with the graphite server.

        Never raises: teardown is best effort and self.socket is always
        None afterwards.
        """
        sock = getattr(self, "socket", None)
        # Drop our reference first so the client is left disconnected no
        # matter what happens during teardown.
        self.socket = None
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_WR)
        except Exception:
            # Best effort: the peer may already have closed the connection,
            # or the socket may never have been connected.
            pass
        try:
            # shutdown() alone does not release the file descriptor.
            sock.close()
        except Exception:
            pass

    def _dispatch_send(self, message):
        """
        Dispatch the different steps of sending.

        :param message: fully formatted message to send
        :returns: `message` itself on a dry run, otherwise a short summary
            string
        :raises GraphiteSendException: when there is no socket (and
            autoreconnect is off) or when sending fails
        """
        if self.dryrun:
            return message

        # With autoreconnect enabled a missing socket is recoverable:
        # _send_and_reconnect() handles the resulting AttributeError by
        # reconnecting, so only refuse up front when that is not available.
        if not self.socket and not self._autoreconnect:
            raise GraphiteSendException(
                "Socket was not created before send"
            )

        sending_function = self._send
        if self._autoreconnect:
            sending_function = self._send_and_reconnect

        try:
            if self.asynchronous and gevent:
                gevent.spawn(sending_function, message)
            else:
                sending_function(message)
        except Exception as e:
            self._handle_send_error(e)

        return "sent {0} long message: {1}".format(len(message), message[:75])

    def _handle_send_error(self, error):
        """
        Translate an exception raised while sending.

        :param error: the exception caught by _dispatch_send()
        :raises GraphiteSendException: always
        """
        # Already one of ours (for example raised by a subclass' _send()):
        # pass it through instead of re-wrapping it as an "Unknown error".
        if isinstance(error, GraphiteSendException):
            raise error
        if isinstance(error, socket.gaierror):
            raise GraphiteSendException(
                "Failed to send data to %s, with error: %s" %
                (self.addr, error))

        elif isinstance(error, socket.error):
            raise GraphiteSendException(
                "Socket closed before able to send data to %s, "
                "with error: %s" %
                (self.addr, error)
            )

        else:
            raise GraphiteSendException(
                "Unknown error while trying to send data down socket to %s, "
                "error: %s" %
                (self.addr, error)
            )

    def _send(self, message):
        """
        Given a message send it to the graphite server.

        The message is encoded as ASCII before being written to the socket.
        """
        self.socket.sendall(message.encode("ascii"))

    def _send_and_reconnect(self, message):
        """Send _message_ to Graphite Server and attempt reconnect on failure.

        If _autoreconnect_ was specified, attempt to reconnect if first send
        fails.

        :raises AttributeError: When the socket has not been set.
        :raises socket.error: When the socket connection is no longer valid.
        """
        try:
            self.socket.sendall(message.encode("ascii"))
        except (AttributeError, socket.error):
            if not self.autoreconnect():
                raise
            else:
                self.socket.sendall(message.encode("ascii"))

    def _presend(self, message):
        """
        Complete any message alteration tasks before sending to the graphite
        server.

        The base implementation returns the message unchanged.
        """
        return message

    def format_message(self, metric, value, timestamp=None, formatter=None):
        """
        Build one "<metric> <value> <timestamp>\\n" line.

        :param metric: name of the metric, passed through the formatter
        :param value: value of the metric
        :param timestamp: epoch time of the event, truncated to an int;
            the current time is used when None
        :param formatter: callable used instead of self.formatter
        :returns: the formatted line, terminated by a newline
        """
        if formatter is None:
            formatter = self.formatter

        if timestamp is None:
            timestamp = time.time()
        timestamp = int(timestamp)

        metric = formatter(metric)

        return "{} {} {}\n".format(metric, value, timestamp)

    def send(self, metric, value, timestamp=None, formatter=None):
        """
        Format a single metric/value pair, and send it to the graphite
        server.

        :param metric: name of the metric
        :type metric: string
        :param value: value of the metric
        :type value: float or int
        :param timestamp: epoch time of the event
        :type timestamp: float or int
        :param formatter: option non-default formatter
        :type formatter: callable

        .. code-block:: python

          >>> g = init()
          >>> g.send("metric", 54)

        .. code-block:: python

          >>> g = init()
          >>> g.send(metric="metricname", value=73)

        """
        message = self.format_message(metric, value, timestamp, formatter)
        message = self._presend(message)
        return self._dispatch_send(message)

    def send_dict(self, data, timestamp=None, formatter=None):
        """
        Format a dict of metric/values pairs, and send them all to the
        graphite server.

        :param data: key,value pair of metric name and metric value
        :type data: dict
        :param timestamp: epoch time of the event
        :type timestamp: float or int
        :param formatter: option non-default formatter
        :type formatter: callable

        .. code-block:: python

          >>> g = init()
          >>> g.send_dict({'metric1': 54, 'metric2': 43, 'metricN': 999})

        """
        message = "".join(
            self.format_message(metric, value, timestamp, formatter)
            for metric, value in data.items()
        )
        return self._dispatch_send(message)

    def send_list(self, data, timestamp=None, formatter=None):
        """
        Format a list of set's of (metric, value) pairs, and send them all
        to the graphite server.

        :param data: list of key,value pairs of metric name and metric value
        :type data: list
        :param timestamp: epoch time of the event
        :type timestamp: float or int
        :param formatter: option non-default formatter
        :type formatter: callable

        .. code-block:: python

          >>> g = init()
          >>> g.send_list([('metric1', 54),('metric2', 43, 1384418995)])

        """
        if timestamp is None:
            timestamp = int(time.time())
        else:
            timestamp = int(timestamp)

        metric_list = []

        for metric_info in data:
            # Support [ (metric, value, timestamp), ... ] as well as
            # [ (metric, value), ... ].
            # If the metric_info provides a timestamp then use the timestamp.
            # If the metric_info fails to provide a timestamp, use the one
            # provided to send_list() or generated on the fly by time.time()
            if len(metric_info) == 3:
                (metric, value, metric_timestamp) = metric_info
            else:
                (metric, value) = metric_info
                metric_timestamp = timestamp

            metric_list.append(
                self.format_message(metric, value, metric_timestamp,
                                    formatter))

        message = "".join(metric_list)
        return self._dispatch_send(message)

    def enable_asynchronous(self):
        """
        Check if socket have been monkey patched by gevent.

        :returns: True when the socket module is patched
        :raises Exception: when gevent is missing or has not patched the
            socket module
        """

        def is_monkey_patched():
            try:
                from gevent import monkey
                # Imported under a distinct name: binding it to `socket`
                # would shadow the standard-library module and turn the
                # comparison below into gevent's socket against itself.
                from gevent import socket as gevent_socket
            except ImportError:
                return False
            if hasattr(monkey, "saved"):
                return "socket" in monkey.saved
            return gevent_socket.socket == socket.socket

        if not is_monkey_patched():
            raise Exception("To activate asynchonoucity, please monkey patch"
                            " the socket module with gevent")
        return True

    def block_metric(self, metric_name):
        """
        Return a block_metric_cls instance bound to this client.

        :param metric_name: name passed on to block_metric_cls
        """
        return self.block_metric_cls(self, metric_name)

    def decorator(self, metric_or_func):
        """
        Run a function inside a block_metric() context.

        Usable bare, in which case the function's __name__ is the metric
        name, or called with a metric name string first.

        :param metric_or_func: the function to wrap, or a metric name
        :returns: the wrapped function, or a decorator awaiting the function
        """

        def wrapper(func, metric_name):

            @functools.wraps(func)
            def wrapped(*args, **kwargs):
                with self.block_metric(metric_name):
                    return func(*args, **kwargs)
            return wrapped

        if not isinstance(metric_or_func, str):
            return wrapper(metric_or_func, metric_or_func.__name__)

        return functools.partial(wrapper, metric_name=metric_or_func)
class GraphitePickleClient(GraphiteClient):
    """
    Client that sends metrics to graphite as length-prefixed pickled
    payloads instead of plaintext lines.
    """

    #: If graphite_port is not given, this port will be used
    default_port = 2004

    def str2listtuple(self, string_message):
        """
        Convert a string that is ready to be sent to graphite into a
        pickled, length-prefixed message.

        :param string_message: newline separated "<path> <value> <timestamp>"
            lines; blank lines are ignored
        :returns: 4-byte big-endian payload length followed by the pickled
            list of (path, (timestamp, value)) tuples
        :raises TypeError: when `string_message` is not a string
        :raises ValueError: when a line does not have exactly three fields
            or its timestamp is not numeric
        :raises GraphiteSendException: when there is no line to send
        """
        # Compared by type name so that `unicode` is accepted on Python 2
        # without referencing a name that does not exist on Python 3.
        if type(string_message).__name__ not in ('str', 'unicode'):
            raise TypeError("Must provide a string or unicode")

        tpl_list = []
        for line in string_message.split('\n'):
            line = line.strip()
            if not line:
                continue
            try:
                (path, metric, timestamp) = line.split()
            except ValueError:
                raise ValueError(
                    "message must contain - metric_name, value and timestamp '%s'"
                    % line)
            try:
                timestamp = float(timestamp)
            except ValueError:
                raise ValueError("Timestamp must be float or int")

            tpl_list.append((path, (timestamp, metric)))

        if len(tpl_list) == 0:
            raise GraphiteSendException("No messages to send")

        # Protocol 2 is what the Graphite documentation uses for the pickle
        # receiver. The interpreter default on Python 3 is a newer protocol.
        payload = pickle.dumps(tpl_list, protocol=2)
        header = struct.pack("!L", len(payload))
        return header + payload

    def _encode(self, message):
        """Return the pickled form of a plaintext `message`."""
        # The attribute may not exist on the instance, so a missing
        # attribute is treated as "do not lowercase" instead of failing.
        if getattr(self, "lowercase_metric_names", False):
            message = message.lower()
        return self.str2listtuple(message)

    def _send(self, message):
        """
        Given a message send it to the graphite server.

        :returns: a short summary string
        :raises GraphiteSendException: when writing to the socket fails
        """
        # convert the message into a pickled payload.
        payload = self._encode(message)

        try:
            self.socket.sendall(payload)

        # Capture missing socket.
        except socket.gaierror as error:
            raise GraphiteSendException(
                "Failed to send data to %s, with error: %s" %
                (self.addr, error))  # noqa

        # Capture socket closure before send.
        except socket.error as error:
            raise GraphiteSendException(
                "Socket closed before able to send data to %s, "
                "with error: %s" %
                (self.addr, error))  # noqa

        except Exception as error:
            raise GraphiteSendException(
                "Unknown error while trying to send data down socket to %s, "
                "error: %s" %
                (self.addr, error))  # noqa

        return "sent %d long pickled message" % len(payload)

    def _send_and_reconnect(self, message):
        """Send _message_ pickled and attempt reconnect on failure.

        Overrides the inherited version so that the pickled payload, not
        the ASCII plaintext, is what gets written to the socket.

        :raises AttributeError: When the socket has not been set.
        :raises socket.error: When the socket connection is no longer valid.
        """
        payload = self._encode(message)
        try:
            self.socket.sendall(payload)
        except (AttributeError, socket.error):
            if not self.autoreconnect():
                raise
            self.socket.sendall(payload)
def init(init_type='plaintext_tcp', *args, **kwargs):
    """
    Create the module instance of the GraphiteClient.

    Any existing module instance is reset first. Remaining positional and
    keyword arguments are handed to the client constructor.

    :param init_type: one of 'plaintext_tcp', 'plaintext', 'plain' for the
        plain text receiver, or 'pickle_tcp', 'pickle' for the pickle
        receiver
    :returns: the newly created client
    :raises GraphiteSendException: when `init_type` is not recognised
    """
    global _module_instance
    reset()

    plaintext_types = ('plaintext_tcp', 'plaintext', 'plain')
    pickle_types = ('pickle_tcp', 'pickle')

    if init_type not in plaintext_types and init_type not in pickle_types:
        allowed = ", ".join(
            ['plaintext_tcp', 'plaintext', 'pickle_tcp', 'pickle', 'plain'])
        raise GraphiteSendException(
            "Invalid init_type '%s', must be one of: %s" %
            (init_type, allowed))

    if init_type in plaintext_types:
        # Use TCP to send data to the plain text receiver on the graphite
        # server.
        client_cls = GraphiteClient
    else:
        # Use TCP to send pickled data to the pickle receiver on the
        # graphite server.
        client_cls = GraphitePickleClient

    _module_instance = client_cls(*args, **kwargs)
    return _module_instance
def send(*args, **kwargs):
    """
    Send one metric through the module instance.

    Arguments are forwarded unchanged to the module instance's send().
    User consumable method.

    :returns: the module instance
    :raises GraphiteSendException: when init() has not been called
    """
    if _module_instance:
        _module_instance.send(*args, **kwargs)
        return _module_instance

    raise GraphiteSendException(
        "Must call graphitesend.init() before sending")
def send_dict(*args, **kwargs):
    """
    Send a dict of metrics through the module instance.

    Arguments are forwarded unchanged to the module instance's send_dict().
    User consumable method.

    :returns: the module instance
    :raises GraphiteSendException: when init() has not been called
    """
    if _module_instance:
        _module_instance.send_dict(*args, **kwargs)
        return _module_instance

    raise GraphiteSendException(
        "Must call graphitesend.init() before sending")
def send_list(*args, **kwargs):
    """
    Send a list of metrics through the module instance.

    Arguments are forwarded unchanged to the module instance's send_list().
    User consumable method.

    :returns: the module instance
    :raises GraphiteSendException: when init() has not been called
    """
    if _module_instance:
        _module_instance.send_list(*args, **kwargs)
        return _module_instance

    raise GraphiteSendException(
        "Must call graphitesend.init() before sending")
def reset():
    """
    Disconnect from the graphite server and destroy the module instance.

    :returns: False when there was no module instance, otherwise None
    """
    global _module_instance

    if _module_instance:
        _module_instance.disconnect()
        _module_instance = None
        return None

    return False
def cli():
    """
    Allow the module to be called from the cli.

    Reads a metric name and an integer value from the command line, creates
    the module instance with init() and sends that single metric.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Send data to graphite')

    # Core of the application is to accept a metric and a value.
    parser.add_argument('metric', metavar='metric', type=str,
                        help='name.of.metric')
    parser.add_argument('value', metavar='value', type=int,
                        help='value of metric as int')

    parsed = parser.parse_args()

    client = init()
    client.send(parsed.metric, parsed.value)


if __name__ == '__main__':  # pragma: no cover
    cli()