"""Runner of rstcheck_core."""
import logging
import multiprocessing
import os
import pathlib
import re
import sys
import typing as t
from . import _sphinx, checker, config, types
logger = logging.getLogger(__name__)
class RstcheckMainRunner:
    """Main runner of rstcheck_core."""

    def __init__(
        self,
        check_paths: t.List[pathlib.Path],
        rstcheck_config: config.RstcheckConfig,
        overwrite_config: bool = True,
    ) -> None:
        """Initialize the :py:class:`RstcheckMainRunner` with a base config.

        :param check_paths: Files to check.
        :param rstcheck_config: Base configuration config from e.g. the CLI.
        :param overwrite_config: If file config overwrites current config; defaults to True
        """
        self.config = rstcheck_config
        self.overwrite_config = overwrite_config
        if rstcheck_config.config_path:
            self.load_config_file(
                rstcheck_config.config_path, rstcheck_config.warn_unknown_settings or False
            )

        self.check_paths = check_paths
        self._files_to_check: t.List[pathlib.Path] = []
        self._nonexisting_paths: t.List[pathlib.Path] = []
        self.update_file_list()

        pool_size = multiprocessing.cpu_count()
        # NOTE: Work around https://bugs.python.org/issue45077 -- on Windows a
        # process pool with more than 61 workers overflows the WaitForMultipleObjects
        # handle limit.
        self._pool_size = pool_size if sys.platform != "win32" else min(pool_size, 61)

        self.errors: t.List[types.LintError] = []

    @property
    def files_to_check(self) -> t.List[pathlib.Path]:
        """List of files to check.

        This list is updated via the :py:meth:`RstcheckMainRunner.update_file_list` method.
        """
        return self._files_to_check

    @property
    def nonexisting_paths(self) -> t.List[pathlib.Path]:
        """List of paths which do not exist.

        This list is updated via the :py:meth:`RstcheckMainRunner.update_file_list` method.
        """
        return self._nonexisting_paths

    def load_config_file(
        self, config_path: pathlib.Path, warn_unknown_settings: bool = False
    ) -> None:
        """Load config from file and merge with current config.

        If the loaded file config overwrites the current config depends on the
        ``self.overwrite_config`` attribute set on initialization.

        :param config_path: Path to config file; can be directory or file
        :param warn_unknown_settings: If a warning should be logged for unknown settings
            in config file;
            defaults to :py:obj:`False`
        """
        logger.info(f"Load config file for main runner: '{config_path}'.")
        file_config = config.load_config_file_from_path(
            config_path, warn_unknown_settings=warn_unknown_settings
        )

        if file_config is None:
            # Best effort: a missing/empty config file is not fatal; keep current config.
            logger.warning("Config file was empty or not found.")
            return

        logger.debug(
            "Merging config from file into main config. "
            f"File config is dominant: {self.overwrite_config}"
        )
        self.config = config.merge_configs(
            self.config, file_config, config_add_is_dominant=self.overwrite_config
        )

    def update_file_list(self) -> None:  # noqa: CCR001
        """Update file path list with paths specified on initialization.

        Uses paths from ``RstcheckMainRunner.check_paths``, resolves all file paths and
        saves them in :py:attr:`RstcheckMainRunner.files_to_check`.

        If a given path does not exist, it is filtered out and saved in
        :py:attr:`RstcheckMainRunner.nonexisting_paths`.

        Clear the current file list. Then get the file and directory paths specified with
        ``self.check_paths`` attribute set on initialization and search them for rst files
        to check. Add those files to the file list.
        """  # noqa: Q440,Q441,Q447,Q449
        logger.debug("Updating list of files to check.")
        paths = list(self.check_paths)
        self._files_to_check = []

        # A single "-" path means: read the source to check from stdin.
        if len(paths) == 1 and paths[0].name == "-":
            logger.info("'-' detected. Using stdin for input.")
            self._files_to_check.append(paths[0])
            return

        paths = self._filter_nonexisting_paths(paths)

        def checkable_rst_file(file: pathlib.Path) -> bool:
            # Hidden files (leading dot) and non-".rst" suffixes are skipped;
            # casefold makes the suffix check case-insensitive (e.g. ".RST").
            return (
                file.is_file()
                and not file.name.startswith(".")
                and file.suffix.casefold() == ".rst"
            )

        while paths:
            path = paths.pop(0)
            resolved_path = path.resolve()
            if self.config.recursive and resolved_path.is_dir():
                for root, directories, children in os.walk(path):
                    root_path = pathlib.Path(root)
                    paths += [
                        root_path / f
                        for f in children
                        if checkable_rst_file((root_path / f).resolve())
                    ]
                    # Prune hidden directories in place so os.walk does not descend
                    # into them.
                    directories[:] = [d for d in directories if not d.startswith(".")]
                continue
            if checkable_rst_file(resolved_path):
                self._files_to_check.append(path)

    def _filter_nonexisting_paths(self, paths: t.List[pathlib.Path]) -> t.List[pathlib.Path]:
        """Filter nonexisting paths out.

        If recursive is not active only files are allowed, else directories are also allowed.

        Nonexisting paths are saved in :py:attr:`RstcheckMainRunner.nonexisting_paths`
        and a warning is logged for each of them.

        :param paths: List of paths to filter
        :return: Filtered path list
        """
        self._nonexisting_paths = []
        # Build a new list instead of removing from the iterated one; removing
        # while iterating silently skips the element after each removal.
        existing_paths: t.List[pathlib.Path] = []
        for path in paths:
            resolved_path = path.resolve()
            if resolved_path.is_file() or (self.config.recursive and resolved_path.is_dir()):
                existing_paths.append(path)
                continue

            self._nonexisting_paths.append(path)
            if self.config.recursive:
                logger.warning(
                    f"Path does not exist or is neither a file nor a directory: '{path}'."
                )
            else:
                logger.warning(f"Path does not exist or is not a file: '{path}'.")
        return existing_paths

    def _run_checks_sync(self) -> t.List[t.List[types.LintError]]:
        """Check all files from the file list synchronously and return the errors.

        :return: List of lists of errors found per file
        """
        logger.debug("Running checks synchronously.")
        with _sphinx.load_sphinx_if_available():
            results = [
                checker.check_file(file, self.config, self.overwrite_config)
                for file in self._files_to_check
            ]
        return results

    def _run_checks_parallel(self) -> t.List[t.List[types.LintError]]:
        """Check all files from the file list in parallel and return the errors.

        :return: List of lists of errors found per file
        """
        logger.debug(f"Running checks in parallel with pool size of {self._pool_size}.")
        with _sphinx.load_sphinx_if_available(), multiprocessing.Pool(self._pool_size) as pool:
            results = pool.starmap(
                checker.check_file,
                [(file, self.config, self.overwrite_config) for file in self._files_to_check],
            )
        return results

    def _update_results(self, results: t.List[t.List[types.LintError]]) -> None:
        """Take results and update error cache.

        Results normally come from :py:meth:`RstcheckMainRunner._run_checks_sync` or
        :py:meth:`RstcheckMainRunner._run_checks_parallel`.

        :param results: List of lists of errors found
        """
        self.errors = []
        for errors in results:
            self.errors += errors

    def check(self) -> None:
        """Check all files in the file list and save the errors.

        Multiple files are run in parallel; a single file is checked synchronously
        to avoid pool start-up overhead.

        A new call overwrites the old cached errors.
        """
        logger.info("Run checks for all files.")
        results = (
            self._run_checks_parallel()
            if len(self._files_to_check) > 1
            else self._run_checks_sync()
        )
        self._update_results(results)

    def print_result(self, output_file: t.Optional[t.TextIO] = None) -> int:
        """Print all cached error messages and return exit code.

        :param output_file: file to print to; if ``None`` (default) the success message
            goes to :py:data:`sys.stdout` and error messages go to :py:data:`sys.stderr`
        :return: exit code 0 if no error is printed; 1 if any error is printed
        """
        if len(self.errors) == 0 and len(self._nonexisting_paths) == 0:
            print("Success! No issues detected.", file=output_file or sys.stdout)
            return 0

        # Messages already carrying a severity tag like "(ERROR/3)" are kept as-is;
        # all others get a default "(ERROR/3)" prefix.
        err_msg_regex = re.compile(r"\([A-Z]+/[0-9]+\)")
        for error in self.errors:
            err_msg = error["message"]
            if not err_msg_regex.match(err_msg):
                err_msg = "(ERROR/3) " + err_msg

            message = f"{error['source_origin']}:{error['line_number']}: {err_msg}"
            print(message, file=output_file or sys.stderr)

        print("Error! Issues detected.", file=output_file or sys.stderr)
        return 1

    def run(self) -> int:  # pragma: no cover
        """Run checks, print error messages and return the result.

        :return: exit code 0 if no error is printed; 1 if any error is printed
        """
        logger.info("Run checks and print results.")
        self.check()
        return self.print_result()