|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- # -*- coding: utf-8 -*-
- '''
- Saltgraph engine for catching returns of state runs, parsing them
- and passing them to flat database of latest Salt resource runs.
- '''
-
- # Import python libs
- from __future__ import absolute_import
- import datetime
- import json
- import logging
-
- # Import salt libs
- import salt.utils.event
-
- # Import third party libs
- try:
- import psycopg2
- import psycopg2.extras
- HAS_POSTGRES = True
- except ImportError:
- HAS_POSTGRES = False
-
- __virtualname__ = 'saltgraph'
- log = logging.getLogger(__name__)
-
-
- def __virtual__():
- if not HAS_POSTGRES:
- return False, 'Could not import saltgraph engine. python-psycopg2 is not installed.'
- return __virtualname__
-
-
- def _get_conn(options={}):
- '''
- Return a postgres connection.
- '''
- host = options.get('host', '127.0.0.1')
- user = options.get('user', 'salt')
- passwd = options.get('passwd', 'salt')
- datab = options.get('db', 'salt')
- port = options.get('port', 5432)
-
- return psycopg2.connect(
- host=host,
- user=user,
- password=passwd,
- database=datab,
- port=port)
-
-
- def _close_conn(conn):
- '''
- Close the Postgres connection
- '''
- conn.commit()
- conn.close()
-
-
- def _get_lowstate_data(options={}):
- '''
- TODO: document this method
- '''
- conn = _get_conn(options)
- cur = conn.cursor()
-
- try:
- # you can only do this on Salt Masters minion
- lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True})
- except:
- lowstate_req = {}
-
- for minion, lowstate_ret in lowstate_req.items():
- if lowstate_ret.get('retcode') != 0:
- continue
- for resource in lowstate_ret.get('ret', []):
- low_sql = '''INSERT INTO salt_resources
- (id, resource_id, host, service, module, fun, status)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- ON CONFLICT (id) DO UPDATE
- SET resource_id = excluded.resource_id,
- host = excluded.host,
- service = excluded.service,
- module = excluded.module,
- fun = excluded.fun,
- alter_time = current_timestamp'''
-
- rid = "%s|%s" % (minion, resource.get('__id__'))
- cur.execute(
- low_sql, (
- rid,
- resource.get('__id__'),
- minion,
- resource.get('__sls__'),
- resource.get('state') if 'state' in resource else resource.get('module'),
- resource.get('fun'),
- 'unknown'
- )
- )
- conn.commit()
-
- if lowstate_req:
- meta_sql = '''INSERT INTO salt_resources_meta
- (id, options)
- VALUES (%s, %s)
- ON CONFLICT (id) DO UPDATE
- SET options = excluded.options,
- alter_time = current_timestamp'''
-
- cur.execute(
- meta_sql, (
- 'lowstate_data',
- '{}'
- )
- )
- _close_conn(conn)
-
-
- def _up_to_date(options={}):
- '''
- TODO: document this method
- '''
- conn = _get_conn(options)
- cur = conn.cursor()
- #cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-
- ret = False
-
- # if lowstate data are older than 1 day, refresh them
- cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',))
- alter_time = cur.fetchone()
-
- if alter_time:
- now = datetime.datetime.utcnow()
- day = datetime.timedelta(days=1)
- time_diff = now - alter_time[0].replace(tzinfo=None)
- if time_diff < day:
- ret = True
- else:
- skip = False
-
- _close_conn(conn)
-
- return ret
-
-
- def _update_resources(event, options):
- '''
- TODO: document this method
- '''
- conn = _get_conn(options)
- cur = conn.cursor()
-
- cur.execute('SELECT id FROM salt_resources')
- resources_db = [res[0] for res in cur.fetchall()]
- resources = event.get('return', {}).values()
-
- for resource in resources:
- rid = '%s|%s' % (event.get('id'), resource.get('__id__'))
- if rid in resources_db:
- status = 'unknown'
- if resource.get('result', None) is not None:
- status = 'success' if resource.get('result') else 'failed'
-
- resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp)
- WHERE id = %s'''
-
- cur.execute(
- resource_sql, (
- status,
- repr(resource),
- rid
- )
- )
-
- conn.commit()
-
- _close_conn(conn)
-
-
- def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs):
- '''
- Listen to events and parse Salt state returns
- '''
- if __opts__['__role'] == 'master':
- event_bus = salt.utils.event.get_master_event(
- __opts__,
- __opts__['sock_dir'],
- listen=True)
- else:
- event_bus = salt.utils.event.get_event(
- 'minion',
- transport=__opts__['transport'],
- opts=__opts__,
- sock_dir=__opts__['sock_dir'],
- listen=True)
- log.debug('Saltgraph engine started')
-
- while True:
- event = event_bus.get_event()
- supported_funcs = ['state.sls', 'state.apply', 'state.highstate']
- if event and event.get('fun', None) in supported_funcs:
- test = 'test=true' in [arg.lower() for arg in event.get('fun_args', [])]
- if not test:
- options = {
- 'host': host,
- 'user': user,
- 'passwd': password,
- 'db': database,
- 'port': port
- }
- is_reclass = [arg for arg in event.get('fun_args', []) if arg.startswith('reclass')]
- if is_reclass or not _up_to_date(options):
- _get_lowstate_data(options)
-
- _update_resources(event, options)
-
|