distribusi-verse: medium-tech web app content management system for the web
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

319 lines
10 KiB

"""This is the main flask distribusi page"""
import os
import zipfile
import shutil
from datetime import timedelta
from flask import (
render_template,
redirect,
request,
flash,
url_for,
session,
abort,
send_from_directory,
)
from sqlalchemy.exc import (
IntegrityError,
DataError,
DatabaseError,
InterfaceError,
InvalidRequestError,
)
from flask_login import (
login_user,
logout_user,
login_required,
current_user,
)
from werkzeug.routing import BuildError
from flask_bcrypt import generate_password_hash, check_password_hash
from flask_wtf.csrf import CSRFError
from app import create_app, db, login_manager
from usermodel import User
from distribusimodel import Distribusis
# Forms!
from forms.loginform import LoginForm
from forms.registerform import RegisterForm
from forms.uploadform import UploadForm
from forms.distribusiform import DistribusiForm
from forms.themeform import ThemeForm
from forms.editorform import EditorForm
from statuspengguna.helper import AreFilesUploaded
# Tada!
from distribusi.cli import build_argparser
from distribusi.distribusi import distribusify
# Application object returned by the project's create_app() factory; the
# route, error-handler and before-request decorators in this module attach to it.
APP = create_app()
@APP.before_request
def session_handler():
    """Apply the session policy before each request is handled.

    Sets the application's permanent session lifetime to 30 minutes and
    flags the current session as permanent.
    """
    APP.permanent_session_lifetime = timedelta(minutes=30)
    session.permanent = True
@APP.route("/")
def index():
    """Render the landing page listing every named distribusi.

    Collects all ``Distribusis`` rows whose ``distribusiname`` is set and
    passes ``index.html`` a dict mapping the owner's email to the
    distribusi name.
    """
    distribusis = Distribusis.query.filter(
        Distribusis.distribusiname.isnot(None)
    ).all()
    distribusies = {}
    for distribusi in distribusis:
        user = User.query.filter_by(id=distribusi.userid).first()
        if user is None:
            # The row points at a user that no longer exists; skip it
            # rather than letting an AttributeError take down the index page.
            continue
        distribusies[user.email] = distribusi.distribusiname
    return render_template("index.html", distribusies=distribusies)
def _is_safe_redirect_target(target):
    """Return True when *target* resolves to an http(s) URL on our own host.

    Used to validate the ``next`` query parameter so the login view cannot
    be abused as an open redirect.
    """
    # Imported locally so this block works without touching the module's
    # import section.
    from urllib.parse import urljoin, urlparse

    # Some browsers treat a backslash like a forward slash, which would let
    # "/\\evil.example" escape the host check; normalise before resolving.
    normalised = target.replace("\\", "/")
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, normalised))
    return resolved.scheme in ("http", "https") and resolved.netloc == own.netloc


