# Source code for bugwarrior.config.schema

import logging
import os
from pathlib import Path
import re
import typing
from typing import Annotated, Any, Literal

import pydantic
from pydantic import (
    AfterValidator,
    AnyUrl,
    BeforeValidator,
    ConfigDict,
    Field,
    ValidationInfo,
    computed_field,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError
import taskw
import taskw.task

from .data import BugwarriorData, get_data_path

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Accepted priority values: the empty string or one of 'L', 'M', 'H'
# (presumably Taskwarrior's low/medium/high -- confirm).
Priority = Literal['', 'L', 'M', 'H']


def validate_url(url: str) -> str:
    """Return *url* as rendered by ``AnyUrl``, without trailing slashes.

    The value is passed to pydantic's ``AnyUrl``; any error it raises
    propagates unchanged.
    """
    parsed = AnyUrl(url)
    rendered = str(parsed)
    return rendered.rstrip("/")


# A str field whose raw input is first passed through validate_url.
StrippedTrailingSlashUrl = Annotated[str, BeforeValidator(validate_url)]


def validate_no_scheme_url(value: str) -> str:
    """Return *value* without trailing slashes, rejecting any URL scheme.

    Raises:
        PydanticCustomError: (type ``url_scheme_not_allowed``) if *value*
            contains ``://``; the text before the first ``://`` is
            reported as the offending scheme.
    """
    if "://" not in value:
        return value.rstrip("/")

    # Everything before the first "://" is what the user supplied as scheme.
    scheme, _, _ = value.partition("://")
    raise PydanticCustomError(
        "url_scheme_not_allowed",
        "URL should not include scheme ('{scheme}')",
        {"scheme": scheme},
    )


# A str field whose raw input is first passed through validate_no_scheme_url.
NoSchemeUrl = Annotated[str, BeforeValidator(validate_no_scheme_url)]


def parse_config_list(value: str | list[str]) -> list[str]:
    """Cast ini string to a list of strings."""
    if isinstance(value, str):
        return [
            item.strip()
            for item in re.split(r",(?![^{]*})", value.strip())
            if item != ""
        ]
    return value


# A list[str] field that also accepts a comma-separated ini string, which
# parse_config_list converts to a list before validation.
ConfigList = Annotated[list[str], BeforeValidator(parse_config_list)]


# A Path field: environment variables in the raw input are expanded before
# validation, and expanduser() is applied to the resulting Path afterwards.
ExpandedPath = Annotated[
    Path,
    BeforeValidator(os.path.expandvars),
    AfterValidator(lambda path: path.expanduser()),
]


def _validate_file_exists(path: Path) -> Path:
    """Resolve *path* and return it, provided it is an existing file.

    Raises:
        PydanticCustomError: (type ``file_not_found``) if the resolved
            path is not a file.
    """
    absolute = path.resolve()
    if absolute.is_file():
        return absolute

    raise PydanticCustomError(
        "file_not_found",
        "Unable to find taskrc file at {path}.",
        {"path": str(absolute)},
    )


# An ExpandedPath that must additionally point at an existing file; the
# validated value is the resolved path returned by _validate_file_exists.
TaskrcPath = Annotated[ExpandedPath, AfterValidator(_validate_file_exists)]


def get_default_taskrc() -> Path:
    """Locate the taskrc file, mimicking taskwarrior's lookup order.

    Candidates, in order:

    1. ``$TASKRC`` if set and non-empty. It is expanded and must be an
       existing file, otherwise the error from ``_validate_file_exists``
       propagates.
    2. ``~/.taskrc``.
    3. ``$XDG_CONFIG_HOME/task/taskrc`` when ``$XDG_CONFIG_HOME`` is set,
       or ``~/.config/task/taskrc`` when it is not.

    Raises:
        OSError: if no candidate file exists.
    """
    override = os.getenv("TASKRC")
    if override:
        # An explicit override wins and is validated rather than skipped.
        expanded = Path(os.path.expandvars(override)).expanduser()
        return _validate_file_exists(expanded)

    home_taskrc = Path.home() / ".taskrc"
    if home_taskrc.is_file():
        return home_taskrc

    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
    if not xdg_config_home:
        # ~/.config is only consulted when $XDG_CONFIG_HOME is unset/empty.
        fallback = Path.home() / ".config" / "task" / "taskrc"
        if fallback.is_file():
            return fallback.expanduser()
    else:
        xdg_taskrc = Path(xdg_config_home) / "task" / "taskrc"
        if xdg_taskrc.is_file():
            return xdg_taskrc

    raise OSError("Unable to find taskrc file. (Try running `task`.)")


# Generic type variable used by _validate_unsupported and UnsupportedOption.
T = typing.TypeVar("T")


def _validate_unsupported(value: T) -> T:
    """Return a falsy *value* unchanged; reject any truthy value.

    Raises:
        ValueError: if *value* is truthy.
    """
    if not value:
        return value
    raise ValueError("Option is unsupported by service.")


# Wraps a field type so that, after normal validation, any truthy value is
# rejected by _validate_unsupported; falsy values pass through.
UnsupportedOption = Annotated[T, AfterValidator(_validate_unsupported)]


class BaseConfig(pydantic.BaseModel):
    # Common base for the configuration models in this module.
    # frozen=True: instances cannot be mutated after creation.
    # extra="forbid": unknown keys are a validation error.
    # validate_default=True: default values are validated like user input.
    model_config = ConfigDict(frozen=True, extra="forbid", validate_default=True)


class MainSectionConfig(BaseConfig):
    """The :ref:`common_configuration:Main Section` configuration, plus computed attributes:"""

    # NOTE(review): presumably needed because the computed ``data`` field
    # returns a BugwarriorData, which is not a pydantic type -- confirm.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # required
    targets: ConfigList

    #: *DEPRECATED*
    interactive: bool = True

    @computed_field
    @property
    def data(self) -> BugwarriorData:
        """Local data storage."""
        # The storage location is derived from the configured taskrc path.
        return BugwarriorData(get_data_path(self.taskrc))

    # optional
    # taskrc falls back to get_default_taskrc() when not configured.
    taskrc: TaskrcPath = Field(default_factory=get_default_taskrc)
    shorten: bool = False
    inline_links: bool = True
    annotation_links: bool = False
    annotation_comments: bool = True
    annotation_newlines: bool = False
    annotation_length: typing.Optional[int] = 45
    description_length: typing.Optional[int] = 35
    merge_annotations: bool = True
    merge_tags: bool = True
    replace_tags: bool = False
    static_tags: ConfigList = []
    static_fields: ConfigList = ["priority"]
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "DISABLED"] = (
        "INFO"
    )
    log_file: typing.Optional[ExpandedPath] = None
