# -*- coding: utf-8 -*-
"""
app.inventory.views
~~~~~~~~~~~~~~~~~~~
This module implements the inventory blueprint.
:copyright: (c) 2017 European Spallation Source ERIC
:license: BSD 2-Clause, see LICENSE for more details.
"""
import sqlalchemy as sa
from flask import (
Blueprint,
render_template,
jsonify,
session,
request,
redirect,
url_for,
flash,
current_app,
)
from flask_login import login_required, current_user
from .forms import AttributeForm, ItemForm, CommentForm, EditCommentForm
from ..extensions import db
from ..decorators import login_groups_accepted
from .. import utils, models, helpers
bp = Blueprint("inventory", __name__)
[docs]@bp.route("/_retrieve_items", methods=["POST"])
@login_required
def retrieve_items():
return utils.retrieve_data_for_datatables(request.values, models.Item)
[docs]@bp.route("/items")
@login_required
def list_items():
return render_template("inventory/items.html")
@bp.route("/items/_generate_excel_file")
@login_required
def _generate_excel_file():
task = current_user.launch_task(
"generate_items_excel_file", func="generate_items_excel_file", job_timeout=180
)
db.session.commit()
return utils.redirect_to_job_status(task.id)
[docs]@bp.route("/items/create", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def create_item():
# The following keys are stored in the session to easily create
# several identical items
keys = ("manufacturer_id", "model_id", "location_id", "status_id", "parent_id")
settings = {key: session.get(key, "") for key in keys}
form = ItemForm(request.form, **settings)
if form.validate_on_submit():
for key in keys:
session[key] = getattr(form, key).data
item = models.Item(
ics_id=form.ics_id.data,
serial_number=form.serial_number.data,
quantity=form.quantity.data,
manufacturer=helpers.get_model(
models.Manufacturer, form.manufacturer_id.data
),
model=helpers.get_model(models.Model, form.model_id.data),
location=helpers.get_model(models.Location, form.location_id.data),
status=helpers.get_model(models.Status, form.status_id.data),
parent=helpers.get_model(models.Item, form.parent_id.data),
host=helpers.get_model(models.Host, form.host_id.data),
stack_member=form.stack_member.data,
)
item.macs = [
models.Mac(address=address) for address in form.mac_addresses.data.split()
]
current_app.logger.debug(f"Trying to create: {item!r}")
db.session.add(item)
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
current_app.logger.warning(f"{e}")
flash(f"{e}", "error")
else:
flash(f"Item {item} created!", "success")
return redirect(url_for("inventory.create_item"))
return render_template("inventory/create_item.html", form=form)
[docs]@bp.route("/items/view/<ics_id>")
@login_required
def view_item(ics_id):
item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
return render_template("inventory/view_item.html", item=item)
[docs]@bp.route("/items/edit/<ics_id>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def edit_item(ics_id):
item = models.Item.query.filter_by(ics_id=ics_id).first_or_404()
mac_addresses = " ".join([str(mac) for mac in item.macs])
form = ItemForm(request.form, obj=item, mac_addresses=mac_addresses)
if form.validate_on_submit():
# Only allow to update temporary ics_id
if item.ics_id.startswith(current_app.config["TEMPORARY_ICS_ID"]):
item.ics_id = form.ics_id.data
item.serial_number = form.serial_number.data
item.quantity = form.quantity.data
# When a field is disabled, it's value is not passed to the request
# We don't use request.form.get('stack_member', None) to let the coerce
# function of the field properly convert the value
if "stack_member" in request.form:
item.stack_member = form.stack_member.data
else:
# Field is disabled, force it to None
item.stack_member = None
item.manufacturer = helpers.get_model(
models.Manufacturer, form.manufacturer_id.data
)
item.model = helpers.get_model(models.Model, form.model_id.data)
item.location = helpers.get_model(models.Location, form.location_id.data)
item.status = helpers.get_model(models.Status, form.status_id.data)
item.parent = helpers.get_model(models.Item, form.parent_id.data)
item.host = helpers.get_model(models.Host, form.host_id.data)
new_addresses = form.mac_addresses.data.split()
# Delete the MAC addresses that have been removed
for (index, mac) in enumerate(item.macs):
if mac.address not in new_addresses:
item.macs.pop(index)
db.session.delete(mac)
# Add new MAC addresseses
for address in new_addresses:
if address not in mac_addresses:
mac = models.Mac(address=address)
item.macs.append(mac)
current_app.logger.debug(f"Trying to update: {item!r}")
try:
db.session.commit()
except sa.exc.IntegrityError as e:
db.session.rollback()
current_app.logger.warning(f"{e}")
flash(f"{e}", "error")
else:
flash(f"Item {item} updated!", "success")
return redirect(url_for("inventory.view_item", ics_id=item.ics_id))
return render_template("inventory/edit_item.html", form=form)
[docs]@bp.route("/attributes/favorites")
@login_required
def attributes_favorites():
return render_template("inventory/attributes_favorites.html")
[docs]@bp.route("/_retrieve_attributes_favorites")
@login_required
def retrieve_attributes_favorites():
data = [
(
favorite.base64_image(),
type(favorite).__name__,
favorite.name,
favorite.description,
)
for favorite in current_user.favorite_attributes()
]
return jsonify(data=data)
[docs]@bp.route("/attributes/<kind>", methods=("GET", "POST"))
@login_groups_accepted("admin", "inventory")
def attributes(kind):
form = AttributeForm()
if form.validate_on_submit():
model = getattr(models, kind)
new_model = model(
name=form.name.data, description=form.description.data or None
)
db.session.add(new_model)
try:
db.session.commit()
except sa.exc.IntegrityError:
db.session.rollback()
flash(f"{form.name.data} already exists! {kind} not created.", "error")
else:
flash(f"{kind} {new_model} created!", "success")
return redirect(url_for("inventory.attributes", kind=kind))
return render_template("inventory/attributes.html", kind=kind, form=form)
[docs]@bp.route("/_retrieve_attributes/<kind>")
@login_required
def retrieve_attributes(kind):
try:
model = getattr(models, kind)
except AttributeError:
raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
items = db.session.query(model).order_by(model.name)
data = [
(
{"id": item.id, "favorite": item.is_user_favorite()},
item.base64_image(),
item.name,
item.description,
)
for item in items
]
return jsonify(data=data)
[docs]@bp.route("/_update_favorites/<kind>", methods=["POST"])
@login_required
def update_favorites(kind):
"""Update the current user favorite attributes
Add or remove the attribute from the favorites when the
checkbox is checked/unchecked in the attributes table
"""
try:
model = getattr(models, kind)
except AttributeError:
raise utils.CSEntryError(f"Unknown model '{kind}'", status_code=422)
data = request.get_json()
attribute = model.query.get(data["id"])
favorite_attributes_str = utils.pluralize(f"favorite_{kind.lower()}")
user_favorite_attributes = getattr(current_user, favorite_attributes_str)
if data["checked"]:
user_favorite_attributes.append(attribute)
message = "Attribute added to the favorites"
else:
user_favorite_attributes.remove(attribute)
message = "Attribute removed from the favorites"
db.session.commit()
data = {"message": message}
return jsonify(data=data), 201
[docs]@bp.route("/scanner")
@login_required
def scanner():
"""Render the scanner setup codes"""
return render_template("inventory/scanner.html")
[docs]@bp.route("/_retrieve_free_stack_members/<host_id>")
@login_required
def retrieve_free_stack_members(host_id):
"""Return as json the free stack members numbers for the given host
If ics_id is passed in the query string, the member will be added to
the list (if it exists)
Used to populate dynamically the stack_member field in the create and
edit item forms
"""
disabled_data = {"stack_members": [], "selected_member": None, "disabled": True}
try:
host = models.Host.query.get(host_id)
except sa.exc.DataError:
# In case of unknown host_id or if host_id is None
current_app.logger.debug(f"Invalid host_id: {host_id}")
return jsonify(data=disabled_data)
if str(host.device_type) != "Network":
return jsonify(data=disabled_data)
members = host.free_stack_members()
selected_member = "None"
ics_id = request.args.get("ics_id", None)
item = models.Item.query.filter_by(ics_id=ics_id).first()
if item is None:
current_app.logger.debug(f"Unknown ics_id: {ics_id}")
else:
if item.stack_member is not None:
members.append(item.stack_member)
members.sort()
selected_member = item.stack_member
members = ["None"] + members
data = {
"stack_members": members,
"selected_member": selected_member,
"disabled": False,
}
return jsonify(data=data)