@APP.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate the user on submit.

    On valid credentials the user is logged in and redirected to the
    ``next`` query parameter (when it points at this site) or to the index
    page. An off-site ``next`` value aborts with HTTP 400 before the user
    is logged in. Invalid credentials re-render the form with an error.
    """
    loginform = LoginForm()
    if not loginform.validate_on_submit():
        return render_template("login.html", loginform=loginform)

    # Only the credential check is wrapped: keeping abort() and redirect()
    # outside the try block stops the handler from swallowing the
    # HTTPException that abort() raises.
    try:
        user = User.query.filter_by(email=loginform.email.data).first()
        password_ok = user is not None and check_password_hash(
            user.password, loginform.password.data
        )
    except Exception:
        # Log the details server-side; flashing the exception object itself
        # would leak internals and cannot be stored in the session.
        APP.logger.exception("Unexpected error while checking credentials")
        flash("Something went wrong!", "danger")
        return render_template("login.html", loginform=loginform)

    if user is None:
        loginform.password.errors.append("Invalid email or password!")
        return render_template("login.html", loginform=loginform)
    if not password_ok:
        flash("Invalid email or password!", "danger")
        loginform.password.errors.append("Invalid email or password!")
        return render_template("login.html", loginform=loginform)

    # Validate the redirect target before establishing the login session.
    next_url = request.args.get("next")
    if next_url is not None and not _is_safe_redirect_target(next_url):
        return abort(400)

    login_user(user)
    flash("Logged in successfully.", "success")
    return redirect(next_url or url_for("index"))
@APP.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form and create the account on submit.

    A successful registration stores the new user with a hashed password,
    logs them in and redirects to the index page. Known database and URL
    building errors roll the session back, attach a message to the email
    field, flash it, and re-render the form.
    """
    registerform = RegisterForm()
    if not registerform.validate_on_submit():
        return render_template("register.html", registerform=registerform)

    # (exception type, field error, flash text, flash category).
    # Order matters: the first matching entry wins, so subclasses of
    # DatabaseError are listed before DatabaseError itself.
    failures = (
        (
            InvalidRequestError,
            "Something went wrong!",
            "Something went wrong!",
            "danger",
        ),
        (
            IntegrityError,
            "User already exists!",
            "User already exists!",
            "warning",
        ),
        (DataError, "Invalid Entry", "Invalid Entry", "warning"),
        (
            InterfaceError,
            "Error connecting to the database",
            "Error connecting to the database",
            "danger",
        ),
        (
            DatabaseError,
            "Error connecting to the database",
            "Error connecting to the database",
            "danger",
        ),
        (BuildError, "Unknown error occured!", "An error occured !", "danger"),
    )
    handled = tuple(entry[0] for entry in failures)

    try:
        newuser = User(
            email=registerform.email.data,
            password=generate_password_hash(
                registerform.confirmpassword.data
            ),
        )
        db.session.add(newuser)
        db.session.commit()
        flash("Account Succesfully created", "success")
        login_user(newuser)
        return redirect(url_for("index"))
    except handled as error:
        db.session.rollback()
        for kind, field_message, flash_message, category in failures:
            if isinstance(error, kind):
                registerform.email.errors.append(field_message)
                flash(flash_message, category)
                break
    return render_template("register.html", registerform=registerform)
@APP.route("/distribusi", methods=["GET", "POST"])
@login_required
def distribusi():
    """Show the distribusi page and, on submit, build the user's site.

    On a valid ``DistribusiForm`` submit, the uploaded zip in the user's
    stash folder (if still present) is extracted and deleted, a stylesheet
    from that folder is selected, and ``distribusify`` is run on the folder
    before redirecting to the index. If the user has no distribusi (or its
    folder is missing) a warning is flashed and the page is re-rendered.
    """
    uploadform = UploadForm()
    distribusiform = DistribusiForm()
    themeform = ThemeForm()
    files_uploaded = AreFilesUploaded()
    user = User.query.filter_by(email=current_user.email).first()
    # Named differently from the view function to avoid shadowing it.
    userdistribusi = Distribusis.query.filter_by(userid=user.id).first()

    if distribusiform.validate_on_submit():
        sitename = userdistribusi.distribusiname if userdistribusi else None
        userfolder = os.path.join("stash", sitename) if sitename else None
        if userfolder is None or not os.path.isdir(userfolder):
            # Nothing has been uploaded yet; previously this crashed with
            # AttributeError / FileNotFoundError.
            flash("Upload a distribusi first!", "warning")
        else:
            unzipfile = os.path.join(userfolder, "{}.zip".format(sitename))
            APP.logger.debug("Unpacking %s", unzipfile)
            if os.path.exists(unzipfile):
                with zipfile.ZipFile(unzipfile, "r") as zip_ref:
                    # TODO: replace extractall with inspection and extract;
                    # the archive is user supplied and is not size-checked.
                    zip_ref.extractall(userfolder)
                os.remove(unzipfile)

            # TODO: make sure nothing can be executed from the upload folder

            # Pick a stylesheet from the folder; when several exist the last
            # one returned by os.listdir wins, and "" is passed when none do.
            cssfile = ""
            for filename in os.listdir(userfolder):
                if filename.endswith(".css"):
                    cssfile = os.path.join(userfolder, filename)

            parser = build_argparser()
            args = parser.parse_args(["-s", cssfile])
            distribusify(args, userfolder)
            return redirect(url_for("index"))

    return render_template(
        "distribusi.html",
        uploadform=uploadform,
        distribusiform=distribusiform,
        themeform=themeform,
        files_uploaded=files_uploaded,
    )
