|
|
|
|
|
|
|
|
|
|
|
# -*- coding: utf-8 -*- |
|
|
|
|
|
''' |
|
|
|
|
|
Saltgraph engine for catching returns of state runs, parsing them |
|
|
|
|
|
and passing them to flat database of latest Salt resource runs. |
|
|
|
|
|
''' |
|
|
|
|
|
|
|
|
|
|
|
# Import python libs |
|
|
|
|
|
from __future__ import absolute_import |
|
|
|
|
|
import datetime |
|
|
|
|
|
import json |
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
|
|
|
|
# Import salt libs |
|
|
|
|
|
import salt.utils.event |
|
|
|
|
|
|
|
|
|
|
|
# Import third party libs |
|
|
|
|
|
try: |
|
|
|
|
|
import psycopg2 |
|
|
|
|
|
import psycopg2.extras |
|
|
|
|
|
HAS_POSTGRES = True |
|
|
|
|
|
except ImportError: |
|
|
|
|
|
HAS_POSTGRES = False |
|
|
|
|
|
|
|
|
|
|
|
__virtualname__ = 'saltgraph' |
|
|
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __virtual__(): |
|
|
|
|
|
if not HAS_POSTGRES: |
|
|
|
|
|
return False, 'Could not import saltgraph engine. python-psycopg2 is not installed.' |
|
|
|
|
|
return __virtualname__ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_conn(options={}): |
|
|
|
|
|
''' |
|
|
|
|
|
Return a postgres connection. |
|
|
|
|
|
''' |
|
|
|
|
|
host = options.get('host', '127.0.0.1') |
|
|
|
|
|
user = options.get('user', 'salt') |
|
|
|
|
|
passwd = options.get('passwd', 'salt') |
|
|
|
|
|
datab = options.get('db', 'salt') |
|
|
|
|
|
port = options.get('port', 5432) |
|
|
|
|
|
|
|
|
|
|
|
return psycopg2.connect( |
|
|
|
|
|
host=host, |
|
|
|
|
|
user=user, |
|
|
|
|
|
password=passwd, |
|
|
|
|
|
database=datab, |
|
|
|
|
|
port=port) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _close_conn(conn): |
|
|
|
|
|
''' |
|
|
|
|
|
Close the Postgres connection |
|
|
|
|
|
''' |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
conn.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_lowstate_data(options={}): |
|
|
|
|
|
''' |
|
|
|
|
|
TODO: document this method |
|
|
|
|
|
''' |
|
|
|
|
|
conn = _get_conn(options) |
|
|
|
|
|
cur = conn.cursor() |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
# you can only do this on Salt Masters minion |
|
|
|
|
|
lowstate_req = __salt__['saltutil.cmd']('*', 'state.show_lowstate', **{'timeout': 15, 'concurrent': True, 'queue': True}) |
|
|
|
|
|
except: |
|
|
|
|
|
lowstate_req = {} |
|
|
|
|
|
|
|
|
|
|
|
for minion, lowstate_ret in lowstate_req.items(): |
|
|
|
|
|
if lowstate_ret.get('retcode') != 0: |
|
|
|
|
|
continue |
|
|
|
|
|
for resource in lowstate_ret.get('ret', []): |
|
|
|
|
|
low_sql = '''INSERT INTO salt_resources |
|
|
|
|
|
(id, resource_id, host, service, module, fun, status) |
|
|
|
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s) |
|
|
|
|
|
ON CONFLICT (id) DO UPDATE |
|
|
|
|
|
SET resource_id = excluded.resource_id, |
|
|
|
|
|
host = excluded.host, |
|
|
|
|
|
service = excluded.service, |
|
|
|
|
|
module = excluded.module, |
|
|
|
|
|
fun = excluded.fun, |
|
|
|
|
|
alter_time = current_timestamp''' |
|
|
|
|
|
|
|
|
|
|
|
rid = "%s|%s" % (minion, resource.get('__id__')) |
|
|
|
|
|
cur.execute( |
|
|
|
|
|
low_sql, ( |
|
|
|
|
|
rid, |
|
|
|
|
|
resource.get('__id__'), |
|
|
|
|
|
minion, |
|
|
|
|
|
resource.get('__sls__'), |
|
|
|
|
|
resource.get('state') if 'state' in resource else resource.get('module'), |
|
|
|
|
|
resource.get('fun'), |
|
|
|
|
|
'unknown' |
|
|
|
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
|
|
|
if lowstate_req: |
|
|
|
|
|
meta_sql = '''INSERT INTO salt_resources_meta |
|
|
|
|
|
(id, options) |
|
|
|
|
|
VALUES (%s, %s) |
|
|
|
|
|
ON CONFLICT (id) DO UPDATE |
|
|
|
|
|
SET options = excluded.options, |
|
|
|
|
|
alter_time = current_timestamp''' |
|
|
|
|
|
|
|
|
|
|
|
cur.execute( |
|
|
|
|
|
meta_sql, ( |
|
|
|
|
|
'lowstate_data', |
|
|
|
|
|
'{}' |
|
|
|
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
_close_conn(conn) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _up_to_date(options={}): |
|
|
|
|
|
''' |
|
|
|
|
|
TODO: document this method |
|
|
|
|
|
''' |
|
|
|
|
|
conn = _get_conn(options) |
|
|
|
|
|
cur = conn.cursor() |
|
|
|
|
|
#cur_dict = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) |
|
|
|
|
|
|
|
|
|
|
|
ret = False |
|
|
|
|
|
|
|
|
|
|
|
# if lowstate data are older than 1 day, refresh them |
|
|
|
|
|
cur.execute('SELECT alter_time FROM salt_resources_meta WHERE id = %s', ('lowstate_data',)) |
|
|
|
|
|
alter_time = cur.fetchone() |
|
|
|
|
|
|
|
|
|
|
|
if alter_time: |
|
|
|
|
|
now = datetime.datetime.utcnow() |
|
|
|
|
|
day = datetime.timedelta(days=1) |
|
|
|
|
|
time_diff = now - alter_time[0].replace(tzinfo=None) |
|
|
|
|
|
if time_diff < day: |
|
|
|
|
|
ret = True |
|
|
|
|
|
else: |
|
|
|
|
|
skip = False |
|
|
|
|
|
|
|
|
|
|
|
_close_conn(conn) |
|
|
|
|
|
|
|
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _update_resources(event, options): |
|
|
|
|
|
''' |
|
|
|
|
|
TODO: document this method |
|
|
|
|
|
''' |
|
|
|
|
|
conn = _get_conn(options) |
|
|
|
|
|
cur = conn.cursor() |
|
|
|
|
|
|
|
|
|
|
|
cur.execute('SELECT id FROM salt_resources') |
|
|
|
|
|
resources_db = [res[0] for res in cur.fetchall()] |
|
|
|
|
|
resources = event.get('return', {}).values() |
|
|
|
|
|
|
|
|
|
|
|
for resource in resources: |
|
|
|
|
|
rid = '%s|%s' % (event.get('id'), resource.get('__id__')) |
|
|
|
|
|
if rid in resources_db: |
|
|
|
|
|
status = 'unknown' |
|
|
|
|
|
if resource.get('result', None) is not None: |
|
|
|
|
|
status = 'success' if resource.get('result') else 'failed' |
|
|
|
|
|
|
|
|
|
|
|
resource_sql = '''UPDATE salt_resources SET (status, last_ret, alter_time) = (%s, %s, current_timestamp) |
|
|
|
|
|
WHERE id = %s''' |
|
|
|
|
|
|
|
|
|
|
|
cur.execute( |
|
|
|
|
|
resource_sql, ( |
|
|
|
|
|
status, |
|
|
|
|
|
repr(resource), |
|
|
|
|
|
rid |
|
|
|
|
|
) |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
conn.commit() |
|
|
|
|
|
|
|
|
|
|
|
_close_conn(conn) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start(host='salt', user='salt', password='salt', database='salt', port=5432, **kwargs): |
|
|
|
|
|
''' |
|
|
|
|
|
Listen to events and parse Salt state returns |
|
|
|
|
|
''' |
|
|
|
|
|
if __opts__['__role'] == 'master': |
|
|
|
|
|
event_bus = salt.utils.event.get_master_event( |
|
|
|
|
|
__opts__, |
|
|
|
|
|
__opts__['sock_dir'], |
|
|
|
|
|
listen=True) |
|
|
|
|
|
else: |
|
|
|
|
|
event_bus = salt.utils.event.get_event( |
|
|
|
|
|
'minion', |
|
|
|
|
|
transport=__opts__['transport'], |
|
|
|
|
|
opts=__opts__, |
|
|
|
|
|
sock_dir=__opts__['sock_dir'], |
|
|
|
|
|
listen=True) |
|
|
|
|
|
log.debug('Saltgraph engine started') |
|
|
|
|
|
|
|
|
|
|
|
while True: |
|
|
|
|
|
event = event_bus.get_event() |
|
|
|
|
|
supported_funcs = ['state.sls', 'state.apply', 'state.highstate'] |
|
|
|
|
|
if event and event.get('fun', None) in supported_funcs: |
|
|
|
|
|
test = 'test=true' in [arg.lower() for arg in event.get('fun_args', [])] |
|
|
|
|
|
if not test: |
|
|
|
|
|
options = { |
|
|
|
|
|
'host': host, |
|
|
|
|
|
'user': user, |
|
|
|
|
|
'passwd': password, |
|
|
|
|
|
'db': database, |
|
|
|
|
|
'port': port |
|
|
|
|
|
} |
|
|
|
|
|
is_reclass = [arg for arg in event.get('fun_args', []) if arg.startswith('reclass')] |
|
|
|
|
|
if is_reclass or not _up_to_date(options): |
|
|
|
|
|
_get_lowstate_data(options) |
|
|
|
|
|
|
|
|
|
|
|
_update_resources(event, options) |
|
|
|
|
|
|