83 lines
2.7 KiB
Python
83 lines
2.7 KiB
Python
import os
|
|
|
|
import magic
|
|
from distribusi.mappings import FILE_TYPES
|
|
from models.distribusi_model import Distribusis
|
|
from models.distribusi_file_model import DistribusiFiles
|
|
from app import create_app, get_app, db
|
|
from sqlalchemy.exc import (
|
|
DatabaseError,
|
|
DataError,
|
|
IntegrityError,
|
|
InterfaceError,
|
|
InvalidRequestError,
|
|
)
|
|
|
|
MIME_TYPE = magic.Magic(mime=True)
|
|
|
|
|
|
def _distribusi_file_with_type(distribusi, full_path):
|
|
mime = MIME_TYPE.from_file(full_path)
|
|
type_, subtype = mime.split("/")
|
|
if type_ in FILE_TYPES:
|
|
_add_distribusi_file_to_db(distribusi, full_path, type_)
|
|
|
|
|
|
def _get_distribusi_from_path(path):
|
|
distribusi = Distribusis.query.filter_by(distribusiname=path).first()
|
|
return distribusi
|
|
|
|
|
|
def _add_distribusi_file_to_db(distribusi, full_path, type):
|
|
app = get_app()
|
|
app.logger.info(f"adding file to database: {full_path} type: {type}")
|
|
distribusi_file = DistribusiFiles.query.filter_by(path=full_path).first()
|
|
if distribusi_file is not None:
|
|
app.logger.error(f"File already in database: {full_path}")
|
|
return
|
|
try:
|
|
new_distribusi_file = DistribusiFiles(
|
|
path=full_path,
|
|
type=type,
|
|
distribusi=distribusi.id,
|
|
)
|
|
db.session.add(new_distribusi_file)
|
|
db.session.commit()
|
|
return
|
|
except InvalidRequestError:
|
|
db.session.rollback()
|
|
app.logger.error("Something went wrong!")
|
|
except IntegrityError:
|
|
db.session.rollback()
|
|
app.logger.error("File %s already exists!", full_path)
|
|
except DataError:
|
|
db.session.rollback()
|
|
app.logger.error("%s Invalid Entry", full_path)
|
|
except InterfaceError:
|
|
db.session.rollback()
|
|
app.logger.error("Error connecting to the database")
|
|
except DatabaseError:
|
|
db.session.rollback()
|
|
app.logger.error("Error connecting to the database")
|
|
|
|
|
|
def add_distribusi_files_to_db(path):
|
|
distribusi = _get_distribusi_from_path(path)
|
|
path = os.path.join("stash", path)
|
|
for root, dirs, files in os.walk(path, topdown=True):
|
|
files = list(filter(lambda f: not f.startswith("."), files))
|
|
files = list(filter(lambda f: not f.endswith(".html"), files))
|
|
files = list(filter(lambda f: not f.endswith("_thumbnail.jpg"), files))
|
|
files = list(filter(lambda f: not f.endswith("_alttext.txt"), files))
|
|
files = list(
|
|
filter(lambda f: not f.endswith("_dv_description.txt"), files)
|
|
)
|
|
|
|
for file in files:
|
|
full_path = os.path.join(root, file)
|
|
distribusi_file = DistribusiFiles.query.filter_by(
|
|
path=full_path
|
|
).first()
|
|
if distribusi_file is None:
|
|
_distribusi_file_with_type(distribusi, full_path)
|