def _stash_uploaded_zip(uploaded, sitename):
    """Save *uploaded* as ``<sitename>.zip`` inside ``stash/<sitename>``.

    The file is first written to the configured ``UPLOAD_FOLDER``, copied
    into the site's stash folder (created when missing), and then removed
    from the upload folder.
    """
    zipfilename = "{}.zip".format(sitename)
    staged = os.path.join(APP.config["UPLOAD_FOLDER"], zipfilename)
    uploaded.save(staged)
    newuserfolder = os.path.join("stash", sitename)
    # makedirs also covers a missing parent "stash" directory.
    os.makedirs(newuserfolder, exist_ok=True)
    shutil.copy(staged, newuserfolder)
    os.remove(staged)


@APP.route("/upload", methods=["POST"])
@login_required
def upload():
    """Register a new distribusi for the current user and store its zip.

    On a valid ``UploadForm`` submit, a ``Distribusis`` row is created and
    the uploaded archive is stored in the site's stash folder. If the name
    is not a plain folder name, or the database insert fails, nothing is
    written to disk and the form is re-rendered with an error.
    """
    uploadform = UploadForm()
    distribusiform = DistribusiForm()
    themeform = ThemeForm()
    files_uploaded = False

    if uploadform.validate_on_submit():
        sitename = uploadform.sitename.data
        # The name becomes a folder under "stash", so it must not contain
        # path separators or be a relative-directory marker.
        plain_name = (
            bool(sitename)
            and sitename not in (".", "..")
            and os.path.basename(sitename) == sitename
        )
        if not plain_name:
            uploadform.sitename.errors.append("Invalid distribusi name!")
            flash("Invalid distribusi name!", "warning")
        else:
            user = User.query.filter_by(email=current_user.email).first()
            try:
                newdistribusi = Distribusis(
                    distribusiname=sitename,
                    userid=user.id,
                    year=uploadform.academicyear.data,
                    term=uploadform.term.data,
                    tags=uploadform.tags.data,
                )
                db.session.add(newdistribusi)
                db.session.commit()
            except InvalidRequestError:
                db.session.rollback()
                uploadform.sitename.errors.append("Something went wrong!")
                flash("Something went wrong!", "danger")
            except IntegrityError:
                db.session.rollback()
                uploadform.sitename.errors.append(
                    "distribusi name already exists!"
                )
                flash("distribusi name already exists!", "warning")
            else:
                # Only touch the filesystem once the row is committed;
                # otherwise a duplicate name would overwrite the files of
                # the distribusi that already owns it.
                _stash_uploaded_zip(uploadform.zipfile.data, sitename)
                files_uploaded = AreFilesUploaded()

    return render_template(
        "distribusi.html",
        uploadform=uploadform,
        distribusiform=distribusiform,
        themeform=themeform,
        files_uploaded=files_uploaded,
    )
