# -*- coding: utf-8 -*-
"""
app.commands
~~~~~~~~~~~~
This module defines extra flask commands.
:copyright: (c) 2018 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import click
import ldap3
import redis
import rq
import sqlalchemy as sa
import sentry_sdk
from flask import current_app
from sentry_sdk.integrations.rq import RqIntegration
from .extensions import db, ldap_manager
from .defaults import defaults
from .tasks import TaskWorker
from . import models, utils, tokens
[docs]def disable_user(user):
"""Clear users'groups, email and tokens"""
user.groups = []
user.email = ""
# Revoke all user's tokens
for token in user.tokens:
db.session.delete(token)
[docs]def sync_user(connection, user):
"""Synchronize the user from the database with information from the LDAP server"""
search_attr = current_app.config.get("LDAP_USER_LOGIN_ATTR")
object_filter = current_app.config.get("LDAP_USER_OBJECT_FILTER")
search_filter = f"(&{object_filter}({search_attr}={user.username}))"
connection.search(
search_base=ldap_manager.full_user_search_dn,
search_filter=search_filter,
search_scope=getattr(ldap3, current_app.config.get("LDAP_USER_SEARCH_SCOPE")),
attributes=current_app.config.get("LDAP_GET_USER_ATTRIBUTES"),
)
results = [
result for result in connection.response if result["type"] == "searchResEntry"
]
if len(results) == 1:
ldap_user = results[0]
# OU=InActiveUsers is specific to ESS AD
if "OU=InActiveUsers" in ldap_user["dn"]:
current_app.logger.info(f"{user} is inactive. User disabled.")
disable_user(user)
else:
attributes = ldap_user["attributes"]
user.display_name = (
utils.attribute_to_string(attributes["cn"]) or user.username
)
user.email = utils.attribute_to_string(attributes["mail"])
groups = ldap_manager.get_user_groups(
dn=ldap3.utils.conv.escape_filter_chars(ldap_user["dn"]),
_connection=connection,
)
user.groups = sorted(
[utils.attribute_to_string(group["cn"]) for group in groups]
)
current_app.logger.info(f"{user} updated")
elif len(results) == 0:
current_app.logger.warning(f"{user} not found! User disabled.")
disable_user(user)
else:
current_app.logger.warning(f"Too many results for {user}!")
current_app.logger.warning(f"results: {results}")
return user
[docs]def sync_users():
"""Synchronize all users from the database with information the LDAP server"""
current_app.logger.info(
"Synchronize database with information from the LDAP server"
)
try:
connection = ldap_manager.connection
except ldap3.core.exceptions.LDAPException as e:
current_app.logger.warning(f"Failed to connect to the LDAP server: {e}")
return
for user in models.User.query.all():
sync_user(connection, user)
db.session.commit()
[docs]def clean_deferred_tasks():
"""Set all deferred tasks to failed"""
for task in (
models.Task.query.filter_by(status=models.JobStatus.DEFERRED)
.order_by(models.Task.created_at)
.all()
):
if task.depends_on is None or task.depends_on.status == models.JobStatus.FAILED:
current_app.logger.info(
f"Set deferred task {task.id} ({task.name}) to failed"
)
task.status = models.JobStatus.FAILED
db.session.commit()
[docs]def register_cli(app):
@app.cli.command()
def create_defaults():
"""Create the database default values"""
for instance in defaults:
db.session.add(instance)
try:
db.session.commit()
except sa.exc.IntegrityError:
db.session.rollback()
app.logger.debug(f"{instance} already exists")
@app.cli.command()
def clean_deferred():
"""Set deferred tasks to failed if the task it depends on failed"""
clean_deferred_tasks()
@app.cli.command()
def syncusers():
"""Synchronize all users from the database with information the LDAP server"""
sync_users()
@app.cli.command()
def delete_expired_tokens():
"""Prune database from expired tokens"""
tokens.prune_database()
@app.cli.command()
def maintenance():
"""Run maintenance commands"""
sync_users()
tokens.prune_database()
@app.cli.command()
def runworker():
"""Run RQ worker"""
redis_url = current_app.config["RQ_REDIS_URL"]
redis_connection = redis.from_url(redis_url)
with rq.Connection(redis_connection):
worker = TaskWorker(["high", "normal", "low"])
if current_app.config["SENTRY_DSN"]:
sentry_sdk.init(
current_app.config["SENTRY_DSN"],
environment=current_app.config["CSENTRY_ENVIRONMENT"],
integrations=[RqIntegration()],
)
worker.work()
@app.cli.command()
@click.option(
"--delete/--no-delete",
default=True,
help="Delete and recreate the index [default: True]",
)
@click.option(
"--name", default="all", help="name of the class to reindex [default: all]"
)
def reindex(delete, name):
"""Initialize the elasticsearch index"""
if delete:
current_app.elasticsearch.indices.delete("*", ignore=404)
if name == "all":
models.Item.reindex(delete)
models.Host.reindex(delete)
models.AnsibleGroup.reindex(delete)
return
try:
getattr(models, name).reindex(delete)
except AttributeError:
click.echo(f"The class {name} can't be indexed")