import logging
import os
from pathlib import Path
import re
import typing
from typing import Annotated, Any, Literal
import pydantic
from pydantic import (
AfterValidator,
AnyUrl,
BeforeValidator,
ConfigDict,
Field,
ValidationInfo,
computed_field,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
import taskw
import taskw.task
from .data import BugwarriorData, get_data_path
log = logging.getLogger(__name__)
Priority = Literal['', 'L', 'M', 'H']
def validate_url(url: str) -> str:
return str(AnyUrl(url)).rstrip("/")
StrippedTrailingSlashUrl = Annotated[str, BeforeValidator(validate_url)]
def validate_no_scheme_url(value: str) -> str:
if "://" in value:
scheme = value.split("://")[0]
raise PydanticCustomError(
"url_scheme_not_allowed",
"URL should not include scheme ('{scheme}')",
{"scheme": scheme},
)
return value.rstrip("/")
NoSchemeUrl = Annotated[str, BeforeValidator(validate_no_scheme_url)]
def parse_config_list(value: str | list[str]) -> list[str]:
"""Cast ini string to a list of strings."""
if isinstance(value, str):
return [
item.strip()
for item in re.split(r",(?![^{]*})", value.strip())
if item != ""
]
return value
ConfigList = Annotated[list[str], BeforeValidator(parse_config_list)]
ExpandedPath = Annotated[
Path,
BeforeValidator(os.path.expandvars),
AfterValidator(lambda path: path.expanduser()),
]
def _validate_file_exists(path: Path) -> Path:
"""Validate that path points to an existing file."""
resolved = path.resolve()
if not resolved.is_file():
raise PydanticCustomError(
"file_not_found",
"Unable to find taskrc file at {path}.",
{"path": str(resolved)},
)
return resolved
TaskrcPath = Annotated[ExpandedPath, AfterValidator(_validate_file_exists)]
def get_default_taskrc() -> Path:
"""Mimic taskwarrior's logic for finding taskrc."""
# Allow $TASKRC override.
env_taskrc = os.getenv("TASKRC")
if env_taskrc:
path = Path(os.path.expandvars(env_taskrc)).expanduser()
return _validate_file_exists(path)
# Default to ~/.taskrc
taskrc = Path.home() / ".taskrc"
if taskrc.is_file():
return taskrc
# If no ~/.taskrc, use $XDG_CONFIG_HOME/task/taskrc if exists, or
# ~/.config/task/taskrc if $XDG_CONFIG_HOME is unset
xdg_config_home = os.getenv("XDG_CONFIG_HOME")
if xdg_config_home:
xdg_config_taskrc = Path(xdg_config_home) / "task" / "taskrc"
if xdg_config_taskrc.is_file():
return xdg_config_taskrc
else:
dotconfig_taskrc = Path.home() / ".config" / "task" / "taskrc"
if dotconfig_taskrc.is_file():
return dotconfig_taskrc.expanduser()
raise OSError("Unable to find taskrc file. (Try running `task`.)")
T = typing.TypeVar("T")
def _validate_unsupported(value: T) -> T:
if value:
raise ValueError("Option is unsupported by service.")
return value
UnsupportedOption = Annotated[T, AfterValidator(_validate_unsupported)]
class BaseConfig(pydantic.BaseModel):
model_config = ConfigDict(frozen=True, extra="forbid", validate_default=True)
[docs]
class MainSectionConfig(BaseConfig):
"""The :ref:`common_configuration:Main Section` configuration, plus computed attributes:"""
model_config = ConfigDict(arbitrary_types_allowed=True)
# required
targets: ConfigList
#: *DEPRECATED*
interactive: bool = True
@computed_field
@property
def data(self) -> BugwarriorData:
"""Local data storage."""
return BugwarriorData(get_data_path(self.taskrc))
# optional
taskrc: TaskrcPath = Field(default_factory=get_default_taskrc)
shorten: bool = False
inline_links: bool = True
annotation_links: bool = False
annotation_comments: bool = True
annotation_newlines: bool = False
annotation_length: typing.Optional[int] = 45
description_length: typing.Optional[int] = 35
merge_annotations: bool = True
merge_tags: bool = True
replace_tags: bool = False
static_tags: ConfigList = []
static_fields: ConfigList = ["priority"]
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "DISABLED"] = (
"INFO"
)
log_file: typing.Optional[ExpandedPath] = None
class Hooks(BaseConfig):
pre_import: ConfigList = []
class Notifications(BaseConfig):
notifications: bool = False
backend: typing.Optional[Literal["gobject", "growlnotify", "applescript"]] = None
finished_querying_sticky: bool = True
task_crud_sticky: bool = True
only_on_new_tasks: bool = False
# Dynamically add template fields to model.
_ServiceConfig = pydantic.create_model( # type: ignore[ty:no-matching-overload]
"_ServiceConfig",
__base__=BaseConfig,
**{
f"{key}_template": (typing.Optional[str], None)
for key in taskw.task.Task.FIELDS
},
)
[docs]
class ServiceConfig(_ServiceConfig):
"""Pydantic_ base class for service configurations.
.. _Pydantic: https://docs.pydantic.dev/latest/
"""
# Added before validation (computed field)
service: str
target: str
# Added during validation (computed field)
templates: dict[str, str] = {}
# Optional fields shared by all services.
only_if_assigned: str = ""
also_unassigned: bool = False
default_priority: Priority = "M"
add_tags: ConfigList = []
static_fields: ConfigList = []
@model_validator(mode="before")
@classmethod
def compute_templates(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Get any defined templates for configuration values.
Users can override the value of any Taskwarrior field using
this feature on a per-key basis. The key should be the name of
the field to you would like to configure the value of, followed
by '_template', and the value should be a Jinja template
generating the field's value. As context variables, all fields
on the taskwarrior record are available.
For example, to prefix the returned
project name for tickets returned by a service with 'workproject_',
you could add an entry reading:
project_template = workproject_{{project}}
Or, if you'd simply like to override the returned project name
for all tickets incoming from a specific service, you could add
an entry like:
project_template = myprojectname
The above would cause all issues to receive a project name
of 'myprojectname', regardless of what the project name of the
generated issue was.
"""
templates = {}
for key in taskw.task.Task.FIELDS.keys():
template = values.get(f'{key}_template')
if template is not None:
templates[key] = template
values["templates"] = templates
return values
@field_validator('include_merge_requests', mode='after', check_fields=False)
@classmethod
def deprecate_filter_merge_requests(
cls, value: bool | str, info: ValidationInfo
) -> bool | str:
if not hasattr(cls, '_DEPRECATE_FILTER_MERGE_REQUESTS'):
return value
filter_mr = info.data.get('filter_merge_requests', 'Undefined')
if filter_mr != 'Undefined':
if value != 'Undefined':
raise ValueError(
'filter_merge_requests and include_merge_requests are incompatible.'
)
log.warning(
'filter_merge_requests is deprecated in favor of include_merge_requests'
)
return not filter_mr
elif value == 'Undefined':
return True
return value
@field_validator('project_name', mode='after', check_fields=False)
@classmethod
def deprecate_project_name(cls, value: str) -> str:
if hasattr(cls, '_DEPRECATE_PROJECT_NAME'):
if value != '':
log.warning('project_name is deprecated in favor of project_template')
return value