@APP.route("/theme", methods=["GET", "POST"])
@login_required
def theme():
    """Copy the selected theme stylesheet into the user's stash folder.

    On a valid ``ThemeForm`` submit, ``themes/<theme>.css`` is copied into
    ``stash/<distribusiname>``. If the user has no distribusi or its folder
    does not exist, a warning is flashed instead. The distribusi page is
    rendered in every case.
    """
    uploadform = UploadForm()
    distribusiform = DistribusiForm()
    themeform = ThemeForm()
    files_uploaded = AreFilesUploaded()

    if themeform.validate_on_submit():
        user = User.query.filter_by(email=current_user.email).first()
        distribusi = Distribusis.query.filter_by(userid=user.id).first()
        sitename = distribusi.distribusiname if distribusi else None
        newuserfolder = os.path.join("stash", sitename) if sitename else None
        if newuserfolder is None or not os.path.isdir(newuserfolder):
            # Without this check a missing row raised AttributeError, and a
            # missing folder made shutil.copy create a plain file at the
            # folder's path.
            flash("Upload a distribusi first!", "warning")
        else:
            copycssfile = os.path.join(
                "themes",
                "{}.css".format(themeform.theme.data),
            )
            shutil.copy(copycssfile, newuserfolder)

    return render_template(
        "distribusi.html",
        uploadform=uploadform,
        distribusiform=distribusiform,
        themeform=themeform,
        files_uploaded=files_uploaded,
    )
@APP.route("/editor", methods=["GET", "POST"])
@login_required
def editor():
    """Let the user write a custom stylesheet into their stash folder.

    On a valid ``EditorForm`` submit, the submitted CSS is written to
    ``stash/<distribusiname>/<cssname>.css``. The submit is rejected with
    a message when the user has no distribusi folder yet or when the
    stylesheet name is not a plain file name. The editor page is rendered
    in every case.
    """
    editorform = EditorForm()
    files_uploaded = AreFilesUploaded()
    user = User.query.filter_by(email=current_user.email).first()

    if editorform.validate_on_submit():
        # Resolve the folder through Distribusis, as the other views in
        # this module do, instead of reading the name off the user object.
        distribusi = Distribusis.query.filter_by(userid=user.id).first()
        sitename = distribusi.distribusiname if distribusi else None
        userfolder = os.path.join("stash", sitename) if sitename else None
        cssname = editorform.cssname.data
        if userfolder is None or not os.path.isdir(userfolder):
            flash("Upload a distribusi first!", "warning")
        elif not cssname or os.path.basename(cssname) != cssname:
            # The name is user input that ends up in a filesystem path, so
            # refuse anything containing a path separator.
            editorform.cssname.errors.append("Invalid css file name!")
        else:
            cssfilename = "{}.css".format(cssname)
            # The with-block closes the file; UTF-8 is set explicitly so
            # the result does not depend on the server's locale.
            with open(
                os.path.join(userfolder, cssfilename), "w", encoding="utf-8"
            ) as cssfile:
                cssfile.write(editorform.css.data)

    return render_template(
        "editor.html",
        files_uploaded=files_uploaded,
        editorform=editorform,
    )
@APP.route("/stash/<path:path>")
def distribusistash(path):
    """Serve the file at *path* from the ``stash`` directory."""
    stash_root = "stash"
    return send_from_directory(stash_root, path)
@APP.route("/admin")
@login_required
def admin():
    """Placeholder admin page for logged-in users; returns a fixed string."""
    body = "admin"
    return body
@APP.route("/logout")
@login_required
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    landing_page = url_for("index")
    return redirect(landing_page)
@APP.errorhandler(CSRFError)
def handle_csrf_error(e):
    """Render the CSRF error page with the error's description, as HTTP 400."""
    page = render_template("csrf_error.html", reason=e.description)
    return page, 400
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the id stored in the session, or ``None``.

    Flask-Login expects the loader to return ``None`` (not raise) when the
    id is invalid, so an id that cannot be converted to an integer is
    treated as "no such user".
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
if __name__ == "__main__":
    # Development entry point: only runs when this file is executed
    # directly, with debug mode switched on and the server on port 5000.
    APP.debug = True
    APP.run(port=5000)