Source code for bugwarrior.config.schema

import logging
import os
from pathlib import Path
import re
import sys
import typing
from typing import Annotated, Any, Generic, Literal

import pydantic
from pydantic import (
    AfterValidator,
    AnyUrl,
    BeforeValidator,
    ConfigDict,
    Field,
    TypeAdapter,
    ValidationInfo,
    computed_field,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError
import taskw

from bugwarrior.collect import get_service

from .data import BugwarriorData, get_data_path

log = logging.getLogger(__name__)


def validate_url(url: str):
    return str(AnyUrl(url)).rstrip("/")


StrippedTrailingSlashUrl = Annotated[str, BeforeValidator(validate_url)]


def validate_no_scheme_url(value: str) -> str:
    if "://" in value:
        scheme = value.split("://")[0]
        raise PydanticCustomError(
            "url_scheme_not_allowed",
            "URL should not include scheme ('{scheme}')",
            {"scheme": scheme},
        )

    return value.rstrip("/")


NoSchemeUrl = Annotated[str, BeforeValidator(validate_no_scheme_url)]


def parse_config_list(value: str | list[str]) -> list[str]:
    """Cast ini string to a list of strings."""
    if isinstance(value, str):
        return [
            item.strip()
            for item in re.split(r",(?![^{]*})", value.strip())
            if item != ""
        ]
    return value


ConfigList = Annotated[list[str], BeforeValidator(parse_config_list)]


ExpandedPath = Annotated[
    Path,
    BeforeValidator(os.path.expandvars),
    AfterValidator(lambda path: path.expanduser()),
]


def _validate_file_exists(path: Path) -> Path:
    """Validate that path points to an existing file."""
    resolved = path.resolve()
    if not resolved.is_file():
        raise PydanticCustomError(
            "file_not_found",
            "Unable to find taskrc file at {path}.",
            {"path": str(resolved)},
        )
    return resolved


TaskrcPath = Annotated[ExpandedPath, AfterValidator(_validate_file_exists)]


def get_default_taskrc() -> Path:
    """Mimic taskwarrior's logic for finding taskrc."""
    # Allow $TASKRC override.
    env_taskrc = os.getenv("TASKRC")
    if env_taskrc:
        path = Path(os.path.expandvars(env_taskrc)).expanduser()
        return _validate_file_exists(path)

    # Default to ~/.taskrc
    taskrc = Path.home() / ".taskrc"
    if taskrc.is_file():
        return taskrc

    # If no ~/.taskrc, use $XDG_CONFIG_HOME/task/taskrc if exists, or
    # ~/.config/task/taskrc if $XDG_CONFIG_HOME is unset
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
    if xdg_config_home:
        xdg_config_taskrc = Path(xdg_config_home) / "task" / "taskrc"
        if xdg_config_taskrc.is_file():
            return xdg_config_taskrc
    else:
        dotconfig_taskrc = Path.home() / ".config" / "task" / "taskrc"
        if dotconfig_taskrc.is_file():
            return dotconfig_taskrc.expanduser()

    raise OSError("Unable to find taskrc file. (Try running `task`.)")


T = typing.TypeVar("T")


def _validate_unsupported(value: T) -> T:
    if value:
        raise ValueError("Option is unsupported by service.")
    return value


class UnsupportedOption(Generic[T]):
    def __class_getitem__(cls, item: type) -> Any:
        return Annotated[item, AfterValidator(_validate_unsupported)]


class BaseConfig(pydantic.BaseModel):
    model_config = ConfigDict(frozen=True, extra="forbid", validate_default=True)


[docs] class MainSectionConfig(BaseConfig): """The :ref:`common_configuration:Main Section` configuration, plus computed attributes:""" model_config = ConfigDict(arbitrary_types_allowed=True) # required targets: ConfigList # added during configuration loading #: Interactive status. interactive: bool = False @computed_field @property def data(self) -> BugwarriorData: """Local data storage.""" return BugwarriorData(get_data_path(self.taskrc)) # optional taskrc: TaskrcPath = Field(default_factory=get_default_taskrc) shorten: bool = False inline_links: bool = True annotation_links: bool = False annotation_comments: bool = True annotation_newlines: bool = False annotation_length: typing.Optional[int] = 45 description_length: typing.Optional[int] = 35 merge_annotations: bool = True merge_tags: bool = True replace_tags: bool = False static_tags: ConfigList = [] static_fields: ConfigList = ["priority"] log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "DISABLED"] = ( "INFO" ) log_file: typing.Optional[ExpandedPath] = None
class Hooks(BaseConfig): pre_import: ConfigList = [] class Notifications(BaseConfig): notifications: bool = False backend: typing.Optional[Literal["gobject", "growlnotify", "applescript"]] = None finished_querying_sticky: bool = True task_crud_sticky: bool = True only_on_new_tasks: bool = False class SchemaBase(BaseConfig): # Allow extra top-level sections so all targets don't have to be selected. model_config = ConfigDict(extra="ignore") hooks: Hooks = Hooks() notifications: Notifications = Notifications() def get_validation_error_enhanced_messages( error: pydantic.ValidationError, ) -> list[str]: errors = [] for _error in error.errors(): msg = _error["msg"] if _error["type"] == "extra_forbidden": msg = "unrecognized option" loc = _error["loc"] loc_len = len(loc) if loc_len == 1 or (loc_len > 1 and loc[1] == "__root__"): formatted_error_loc = f"[{loc[0]}]" elif loc_len == 2: formatted_error_loc = f"[{loc[0]}]\n{loc[1]}" if _error["type"] != "missing": formatted_error_loc = f"{formatted_error_loc} = {_error['input']}" else: raise ValueError( "Configuration should not be nested more than two layers deep." ) errors.append(f"{formatted_error_loc} <- {msg}\n") return errors def raise_validation_error(msg, config_path, no_errors=1) -> typing.NoReturn: log.error( ("Validation error" if no_errors == 1 else f"{no_errors} validation errors") + f" found in {config_path}\n" f"See https://bugwarrior.readthedocs.io\n\n{msg}" ) sys.exit(1) def get_target_validator(targets): @model_validator(mode='before') @classmethod def compute_target(cls, values): for target in targets: values[target]['target'] = target return values return compute_target def validate_config(config: dict, main_section: str, config_path: str) -> dict: # Pre-validate the minimum requirements to build our pydantic models. try: main = config[main_section] except KeyError: raise_validation_error(f"No section: '{main_section}'", config_path) try: targets = TypeAdapter(ConfigList).validate_python(main['targets']) except KeyError: raise_validation_error( f"No option 'targets' in section: '{main_section}'", config_path ) try: configmap = {target: config[target] for target in targets} except KeyError as e: raise_validation_error(f"No section: '{e.args[0]}'", config_path) servicemap = {} for target, serviceconfig in configmap.items(): try: servicemap[target] = serviceconfig['service'] except KeyError: raise_validation_error( f"No option 'service' in section: '{target}'", config_path ) # Construct Service Models target_schemas = { target: (get_service(service).CONFIG_SCHEMA, ...) for target, service in servicemap.items() } # Construct Flavors flavor_schemas = { section: (MainSectionConfig, ...) for section in config.keys() if section.startswith('flavor.') } # Construct Validation Model bugwarrior_config_model = pydantic.create_model( 'bugwarriorrc', __base__=SchemaBase, __validators__={'compute_target': get_target_validator(targets)}, general=(MainSectionConfig, ...), **flavor_schemas, **target_schemas, ) # Validate try: # Convert top-level model to dict since target names are dynamic and # a bunch of calls to getattr(config, target) inhibits readability. return dict(bugwarrior_config_model.model_validate(config)) except pydantic.ValidationError as e: errors = get_validation_error_enhanced_messages(e) raise_validation_error("\n".join(errors), config_path, no_errors=len(errors)) # Dynamically add template fields to model. _ServiceConfig = pydantic.create_model( "_ServiceConfig", __base__=BaseConfig, **{ f"{key}_template": (typing.Optional[str], None) for key in taskw.task.Task.FIELDS }, )
[docs] class ServiceConfig(_ServiceConfig): """Pydantic_ base class for service configurations. .. _Pydantic: https://docs.pydantic.dev/latest/ """ # Added during validation (computed field) templates: dict = {} target: typing.Optional[str] = None # Optional fields shared by all services. only_if_assigned: str = "" also_unassigned: bool = False default_priority: Literal["", "L", "M", "H"] = "M" add_tags: ConfigList = [] static_fields: ConfigList = [] @model_validator(mode="before") @classmethod def compute_templates(cls, values): """Get any defined templates for configuration values. Users can override the value of any Taskwarrior field using this feature on a per-key basis. The key should be the name of the field to you would like to configure the value of, followed by '_template', and the value should be a Jinja template generating the field's value. As context variables, all fields on the taskwarrior record are available. For example, to prefix the returned project name for tickets returned by a service with 'workproject_', you could add an entry reading: project_template = workproject_{{project}} Or, if you'd simply like to override the returned project name for all tickets incoming from a specific service, you could add an entry like: project_template = myprojectname The above would cause all issues to receive a project name of 'myprojectname', regardless of what the project name of the generated issue was. """ templates = {} for key in taskw.task.Task.FIELDS.keys(): template = values.get(f'{key}_template') if template is not None: templates[key] = template values["templates"] = templates return values @field_validator('include_merge_requests', mode='after', check_fields=False) @classmethod def deprecate_filter_merge_requests(cls, value, info: ValidationInfo): if not hasattr(cls, '_DEPRECATE_FILTER_MERGE_REQUESTS'): return value filter_mr = info.data.get('filter_merge_requests', 'Undefined') if filter_mr != 'Undefined': if value != 'Undefined': raise ValueError( 'filter_merge_requests and include_merge_requests are incompatible.' ) log.warning( 'filter_merge_requests is deprecated in favor of include_merge_requests' ) return not filter_mr elif value == 'Undefined': return True return value @field_validator('project_name', mode='after', check_fields=False) @classmethod def deprecate_project_name(cls, value): if hasattr(cls, '_DEPRECATE_PROJECT_NAME'): if value != '': log.warning('project_name is deprecated in favor of project_template') return value