Source code for app.inventory.views

# -*- coding: utf-8 -*-
"""
app.inventory.views
~~~~~~~~~~~~~~~~~~~

This module implements the inventory blueprint.

:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.

"""
import sqlalchemy as sa
from flask import (
    Blueprint,
    render_template,
    jsonify,
    session,
    request,
    redirect,
    url_for,
    flash,
    current_app,
)
from flask_login import login_required, current_user
from .forms import AttributeForm, ItemForm, CommentForm, EditCommentForm
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import utils, models, helpers

bp = Blueprint("inventory", __name__)


@bp.route("/_retrieve_items", methods=["POST"])
@login_required
def retrieve_items():
    """Return the items data requested through the submitted values.

    The request values are handed over unchanged to
    ``utils.retrieve_data_for_datatables`` together with the ``Item`` model;
    whatever that helper returns is the response.
    """
    submitted_values = request.values
    return utils.retrieve_data_for_datatables(submitted_values, models.Item)
@bp.route("/items")
@login_required
def list_items():
    """Render the page listing the items."""
    template_name = "inventory/items.html"
    return render_template(template_name)
@bp.route("/items/_generate_excel_file")
@login_required
def _generate_excel_file():
    """Launch the items Excel export task and redirect to its job status.

    The task is started for the current user with a 180 seconds job timeout.
    """
    task_name = "generate_items_excel_file"
    task = current_user.launch_task(task_name, func=task_name, job_timeout=180)
    # Commit before redirecting (presumably launch_task registers the task
    # in the session -- confirm against the User model)
    db.session.commit()
    job_id = task.id
    return utils.redirect_to_job_status(job_id)
