Source code for app.commands

# -*- coding: utf-8 -*-
"""
app.commands
~~~~~~~~~~~~

This module defines extra flask commands.

:copyright: (c) 2018 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import click
import ldap3
import redis
import rq
import sqlalchemy as sa
import sentry_sdk
from flask import current_app
from sentry_sdk.integrations.rq import RqIntegration
from .extensions import db, ldap_manager
from .defaults import defaults
from .tasks import TaskWorker
from . import models, utils, tokens


def disable_user(user):
    """Strip a user of its groups, e-mail address and tokens.

    The user object is modified in place and its tokens are marked for
    deletion on the database session. Nothing is committed here.

    :param user: user from the database to disable
    """
    user.groups = []
    user.email = ""
    # Revoke every token that belongs to the user
    for owned_token in user.tokens:
        db.session.delete(owned_token)
def sync_user(connection, user):
    """Synchronize a user from the database with its entry on the LDAP server.

    The user is searched by username. What happens next depends on the
    number of entries returned by the search:

    * exactly one entry: the user is disabled if the DN of the entry contains
      ``OU=InActiveUsers``; otherwise the display name, e-mail and groups of
      the user are refreshed from the LDAP entry
    * no entry: the user is disabled
    * more than one entry: the user is left untouched and warnings are logged

    Nothing is committed to the database by this function.

    :param connection: LDAP connection used for the search and group lookup
    :param user: user from the database, updated in place
    :returns: the user object that was passed in
    """
    config = current_app.config
    logger = current_app.logger
    search_attr = config.get("LDAP_USER_LOGIN_ATTR")
    object_filter = config.get("LDAP_USER_OBJECT_FILTER")
    # Escape the username before embedding it in the filter: characters such
    # as "*", "(", ")" or "\" have a special meaning in an LDAP filter and
    # would otherwise break the search or make it match other entries.
    username = ldap3.utils.conv.escape_filter_chars(user.username)
    search_filter = f"(&{object_filter}({search_attr}={username}))"
    connection.search(
        search_base=ldap_manager.full_user_search_dn,
        search_filter=search_filter,
        search_scope=getattr(ldap3, config.get("LDAP_USER_SEARCH_SCOPE")),
        attributes=config.get("LDAP_GET_USER_ATTRIBUTES"),
    )
    # Only keep real entries (the response can contain other result types)
    results = [
        result for result in connection.response if result["type"] == "searchResEntry"
    ]
    if not results:
        logger.warning(f"{user} not found! User disabled.")
        disable_user(user)
    elif len(results) > 1:
        # Ambiguous match: don't guess, leave the user as is
        logger.warning(f"Too many results for {user}!")
        logger.warning(f"results: {results}")
    else:
        ldap_user = results[0]
        # OU=InActiveUsers is specific to ESS AD
        if "OU=InActiveUsers" in ldap_user["dn"]:
            logger.info(f"{user} is inactive. User disabled.")
            disable_user(user)
        else:
            attributes = ldap_user["attributes"]
            # Fall back to the username when the entry has no usable "cn"
            user.display_name = (
                utils.attribute_to_string(attributes["cn"]) or user.username
            )
            user.email = utils.attribute_to_string(attributes["mail"])
            groups = ldap_manager.get_user_groups(
                dn=ldap3.utils.conv.escape_filter_chars(ldap_user["dn"]),
                _connection=connection,
            )
            user.groups = sorted(
                utils.attribute_to_string(group["cn"]) for group in groups
            )
            logger.info(f"{user} updated")
    return user
def sync_users():
    """Synchronize all users from the database with information from the LDAP server.

    Every user returned by ``models.User.query`` is passed to
    :func:`sync_user`, then the database session is committed once.
    If retrieving the LDAP connection raises an ``LDAPException``, a warning
    is logged and nothing is synchronized.
    """
    logger = current_app.logger
    logger.info("Synchronize database with information from the LDAP server")
    try:
        connection = ldap_manager.connection
    except ldap3.core.exceptions.LDAPException as error:
        logger.warning(f"Failed to connect to the LDAP server: {error}")
    else:
        for db_user in models.User.query.all():
            sync_user(connection, db_user)
        db.session.commit()
def clean_deferred_tasks():
    """Set deferred tasks that can no longer run to failed.

    Deferred tasks are processed by creation date. A task is set to failed
    when it doesn't depend on any task or when the task it depends on failed.
    Other deferred tasks are left untouched.
    """
    deferred_tasks = (
        models.Task.query.filter_by(status=models.JobStatus.DEFERRED)
        .order_by(models.Task.created_at)
        .all()
    )
    for deferred in deferred_tasks:
        parent = deferred.depends_on
        # Skip tasks whose parent exists and hasn't failed
        if parent is not None and parent.status != models.JobStatus.FAILED:
            continue
        current_app.logger.info(
            f"Set deferred task {deferred.id} ({deferred.name}) to failed"
        )
        deferred.status = models.JobStatus.FAILED
        db.session.commit()
def register_cli(app):
    """Register the extra commands on the command line interface of the application.

    :param app: application whose ``cli`` group receives the commands
    """
    # NOTE: the names and docstrings of the commands below are used by click
    # for the command names and help texts. Keep them as they are.

    @app.cli.command()
    def create_defaults():
        """Create the database default values"""
        for default in defaults:
            db.session.add(default)
            try:
                db.session.commit()
            except sa.exc.IntegrityError:
                # Roll back so that the session can be used for the next value
                db.session.rollback()
                app.logger.debug(f"{default} already exists")

    @app.cli.command()
    def clean_deferred():
        """Set deferred tasks to failed if the task it depends on failed"""
        clean_deferred_tasks()

    @app.cli.command()
    def syncusers():
        """Synchronize all users from the database with information the LDAP server"""
        sync_users()

    @app.cli.command()
    def delete_expired_tokens():
        """Prune database from expired tokens"""
        tokens.prune_database()

    @app.cli.command()
    def maintenance():
        """Run maintenance commands"""
        sync_users()
        tokens.prune_database()

    @app.cli.command()
    def runworker():
        """Run RQ worker"""
        connection = redis.from_url(current_app.config["RQ_REDIS_URL"])
        with rq.Connection(connection):
            worker = TaskWorker(["high", "normal", "low"])
            sentry_dsn = current_app.config["SENTRY_DSN"]
            # Sentry is only initialized when a DSN is configured
            if sentry_dsn:
                sentry_sdk.init(
                    sentry_dsn,
                    environment=current_app.config["CSENTRY_ENVIRONMENT"],
                    integrations=[RqIntegration()],
                )
            worker.work()

    @app.cli.command()
    @click.option(
        "--delete/--no-delete",
        default=True,
        help="Delete and recreate the index [default: True]",
    )
    @click.option(
        "--name", default="all", help="name of the class to reindex [default: all]"
    )
    def reindex(delete, name):
        """Initialize the elasticsearch index"""
        if delete:
            current_app.elasticsearch.indices.delete("*", ignore=404)
        if name == "all":
            models.Item.reindex(delete)
            models.Host.reindex(delete)
            models.AnsibleGroup.reindex(delete)
        else:
            try:
                getattr(models, name).reindex(delete)
            except AttributeError:
                click.echo(f"The class {name} can't be indexed")