# -*- coding: utf-8 -*-
"""
app.tasks
~~~~~~~~~
This module implements tasks to run.
:copyright: (c) 2018 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import time
import tower_cli
from datetime import datetime
from flask import current_app
from rq import Worker, get_current_job
from openpyxl import Workbook
from .extensions import db
from . import models, utils
[docs]class TaskWorker(Worker):
"""
Modified version of the rq worker which updates
the task status and end time in the CSEntry database
"""
[docs] @staticmethod
def save_exception(job, exc_string):
"""Save the exception to the database
The exception is only saved if it occured before the AWX job was triggered.
If the AWX job failed, we can refer to the logs on AWX.
"""
task = models.Task.query.get(job.id)
if task is None:
return
if task.awx_job_id is None:
# No AWX job was triggered. An exception occured before. Save it.
task.exception = exc_string
db.session.commit()
[docs] def update_task_attributes(self, job, attributes):
"""Update the attributes of the task linked to the given job"""
# The task is created after enqueueing the job.
# If the job is processed very quickly, the task might
# not be found in the database. We wait up to 3 seconds.
for _ in range(3):
task = models.Task.query.get(job.id)
if task is not None:
break
else:
self.log.warning("task not found...")
time.sleep(1)
else:
self.log.error(
f"Task {job.id} not found for job: {job.get_call_string()}!"
" Task attribute not updated!"
)
return
for name, value in attributes.items():
setattr(task, name, value)
db.session.commit()
[docs] @staticmethod
def update_reverse_dependencies(job):
task = models.Task.query.get(job.id)
if task is None:
return
task.update_reverse_dependencies()
db.session.commit()
[docs] def handle_job_failure(self, job, queue, started_job_registry=None, exc_string=""):
self.update_task_attributes(
job, {"ended_at": job.ended_at, "status": models.JobStatus.FAILED}
)
self.update_reverse_dependencies(job)
self.save_exception(job, exc_string)
super().handle_job_failure(
job, queue, started_job_registry=started_job_registry, exc_string=exc_string
)
[docs] def handle_job_success(self, job, queue, started_job_registry):
self.update_task_attributes(
job, {"ended_at": job.ended_at, "status": models.JobStatus.FINISHED}
)
super().handle_job_success(job, queue, started_job_registry)
[docs] def prepare_job_execution(self, job, heartbeat_ttl=None):
self.update_task_attributes(job, {"status": models.JobStatus.STARTED})
super().prepare_job_execution(job, heartbeat_ttl)
[docs]def launch_awx_job(resource="job", **kwargs):
r"""Launch an AWX job
job_template or inventory_source shall be passed as keyword argument
:param resource: job|workflow_job|inventory_source
:param \*\*kwargs: keyword arguments passed to launch the job
:returns: A dictionary with information from resource.monitor
"""
rq_job = get_current_job()
job_template = kwargs.pop("job_template", None)
inventory_source = kwargs.pop("inventory_source", None)
if job_template is None and inventory_source is None:
current_app.logger.warning("No job_template nor inventory_source passed!")
return "No job_template nor inventory_source passed!"
if job_template in (
current_app.config["AWX_CREATE_VIOC"],
current_app.config["AWX_CREATE_VM"],
) and not current_app.config.get("AWX_VM_CREATION_ENABLED", False):
current_app.logger.info("AWX VM creation is disabled. Not sending any request.")
return "AWX VM creation not triggered"
if not current_app.config.get("AWX_JOB_ENABLED", False):
current_app.logger.info("AWX job is disabled. Not sending any request.")
return "AWX job not triggered"
# Launch the AWX job
tower_resource = tower_cli.get_resource(resource)
if resource == "inventory_source":
result = tower_resource.update(inventory_source, **kwargs)
else:
result = tower_resource.launch(job_template, **kwargs)
# Save the AWX job id in the task
task = models.Task.query.get(rq_job.id)
task.awx_job_id = result["id"]
db.session.commit()
# Monitor the job until done
result = tower_resource.monitor(pk=result["id"])
return result
[docs]def generate_items_excel_file():
"""Export all inventory items to an excel file
Return the name of the file
"""
job = get_current_job()
name = f"items-{datetime.utcnow().strftime('%Y%m%d_%H%M')}.xlsx"
full_path = utils.unique_filename(current_app.config["CSENTRY_STATIC_FILES"] / name)
# Instead of loading all items at once, we perform several smaller queries
pagination = models.Item.query.order_by(models.Item.created_at).paginate(1, 100)
wb = Workbook()
ws = wb.active
# Add header
# Note that we rely on the fact that dict keep their order in Python 3.6
# (this is official in 3.7)
ws.append(list(pagination.items[0].to_dict().keys()))
while pagination.items:
for item in pagination.items:
ws.append([val for val in item.to_row_dict().values()])
job.meta["progress"] = int(100 * pagination.page / pagination.pages)
current_app.logger.debug(f"progress: {job.meta['progress']}")
job.save_meta()
pagination = pagination.next()
wb.save(full_path)
return full_path.name
[docs]def reindex_ansible_groups():
"""Reindex Ansible groups
This is to ensure the hosts are up to date in the elasticsearch index
for dynamic groups
"""
models.AnsibleGroup.reindex(delete=False)