@bp.route("/items/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def create_item():
    """Display the item creation form and create the item on valid submission.

    Some of the submitted ids (manufacturer, model, location, status and
    parent) are remembered in the session and used to pre-fill the form the
    next time, which makes it easy to create several identical items.

    After a valid submission the user is redirected to the creation page,
    with a flashed message telling whether the item was created or which
    integrity error occurred.
    """
    # Fields remembered in the session from one creation to the next
    remembered_keys = (
        "manufacturer_id",
        "model_id",
        "location_id",
        "status_id",
        "parent_id",
    )
    defaults = {}
    for key in remembered_keys:
        defaults[key] = session.get(key, "")
    form = ItemForm(request.form, **defaults)

    if not form.validate_on_submit():
        return render_template("inventory/create_item.html", form=form)

    for key in remembered_keys:
        session[key] = getattr(form, key).data

    manufacturer = helpers.get_model(models.Manufacturer, form.manufacturer_id.data)
    model = helpers.get_model(models.Model, form.model_id.data)
    location = helpers.get_model(models.Location, form.location_id.data)
    status = helpers.get_model(models.Status, form.status_id.data)
    parent = helpers.get_model(models.Item, form.parent_id.data)
    host = helpers.get_model(models.Host, form.host_id.data)
    item = models.Item(
        ics_id=form.ics_id.data,
        serial_number=form.serial_number.data,
        quantity=form.quantity.data,
        manufacturer=manufacturer,
        model=model,
        location=location,
        status=status,
        parent=parent,
        host=host,
        stack_member=form.stack_member.data,
    )
    # The MAC addresses are submitted as one whitespace separated string
    macs = []
    for address in form.mac_addresses.data.split():
        macs.append(models.Mac(address=address))
    item.macs = macs

    current_app.logger.debug(f"Trying to create: {item!r}")
    db.session.add(item)
    try:
        db.session.commit()
    except sa.exc.IntegrityError as e:
        db.session.rollback()
        current_app.logger.warning(f"{e}")
        flash(f"{e}", "error")
    else:
        flash(f"Item {item} created!", "success")
    return redirect(url_for("inventory.create_item"))
@bp.route("/items/view/<ics_id>")
@login_required
def view_item(ics_id):
    """Render the page of the item with the given ICS id.

    A 404 is raised when no item has this ``ics_id``.
    """
    matching_items = models.Item.query.filter_by(ics_id=ics_id)
    item = matching_items.first_or_404()
    return render_template("inventory/view_item.html", item=item)
@bp.route("/items/comment/<ics_id>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def comment_item(ics_id):
    """Display the comment form and add the comment to the item when valid.

    A 404 is raised when no item has this ``ics_id``. After a valid
    submission the user is redirected to the item page.
    """
    item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
    form = CommentForm()
    if not form.validate_on_submit():
        return render_template("inventory/comment_item.html", item=item, form=form)

    new_comment = models.ItemComment(body=form.body.data)
    item.comments.append(new_comment)
    db.session.add(new_comment)
    db.session.commit()
    item_url = url_for("inventory.view_item", ics_id=ics_id)
    return redirect(item_url)
@bp.route("/items/comment/edit/<comment_id>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def edit_comment(comment_id):
    """Display the comment edition form and update the comment body when valid.

    A 404 is raised when the comment doesn't exist. After a valid submission
    the user is redirected to the page of the commented item.
    """
    comment = models.ItemComment.query.get_or_404(comment_id)
    form = EditCommentForm(request.form, obj=comment)
    if not form.validate_on_submit():
        return render_template(
            "inventory/edit_comment.html",
            item=comment.item,
            comment_id=comment.id,
            form=form,
        )

    comment.body = form.body.data
    # Flag the "comments" attribute of the item as modified: this marks the
    # item as "dirty" and adds it to the session so that it is re-indexed
    sa.orm.attributes.flag_modified(comment.item, "comments")
    db.session.commit()
    item_url = url_for("inventory.view_item", ics_id=comment.item.ics_id)
    return redirect(item_url)
@bp.route("/items/comment/delete", methods=["POST"])
@login_groups_accepted("admin", "inventory")
def delete_comment():
    """Delete the comment whose id is posted in the ``comment_id`` form field.

    A 404 is raised when the comment doesn't exist. The user is redirected
    to the page of the item the comment belonged to.
    """
    comment_id = request.form["comment_id"]
    comment = models.ItemComment.query.get_or_404(comment_id)
    commented_item = comment.item
    # Remember the item id before the comment is detached from it
    ics_id = commented_item.ics_id
    # Explicitly remove the comment from the item to make sure
    # the item will be re-indexed
    commented_item.comments.remove(comment)
    db.session.delete(comment)
    db.session.commit()
    flash(f"Comment {comment.id} has been deleted", "success")
    item_url = url_for("inventory.view_item", ics_id=ics_id)
    return redirect(item_url)
@bp.route("/items/edit/<ics_id>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def edit_item(ics_id):
    """Display the item edition form and update the item on valid submission.

    A 404 is raised when no item has this ``ics_id``. The ``ics_id`` itself
    can only be changed when the current one is a temporary one (it starts
    with the ``TEMPORARY_ICS_ID`` configuration value).

    The MAC addresses are submitted as one whitespace separated string:
    addresses that are no longer listed are deleted and addresses that the
    item doesn't have yet are created.

    After a valid submission the user is redirected to the item page, with a
    flashed message telling whether the item was updated or which integrity
    error occurred.
    """
    item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
    mac_addresses = " ".join([str(mac) for mac in item.macs])
    form = ItemForm(request.form, obj=item, mac_addresses=mac_addresses)
    if form.validate_on_submit():
        # Only allow to update temporary ics_id
        if item.ics_id.startswith(current_app.config["TEMPORARY_ICS_ID"]):
            item.ics_id = form.ics_id.data
        item.serial_number = form.serial_number.data
        item.quantity = form.quantity.data
        # When a field is disabled, its value is not passed to the request
        # We don't use request.form.get('stack_member', None) to let the coerce
        # function of the field properly convert the value
        if "stack_member" in request.form:
            item.stack_member = form.stack_member.data
        else:
            # Field is disabled, force it to None
            item.stack_member = None
        item.manufacturer = helpers.get_model(
            models.Manufacturer, form.manufacturer_id.data
        )
        item.model = helpers.get_model(models.Model, form.model_id.data)
        item.location = helpers.get_model(models.Location, form.location_id.data)
        item.status = helpers.get_model(models.Status, form.status_id.data)
        item.parent = helpers.get_model(models.Item, form.parent_id.data)
        item.host = helpers.get_model(models.Host, form.host_id.data)
        new_addresses = form.mac_addresses.data.split()
        submitted_addresses = set(new_addresses)
        # Addresses the item had before this update. An exact-match set is
        # used: testing against the space-joined string would be a substring
        # test, not a membership test.
        existing_addresses = {mac.address for mac in item.macs}
        # Delete the MAC addresses that have been removed.
        # Iterate over a copy: removing from the list being iterated would
        # skip the element following each removed one.
        for mac in list(item.macs):
            if mac.address not in submitted_addresses:
                item.macs.remove(mac)
                db.session.delete(mac)
        # Add new MAC addresses
        for address in new_addresses:
            if address not in existing_addresses:
                mac = models.Mac(address=address)
                item.macs.append(mac)
        current_app.logger.debug(f"Trying to update: {item!r}")
        try:
            db.session.commit()
        except sa.exc.IntegrityError as e:
            db.session.rollback()
            current_app.logger.warning(f"{e}")
            flash(f"{e}", "error")
        else:
            flash(f"Item {item} updated!", "success")
        return redirect(url_for("inventory.view_item", ics_id=item.ics_id))
    return render_template("inventory/edit_item.html", form=form)
@bp.route("/attributes/favorites")
@login_required
def attributes_favorites():
    """Render the page of the favorite attributes."""
    template_name = "inventory/attributes_favorites.html"
    return render_template(template_name)
@bp.route("/_retrieve_attributes_favorites")
@login_required
def retrieve_attributes_favorites():
    """Return as json the favorite attributes of the current user.

    Each favorite is described by a row made of its base64 image, the name
    of its class, its name and its description.
    """
    rows = []
    for favorite in current_user.favorite_attributes():
        kind = type(favorite).__name__
        row = (
            favorite.base64_image(),
            kind,
            favorite.name,
            favorite.description,
        )
        rows.append(row)
    return jsonify(data=rows)
@bp.route("/attributes/<kind>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def attributes(kind):
    """Display the attributes page of the given kind and create new ones.

    On a valid form submission, a new instance of the ``models`` class named
    ``kind`` is created from the submitted name and description (an empty
    description is stored as None). The user is then redirected to the same
    page with a flashed message telling whether the creation succeeded.

    :param kind: name of the attribute class in the ``models`` module
    :raises utils.CSEntryError: (status code 422) if a form is submitted for
        a ``kind`` that is not a name of the ``models`` module
    """
    form = AttributeForm()
    if form.validate_on_submit():
        # kind comes from the URL: report an unknown one the same way as the
        # other attribute views instead of failing with an AttributeError
        try:
            model = getattr(models, kind)
        except AttributeError:
            raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
        new_model = model(
            name=form.name.data, description=form.description.data or None
        )
        db.session.add(new_model)
        try:
            db.session.commit()
        except sa.exc.IntegrityError:
            db.session.rollback()
            flash(f"{form.name.data} already exists! {kind} not created.", "error")
        else:
            flash(f"{kind} {new_model} created!", "success")
        return redirect(url_for("inventory.attributes", kind=kind))
    return render_template("inventory/attributes.html", kind=kind, form=form)
@bp.route("/_retrieve_attributes/<kind>")
@login_required
def retrieve_attributes(kind):
    """Return as json all the attributes of the given kind, ordered by name.

    Each attribute is described by a row made of a dictionary (its id and
    whether it is a favorite of the user), its base64 image, its name and
    its description.

    :param kind: name of the attribute class in the ``models`` module
    :raises utils.CSEntryError: (status code 422) if ``kind`` is not a name
        of the ``models`` module
    """
    try:
        model = getattr(models, kind)
    except AttributeError:
        raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
    query = db.session.query(model).order_by(model.name)
    rows = []
    for attribute in query:
        info = {"id": attribute.id, "favorite": attribute.is_user_favorite()}
        rows.append(
            (info, attribute.base64_image(), attribute.name, attribute.description)
        )
    return jsonify(data=rows)
@bp.route("/_update_favorites/<kind>", methods=["POST"])
@login_required
def update_favorites(kind):
    """Update the current user favorite attributes

    Add or remove the attribute from the favorites when the
    checkbox is checked/unchecked in the attributes table

    The json payload shall include the ``id`` of the attribute and the
    ``checked`` state of the checkbox. Adding an attribute that is already a
    favorite, or removing one that isn't, leaves the favorites unchanged.

    :param kind: name of the attribute class in the ``models`` module
    :raises utils.CSEntryError: (status code 422) if ``kind`` is unknown, if
        it can't be used as favorite or if the payload is invalid;
        (status code 404) if no attribute has the given id
    """
    try:
        model = getattr(models, kind)
    except AttributeError:
        raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
    data = request.get_json()
    # The payload might be missing (None), not a dictionary or incomplete
    try:
        attribute_id = data["id"]
        checked = data["checked"]
    except (TypeError, KeyError):
        raise utils.CSEntryError(
            "Invalid payload: 'id' and 'checked' are required", status_code=422
        )
    # query.get returns None (it doesn't raise) when the id is unknown
    attribute = model.query.get(attribute_id)
    if attribute is None:
        raise utils.CSEntryError(
            f"Unknown {kind} id '{attribute_id}'", status_code=404
        )
    favorite_attributes_str = utils.pluralize(f"favorite_{kind.lower()}")
    try:
        user_favorite_attributes = getattr(current_user, favorite_attributes_str)
    except AttributeError:
        # kind is a valid model but the user has no favorites of that kind
        raise utils.CSEntryError(
            f"'{kind}' can't be used as favorite", status_code=422
        )
    already_favorite = attribute in user_favorite_attributes
    if checked:
        # Don't add the same attribute twice
        if not already_favorite:
            user_favorite_attributes.append(attribute)
        message = "Attribute added to the favorites"
    else:
        # Removing an attribute that isn't in the favorites would raise
        if already_favorite:
            user_favorite_attributes.remove(attribute)
        message = "Attribute removed from the favorites"
    db.session.commit()
    data = {"message": message}
    return jsonify(data=data), 201
@bp.route("/scanner")
@login_required
def scanner():
    """Render the page showing the scanner setup codes"""
    template_name = "inventory/scanner.html"
    return render_template(template_name)
@bp.route("/_retrieve_free_stack_members/<host_id>")
@login_required
def retrieve_free_stack_members(host_id):
    """Return as json the free stack members numbers for the given host

    If ics_id is passed in the query string, the member will be added to the list
    (if it exists)

    Used to populate dynamically the stack_member field in the create and
    edit item forms

    The returned data has the "stack_members", "selected_member" and
    "disabled" keys. The field is reported as disabled (with no members)
    when the host_id is invalid or unknown, or when the device type of the
    host is not "Network".
    """
    disabled_data = {"stack_members": [], "selected_member": None, "disabled": True}
    try:
        host = models.Host.query.get(host_id)
    except sa.exc.DataError:
        # The database rejected the host_id value
        # (presumably when it isn't a valid id, like "None" -- confirm)
        current_app.logger.debug(f"Invalid host_id: {host_id}")
        return jsonify(data=disabled_data)
    if host is None:
        # query.get returns None (it doesn't raise) when the host is unknown
        current_app.logger.debug(f"Unknown host_id: {host_id}")
        return jsonify(data=disabled_data)
    if str(host.device_type) != "Network":
        return jsonify(data=disabled_data)
    members = host.free_stack_members()
    selected_member = "None"
    ics_id = request.args.get("ics_id", None)
    item = models.Item.query.filter_by(ics_id=ics_id).first()
    if item is None:
        current_app.logger.debug(f"Unknown ics_id: {ics_id}")
    else:
        if item.stack_member is not None:
            # The member used by the item being edited shall remain selectable
            members.append(item.stack_member)
            members.sort()
            selected_member = item.stack_member
    # "None" is always proposed as first choice
    members = ["None"] + members
    data = {
        "stack_members": members,
        "selected_member": selected_member,
        "disabled": False,
    }
    return jsonify(data=data)