class Hooks(BaseConfig):
    # Accepts a list or a comma-separated ini string (see ConfigList).
    pre_import: ConfigList = []


class Notifications(BaseConfig):
    notifications: bool = False
    # None means no backend is configured; otherwise one of the listed names.
    backend: typing.Optional[Literal["gobject", "growlnotify", "applescript"]] = None
    finished_querying_sticky: bool = True
    task_crud_sticky: bool = True
    only_on_new_tasks: bool = False


# Dynamically add template fields to model.
# One optional ``<field>_template`` string (default None) is declared for every
# key in taskw.task.Task.FIELDS.
_ServiceConfig = pydantic.create_model(  # type: ignore[ty:no-matching-overload]
    "_ServiceConfig",
    __base__=BaseConfig,
    **{
        f"{key}_template": (typing.Optional[str], None)
        for key in taskw.task.Task.FIELDS
    },
)
class ServiceConfig(_ServiceConfig):
    """Pydantic_ base class for service configurations.

    .. _Pydantic: https://docs.pydantic.dev/latest/
    """

    # Added before validation (computed field)
    service: str
    target: str

    # Added during validation (computed field)
    templates: dict[str, str] = {}

    # Optional fields shared by all services.
    only_if_assigned: str = ""
    also_unassigned: bool = False
    default_priority: Priority = "M"
    add_tags: ConfigList = []
    static_fields: ConfigList = []

    @model_validator(mode="before")
    @classmethod
    def compute_templates(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Get any defined templates for configuration values.

        Users can override the value of any Taskwarrior field using
        this feature on a per-key basis.  The key should be the name of
        the field you would like to configure the value of, followed
        by '_template', and the value should be a Jinja template
        generating the field's value.  As context variables, all fields
        on the taskwarrior record are available.

        For example, to prefix the returned
        project name for tickets returned by a service with 'workproject_',
        you could add an entry reading:

            project_template = workproject_{{project}}

        Or, if you'd simply like to override the returned project name
        for all tickets incoming from a specific service, you could add
        an entry like:

            project_template = myprojectname

        The above would cause all issues to receive a project name
        of 'myprojectname', regardless of what the project name of the
        generated issue was.

        """
        # Collect every "<field>_template" that was actually supplied and
        # store the result under "templates" on the incoming mapping.
        values["templates"] = {
            key: template
            for key in taskw.task.Task.FIELDS
            if (template := values.get(f'{key}_template')) is not None
        }
        return values

    @field_validator('include_merge_requests', mode='after', check_fields=False)
    @classmethod
    def deprecate_filter_merge_requests(
        cls, value: bool | str, info: ValidationInfo
    ) -> bool | str:
        # Only classes that define _DEPRECATE_FILTER_MERGE_REQUESTS take part
        # in the deprecation handling; everything else passes through.
        if not hasattr(cls, '_DEPRECATE_FILTER_MERGE_REQUESTS'):
            return value

        filter_mr = info.data.get('filter_merge_requests', 'Undefined')
        if filter_mr == 'Undefined':
            # Neither option given: default to True; otherwise keep the
            # explicit include_merge_requests value.
            return True if value == 'Undefined' else value

        if value != 'Undefined':
            raise ValueError(
                'filter_merge_requests and include_merge_requests are incompatible.'
            )
        log.warning(
            'filter_merge_requests is deprecated in favor of include_merge_requests'
        )
        # The deprecated option has the opposite sense of the new one.
        return not filter_mr

    @field_validator('project_name', mode='after', check_fields=False)
    @classmethod
    def deprecate_project_name(cls, value: str) -> str:
        # Warn only for opted-in classes and only when the option is in use.
        if hasattr(cls, '_DEPRECATE_PROJECT_NAME') and value != '':
            log.warning('project_name is deprecated in favor of project_template')
        return value