This is the repository for the online module Bots as Digital Infrapuncture, commissioned by the Utrecht University
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

935 lines
37 KiB

import calendar
import errno
import fnmatch
import logging
import os
from collections import defaultdict
from functools import partial
from itertools import chain, groupby
from operator import attrgetter
from jinja2 import (BaseLoader, ChoiceLoader, Environment, FileSystemLoader,
PrefixLoader, TemplateNotFound)
from pelican.cache import FileStampDataCacher
from pelican.contents import Article, Page, Static
from pelican.plugins import signals
from pelican.readers import Readers
from pelican.utils import (DateFormatter, copy, mkdir_p, order_content,
posixize_path, process_translations)
logger = logging.getLogger(__name__)
class PelicanTemplateNotFound(Exception):
pass
class Generator:
"""Baseclass generator"""
def __init__(self, context, settings, path, theme, output_path,
readers_cache_name='', **kwargs):
self.context = context
self.settings = settings
self.path = path
self.theme = theme
self.output_path = output_path
for arg, value in kwargs.items():
setattr(self, arg, value)
self.readers = Readers(self.settings, readers_cache_name)
# templates cache
self._templates = {}
self._templates_path = list(self.settings['THEME_TEMPLATES_OVERRIDES'])
theme_templates_path = os.path.expanduser(
os.path.join(self.theme, 'templates'))
self._templates_path.append(theme_templates_path)
theme_loader = FileSystemLoader(theme_templates_path)
simple_theme_path = os.path.dirname(os.path.abspath(__file__))
simple_loader = FileSystemLoader(
os.path.join(simple_theme_path, "themes", "simple", "templates"))
self.env = Environment(
loader=ChoiceLoader([
FileSystemLoader(self._templates_path),
simple_loader, # implicit inheritance
PrefixLoader({
'!simple': simple_loader,
'!theme': theme_loader
}) # explicit ones
]),
**self.settings['JINJA_ENVIRONMENT']
)
logger.debug('Template list: %s', self.env.list_templates())
# provide utils.strftime as a jinja filter
self.env.filters.update({'strftime': DateFormatter()})
# get custom Jinja filters from user settings
custom_filters = self.settings['JINJA_FILTERS']
self.env.filters.update(custom_filters)
# get custom Jinja globals from user settings
custom_globals = self.settings['JINJA_GLOBALS']
self.env.globals.update(custom_globals)
# get custom Jinja tests from user settings
custom_tests = self.settings['JINJA_TESTS']
self.env.tests.update(custom_tests)
signals.generator_init.send(self)
def get_template(self, name):
"""Return the template by name.
Use self.theme to get the templates to use, and return a list of
templates ready to use with Jinja2.
"""
if name not in self._templates:
for ext in self.settings['TEMPLATE_EXTENSIONS']:
try:
self._templates[name] = self.env.get_template(name + ext)
break
except TemplateNotFound:
continue
if name not in self._templates:
raise PelicanTemplateNotFound(
'[templates] unable to load {}[{}] from {}'.format(
name, ', '.join(self.settings['TEMPLATE_EXTENSIONS']),
self._templates_path))
return self._templates[name]
def _include_path(self, path, extensions=None):
"""Inclusion logic for .get_files(), returns True/False
:param path: the path which might be including
:param extensions: the list of allowed extensions, or False if all
extensions are allowed
"""
if extensions is None:
extensions = tuple(self.readers.extensions)
basename = os.path.basename(path)
# check IGNORE_FILES
ignores = self.settings['IGNORE_FILES']
if any(fnmatch.fnmatch(basename, ignore) for ignore in ignores):
return False
ext = os.path.splitext(basename)[1][1:]
if extensions is False or ext in extensions:
return True
return False
def get_files(self, paths, exclude=[], extensions=None):
"""Return a list of files to use, based on rules
:param paths: the list pf paths to search (relative to self.path)
:param exclude: the list of path to exclude
:param extensions: the list of allowed extensions (if False, all
extensions are allowed)
"""
# backward compatibility for older generators
if isinstance(paths, str):
paths = [paths]
# group the exclude dir names by parent path, for use with os.walk()
exclusions_by_dirpath = {}
for e in exclude:
parent_path, subdir = os.path.split(os.path.join(self.path, e))
exclusions_by_dirpath.setdefault(parent_path, set()).add(subdir)
files = set()
ignores = self.settings['IGNORE_FILES']
for path in paths:
# careful: os.path.join() will add a slash when path == ''.
root = os.path.join(self.path, path) if path else self.path
if os.path.isdir(root):
for dirpath, dirs, temp_files in os.walk(
root, topdown=True, followlinks=True):
excl = exclusions_by_dirpath.get(dirpath, ())
# We copy the `dirs` list as we will modify it in the loop:
for d in list(dirs):
if (d in excl or
any(fnmatch.fnmatch(d, ignore)
for ignore in ignores)):
if d in dirs:
dirs.remove(d)
reldir = os.path.relpath(dirpath, self.path)
for f in temp_files:
fp = os.path.join(reldir, f)
if self._include_path(fp, extensions):
files.add(fp)
elif os.path.exists(root) and self._include_path(path, extensions):
files.add(path) # can't walk non-directories
return files
def add_source_path(self, content, static=False):
"""Record a source file path that a Generator found and processed.
Store a reference to its Content object, for url lookups later.
"""
location = content.get_relative_source_path()
key = 'static_content' if static else 'generated_content'
self.context[key][location] = content
def _add_failed_source_path(self, path, static=False):
"""Record a source file path that a Generator failed to process.
(For example, one that was missing mandatory metadata.)
The path argument is expected to be relative to self.path.
"""
key = 'static_content' if static else 'generated_content'
self.context[key][posixize_path(os.path.normpath(path))] = None
def _is_potential_source_path(self, path, static=False):
"""Return True if path was supposed to be used as a source file.
(This includes all source files that have been found by generators
before this method is called, even if they failed to process.)
The path argument is expected to be relative to self.path.
"""
key = 'static_content' if static else 'generated_content'
return (posixize_path(os.path.normpath(path)) in self.context[key])
def add_static_links(self, content):
"""Add file links in content to context to be processed as Static
content.
"""
self.context['static_links'] |= content.get_static_links()
def _update_context(self, items):
"""Update the context with the given items from the currrent
processor.
"""
for item in items:
value = getattr(self, item)
if hasattr(value, 'items'):
value = list(value.items()) # py3k safeguard for iterators
self.context[item] = value
def __str__(self):
# return the name of the class for logging purposes
return self.__class__.__name__
class CachingGenerator(Generator, FileStampDataCacher):
'''Subclass of Generator and FileStampDataCacher classes
enables content caching, either at the generator or reader level
'''
def __init__(self, *args, **kwargs):
'''Initialize the generator, then set up caching
note the multiple inheritance structure
'''
cls_name = self.__class__.__name__
Generator.__init__(self, *args,
readers_cache_name=(cls_name + '-Readers'),
**kwargs)
cache_this_level = \
self.settings['CONTENT_CACHING_LAYER'] == 'generator'
caching_policy = cache_this_level and self.settings['CACHE_CONTENT']
load_policy = cache_this_level and self.settings['LOAD_CONTENT_CACHE']
FileStampDataCacher.__init__(self, self.settings, cls_name,
caching_policy, load_policy
)
def _get_file_stamp(self, filename):
'''Get filestamp for path relative to generator.path'''
filename = os.path.join(self.path, filename)
return super()._get_file_stamp(filename)
class _FileLoader(BaseLoader):
def __init__(self, path, basedir):
self.path = path
self.fullpath = os.path.join(basedir, path)
def get_source(self, environment, template):
if template != self.path or not os.path.exists(self.fullpath):
raise TemplateNotFound(template)
mtime = os.path.getmtime(self.fullpath)
with open(self.fullpath, encoding='utf-8') as f:
source = f.read()
return (source, self.fullpath,
lambda: mtime == os.path.getmtime(self.fullpath))
class TemplatePagesGenerator(Generator):
def generate_output(self, writer):
for source, dest in self.settings['TEMPLATE_PAGES'].items():
self.env.loader.loaders.insert(0, _FileLoader(source, self.path))
try:
template = self.env.get_template(source)
rurls = self.settings['RELATIVE_URLS']
writer.write_file(dest, template, self.context, rurls,
override_output=True, url='')
finally:
del self.env.loader.loaders[0]
class ArticlesGenerator(CachingGenerator):
"""Generate blog articles"""
def __init__(self, *args, **kwargs):
"""initialize properties"""
self.articles = [] # only articles in default language
self.translations = []
self.dates = {}
self.tags = defaultdict(list)
self.categories = defaultdict(list)
self.related_posts = []
self.authors = defaultdict(list)
self.drafts = [] # only drafts in default language
self.drafts_translations = []
super().__init__(*args, **kwargs)
signals.article_generator_init.send(self)
def generate_feeds(self, writer):
"""Generate the feeds from the current context, and output files."""
if self.settings.get('FEED_ATOM'):
writer.write_feed(
self.articles,
self.context,
self.settings['FEED_ATOM'],
self.settings.get('FEED_ATOM_URL', self.settings['FEED_ATOM'])
)
if self.settings.get('FEED_RSS'):
writer.write_feed(
self.articles,
self.context,
self.settings['FEED_RSS'],
self.settings.get('FEED_RSS_URL', self.settings['FEED_RSS']),
feed_type='rss'
)
if (self.settings.get('FEED_ALL_ATOM') or
self.settings.get('FEED_ALL_RSS')):
all_articles = list(self.articles)
for article in self.articles:
all_articles.extend(article.translations)
order_content(all_articles,
order_by=self.settings['ARTICLE_ORDER_BY'])
if self.settings.get('FEED_ALL_ATOM'):
writer.write_feed(
all_articles,
self.context,
self.settings['FEED_ALL_ATOM'],
self.settings.get('FEED_ALL_ATOM_URL',
self.settings['FEED_ALL_ATOM'])
)
if self.settings.get('FEED_ALL_RSS'):
writer.write_feed(
all_articles,
self.context,
self.settings['FEED_ALL_RSS'],
self.settings.get('FEED_ALL_RSS_URL',
self.settings['FEED_ALL_RSS']),
feed_type='rss'
)
for cat, arts in self.categories:
if self.settings.get('CATEGORY_FEED_ATOM'):
writer.write_feed(
arts,
self.context,
self.settings['CATEGORY_FEED_ATOM'].format(slug=cat.slug),
self.settings.get(
'CATEGORY_FEED_ATOM_URL',
self.settings['CATEGORY_FEED_ATOM']).format(
slug=cat.slug
),
feed_title=cat.name
)
if self.settings.get('CATEGORY_FEED_RSS'):
writer.write_feed(
arts,
self.context,
self.settings['CATEGORY_FEED_RSS'].format(slug=cat.slug),
self.settings.get(
'CATEGORY_FEED_RSS_URL',
self.settings['CATEGORY_FEED_RSS']).format(
slug=cat.slug
),
feed_title=cat.name,
feed_type='rss'
)
for auth, arts in self.authors:
if self.settings.get('AUTHOR_FEED_ATOM'):
writer.write_feed(
arts,
self.context,
self.settings['AUTHOR_FEED_ATOM'].format(slug=auth.slug),
self.settings.get(
'AUTHOR_FEED_ATOM_URL',
self.settings['AUTHOR_FEED_ATOM']
).format(slug=auth.slug),
feed_title=auth.name
)
if self.settings.get('AUTHOR_FEED_RSS'):
writer.write_feed(
arts,
self.context,
self.settings['AUTHOR_FEED_RSS'].format(slug=auth.slug),
self.settings.get(
'AUTHOR_FEED_RSS_URL',
self.settings['AUTHOR_FEED_RSS']
).format(slug=auth.slug),
feed_title=auth.name,
feed_type='rss'
)
if (self.settings.get('TAG_FEED_ATOM') or
self.settings.get('TAG_FEED_RSS')):
for tag, arts in self.tags.items():
if self.settings.get('TAG_FEED_ATOM'):
writer.write_feed(
arts,
self.context,
self.settings['TAG_FEED_ATOM'].format(slug=tag.slug),
self.settings.get(
'TAG_FEED_ATOM_URL',
self.settings['TAG_FEED_ATOM']
).format(slug=tag.slug),
feed_title=tag.name
)
if self.settings.get('TAG_FEED_RSS'):
writer.write_feed(
arts,
self.context,
self.settings['TAG_FEED_RSS'].format(slug=tag.slug),
self.settings.get(
'TAG_FEED_RSS_URL',
self.settings['TAG_FEED_RSS']
).format(slug=tag.slug),
feed_title=tag.name,
feed_type='rss'
)
if (self.settings.get('TRANSLATION_FEED_ATOM') or
self.settings.get('TRANSLATION_FEED_RSS')):
translations_feeds = defaultdict(list)
for article in chain(self.articles, self.translations):
translations_feeds[article.lang].append(article)
for lang, items in translations_feeds.items():
items = order_content(
items, order_by=self.settings['ARTICLE_ORDER_BY'])
if self.settings.get('TRANSLATION_FEED_ATOM'):
writer.write_feed(
items,
self.context,
self.settings['TRANSLATION_FEED_ATOM']
.format(lang=lang),
self.settings.get(
'TRANSLATION_FEED_ATOM_URL',
self.settings['TRANSLATION_FEED_ATOM']
).format(lang=lang),
)
if self.settings.get('TRANSLATION_FEED_RSS'):
writer.write_feed(
items,
self.context,
self.settings['TRANSLATION_FEED_RSS']
.format(lang=lang),
self.settings.get(
'TRANSLATION_FEED_RSS_URL',
self.settings['TRANSLATION_FEED_RSS']
).format(lang=lang),
feed_type='rss'
)
def generate_articles(self, write):
"""Generate the articles."""
for article in chain(self.translations, self.articles):
signals.article_generator_write_article.send(self, content=article)
write(article.save_as, self.get_template(article.template),
self.context, article=article, category=article.category,
override_output=hasattr(article, 'override_save_as'),
url=article.url, blog=True)
def generate_period_archives(self, write):
"""Generate per-year, per-month, and per-day archives."""
try:
template = self.get_template('period_archives')
except PelicanTemplateNotFound:
template = self.get_template('archives')
period_save_as = {
'year': self.settings['YEAR_ARCHIVE_SAVE_AS'],
'month': self.settings['MONTH_ARCHIVE_SAVE_AS'],
'day': self.settings['DAY_ARCHIVE_SAVE_AS'],
}
period_url = {
'year': self.settings['YEAR_ARCHIVE_URL'],
'month': self.settings['MONTH_ARCHIVE_URL'],
'day': self.settings['DAY_ARCHIVE_URL'],
}
period_date_key = {
'year': attrgetter('date.year'),
'month': attrgetter('date.year', 'date.month'),
'day': attrgetter('date.year', 'date.month', 'date.day')
}
def _generate_period_archives(dates, key, save_as_fmt, url_fmt):
"""Generate period archives from `dates`, grouped by
`key` and written to `save_as`.
"""
# `dates` is already sorted by date
for _period, group in groupby(dates, key=key):
archive = list(group)
articles = [a for a in self.articles if a in archive]
# arbitrarily grab the first date so that the usual
# format string syntax can be used for specifying the
# period archive dates
date = archive[0].date
save_as = save_as_fmt.format(date=date)
url = url_fmt.format(date=date)
context = self.context.copy()
if key == period_date_key['year']:
context["period"] = (_period,)
else:
month_name = calendar.month_name[_period[1]]
if key == period_date_key['month']:
context["period"] = (_period[0],
month_name)
else:
context["period"] = (_period[0],
month_name,
_period[2])
write(save_as, template, context, articles=articles,
dates=archive, template_name='period_archives',
blog=True, url=url, all_articles=self.articles)
for period in 'year', 'month', 'day':
save_as = period_save_as[period]
url = period_url[period]
if save_as:
key = period_date_key[period]
_generate_period_archives(self.dates, key, save_as, url)
def generate_direct_templates(self, write):
"""Generate direct templates pages"""
for template in self.settings['DIRECT_TEMPLATES']:
save_as = self.settings.get("%s_SAVE_AS" % template.upper(),
'%s.html' % template)
url = self.settings.get("%s_URL" % template.upper(),
'%s.html' % template)
if not save_as:
continue
write(save_as, self.get_template(template), self.context,
articles=self.articles, dates=self.dates, blog=True,
template_name=template,
page_name=os.path.splitext(save_as)[0], url=url)
def generate_tags(self, write):
"""Generate Tags pages."""
tag_template = self.get_template('tag')
for tag, articles in self.tags.items():
dates = [article for article in self.dates if article in articles]
write(tag.save_as, tag_template, self.context, tag=tag,
url=tag.url, articles=articles, dates=dates,
template_name='tag', blog=True, page_name=tag.page_name,
all_articles=self.articles)
def generate_categories(self, write):
"""Generate category pages."""
category_template = self.get_template('category')
for cat, articles in self.categories:
dates = [article for article in self.dates if article in articles]
write(cat.save_as, category_template, self.context, url=cat.url,
category=cat, articles=articles, dates=dates,
template_name='category', blog=True, page_name=cat.page_name,
all_articles=self.articles)
def generate_authors(self, write):
"""Generate Author pages."""
author_template = self.get_template('author')
for aut, articles in self.authors:
dates = [article for article in self.dates if article in articles]
write(aut.save_as, author_template, self.context,
url=aut.url, author=aut, articles=articles, dates=dates,
template_name='author', blog=True,
page_name=aut.page_name, all_articles=self.articles)
def generate_drafts(self, write):
"""Generate drafts pages."""
for draft in chain(self.drafts_translations, self.drafts):
write(draft.save_as, self.get_template(draft.template),
self.context, article=draft, category=draft.category,
override_output=hasattr(draft, 'override_save_as'),
blog=True, all_articles=self.articles, url=draft.url)
def generate_pages(self, writer):
"""Generate the pages on the disk"""
write = partial(writer.write_file,
relative_urls=self.settings['RELATIVE_URLS'])
# to minimize the number of relative path stuff modification
# in writer, articles pass first
self.generate_articles(write)
self.generate_period_archives(write)
self.generate_direct_templates(write)
# and subfolders after that
self.generate_tags(write)
self.generate_categories(write)
self.generate_authors(write)
self.generate_drafts(write)
def generate_context(self):
"""Add the articles into the shared context"""
all_articles = []
all_drafts = []
for f in self.get_files(
self.settings['ARTICLE_PATHS'],
exclude=self.settings['ARTICLE_EXCLUDES']):
article = self.get_cached_data(f, None)
if article is None:
try:
article = self.readers.read_file(
base_path=self.path, path=f, content_class=Article,
context=self.context,
preread_signal=signals.article_generator_preread,
preread_sender=self,
context_signal=signals.article_generator_context,
context_sender=self)
except Exception as e:
logger.error(
'Could not process %s\n%s', f, e,
exc_info=self.settings.get('DEBUG', False))
self._add_failed_source_path(f)
continue
if not article.is_valid():
self._add_failed_source_path(f)
continue
self.cache_data(f, article)
if article.status == "published":
all_articles.append(article)
elif article.status == "draft":
all_drafts.append(article)
self.add_source_path(article)
self.add_static_links(article)
def _process(arts):
origs, translations = process_translations(
arts, translation_id=self.settings['ARTICLE_TRANSLATION_ID'])
origs = order_content(origs, self.settings['ARTICLE_ORDER_BY'])
return origs, translations
self.articles, self.translations = _process(all_articles)
self.drafts, self.drafts_translations = _process(all_drafts)
signals.article_generator_pretaxonomy.send(self)
for article in self.articles:
# only main articles are listed in categories and tags
# not translations
self.categories[article.category].append(article)
if hasattr(article, 'tags'):
for tag in article.tags:
self.tags[tag].append(article)
for author in getattr(article, 'authors', []):
self.authors[author].append(article)
self.dates = list(self.articles)
self.dates.sort(key=attrgetter('date'),
reverse=self.context['NEWEST_FIRST_ARCHIVES'])
# and generate the output :)
# order the categories per name
self.categories = list(self.categories.items())
self.categories.sort(
reverse=self.settings['REVERSE_CATEGORY_ORDER'])
self.authors = list(self.authors.items())
self.authors.sort()
self._update_context(('articles', 'dates', 'tags', 'categories',
'authors', 'related_posts', 'drafts'))
self.save_cache()
self.readers.save_cache()
signals.article_generator_finalized.send(self)
def generate_output(self, writer):
self.generate_feeds(writer)
self.generate_pages(writer)
signals.article_writer_finalized.send(self, writer=writer)
def refresh_metadata_intersite_links(self):
for e in chain(self.articles,
self.translations,
self.drafts,
self.drafts_translations):
if hasattr(e, 'refresh_metadata_intersite_links'):
e.refresh_metadata_intersite_links()
class PagesGenerator(CachingGenerator):
"""Generate pages"""
def __init__(self, *args, **kwargs):
self.pages = []
self.translations = []
self.hidden_pages = []
self.hidden_translations = []
self.draft_pages = []
self.draft_translations = []
super().__init__(*args, **kwargs)
signals.page_generator_init.send(self)
def generate_context(self):
all_pages = []
hidden_pages = []
draft_pages = []
for f in self.get_files(
self.settings['PAGE_PATHS'],
exclude=self.settings['PAGE_EXCLUDES']):
page = self.get_cached_data(f, None)
if page is None:
try:
page = self.readers.read_file(
base_path=self.path, path=f, content_class=Page,
context=self.context,
preread_signal=signals.page_generator_preread,
preread_sender=self,
context_signal=signals.page_generator_context,
context_sender=self)
except Exception as e:
logger.error(
'Could not process %s\n%s', f, e,
exc_info=self.settings.get('DEBUG', False))
self._add_failed_source_path(f)
continue
if not page.is_valid():
self._add_failed_source_path(f)
continue
self.cache_data(f, page)
if page.status == "published":
all_pages.append(page)
elif page.status == "hidden":
hidden_pages.append(page)
elif page.status == "draft":
draft_pages.append(page)
self.add_source_path(page)
self.add_static_links(page)
def _process(pages):
origs, translations = process_translations(
pages, translation_id=self.settings['PAGE_TRANSLATION_ID'])
origs = order_content(origs, self.settings['PAGE_ORDER_BY'])
return origs, translations
self.pages, self.translations = _process(all_pages)
self.hidden_pages, self.hidden_translations = _process(hidden_pages)
self.draft_pages, self.draft_translations = _process(draft_pages)
self._update_context(('pages', 'hidden_pages', 'draft_pages'))
self.save_cache()
self.readers.save_cache()
signals.page_generator_finalized.send(self)
def generate_output(self, writer):
for page in chain(self.translations, self.pages,
self.hidden_translations, self.hidden_pages,
self.draft_translations, self.draft_pages):
signals.page_generator_write_page.send(self, content=page)
writer.write_file(
page.save_as, self.get_template(page.template),
self.context, page=page,
relative_urls=self.settings['RELATIVE_URLS'],
override_output=hasattr(page, 'override_save_as'),
url=page.url)
signals.page_writer_finalized.send(self, writer=writer)
def refresh_metadata_intersite_links(self):
for e in chain(self.pages,
self.hidden_pages,
self.hidden_translations,
self.draft_pages,
self.draft_translations):
if hasattr(e, 'refresh_metadata_intersite_links'):
e.refresh_metadata_intersite_links()
class StaticGenerator(Generator):
"""copy static paths (what you want to copy, like images, medias etc.
to output"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fallback_to_symlinks = False
signals.static_generator_init.send(self)
def generate_context(self):
self.staticfiles = []
linked_files = set(self.context['static_links'])
found_files = self.get_files(self.settings['STATIC_PATHS'],
exclude=self.settings['STATIC_EXCLUDES'],
extensions=False)
for f in linked_files | found_files:
# skip content source files unless the user explicitly wants them
if self.settings['STATIC_EXCLUDE_SOURCES']:
if self._is_potential_source_path(f):
continue
static = self.readers.read_file(
base_path=self.path, path=f, content_class=Static,
fmt='static', context=self.context,
preread_signal=signals.static_generator_preread,
preread_sender=self,
context_signal=signals.static_generator_context,
context_sender=self)
self.staticfiles.append(static)
self.add_source_path(static, static=True)
self._update_context(('staticfiles',))
signals.static_generator_finalized.send(self)
def generate_output(self, writer):
self._copy_paths(self.settings['THEME_STATIC_PATHS'], self.theme,
self.settings['THEME_STATIC_DIR'], self.output_path,
os.curdir)
for sc in self.context['staticfiles']:
if self._file_update_required(sc):
self._link_or_copy_staticfile(sc)
else:
logger.debug('%s is up to date, not copying', sc.source_path)
def _copy_paths(self, paths, source, destination, output_path,
final_path=None):
"""Copy all the paths from source to destination"""
for path in paths:
source_path = os.path.join(source, path)
if final_path:
if os.path.isfile(source_path):
destination_path = os.path.join(output_path, destination,
final_path,
os.path.basename(path))
else:
destination_path = os.path.join(output_path, destination,
final_path)
else:
destination_path = os.path.join(output_path, destination, path)
copy(source_path, destination_path,
self.settings['IGNORE_FILES'])
def _file_update_required(self, staticfile):
source_path = os.path.join(self.path, staticfile.source_path)
save_as = os.path.join(self.output_path, staticfile.save_as)
if not os.path.exists(save_as):
return True
elif (self.settings['STATIC_CREATE_LINKS'] and
os.path.samefile(source_path, save_as)):
return False
elif (self.settings['STATIC_CREATE_LINKS'] and
os.path.realpath(save_as) == source_path):
return False
elif not self.settings['STATIC_CHECK_IF_MODIFIED']:
return True
else:
return self._source_is_newer(staticfile)
def _source_is_newer(self, staticfile):
source_path = os.path.join(self.path, staticfile.source_path)
save_as = os.path.join(self.output_path, staticfile.save_as)
s_mtime = os.path.getmtime(source_path)
d_mtime = os.path.getmtime(save_as)
return s_mtime - d_mtime > 0.000001
def _link_or_copy_staticfile(self, sc):
if self.settings['STATIC_CREATE_LINKS']:
self._link_staticfile(sc)
else:
self._copy_staticfile(sc)
def _copy_staticfile(self, sc):
source_path = os.path.join(self.path, sc.source_path)
save_as = os.path.join(self.output_path, sc.save_as)
self._mkdir(os.path.dirname(save_as))
copy(source_path, save_as)
logger.info('Copying %s to %s', sc.source_path, sc.save_as)
def _link_staticfile(self, sc):
source_path = os.path.join(self.path, sc.source_path)
save_as = os.path.join(self.output_path, sc.save_as)
self._mkdir(os.path.dirname(save_as))
try:
if os.path.lexists(save_as):
os.unlink(save_as)
logger.info('Linking %s and %s', sc.source_path, sc.save_as)
if self.fallback_to_symlinks:
os.symlink(source_path, save_as)
else:
os.link(source_path, save_as)
except OSError as err:
if err.errno == errno.EXDEV: # 18: Invalid cross-device link
logger.debug(
"Cross-device links not valid. "
"Creating symbolic links instead."
)
self.fallback_to_symlinks = True
self._link_staticfile(sc)
else:
raise err
def _mkdir(self, path):
if os.path.lexists(path) and not os.path.isdir(path):
os.unlink(path)
mkdir_p(path)
class SourceFileGenerator(Generator):
def generate_context(self):
self.output_extension = self.settings['OUTPUT_SOURCES_EXTENSION']
def _create_source(self, obj):
output_path, _ = os.path.splitext(obj.save_as)
dest = os.path.join(self.output_path,
output_path + self.output_extension)
copy(obj.source_path, dest)
def generate_output(self, writer=None):
logger.info('Generating source files...')
for obj in chain(self.context['articles'], self.context['pages']):
self._create_source(obj)
for obj_trans in obj.translations:
self._create_source(obj_trans)