Source code for shai_tix.tix

# -*- coding: utf-8 -*-

import shutil
import dataclasses
from pathlib import Path
from functools import cached_property
from datetime import datetime, timezone
from contextlib import contextmanager

import sqlalchemy as sa
import sqlalchemy.orm as orm

from .constants import WordsEnum, StatusEnum
from .db import Base, Story, Task
from .utils import build_folder_name, Ticket
from .title_codec import validate_title, encode_title


[docs] @dataclasses.dataclass(frozen=True) class Tix: dir_root: Path = dataclasses.field() # -------------------------------------------------------------------------- # Context Manager # --------------------------------------------------------------------------
[docs] @contextmanager def session(self): """ Start a session with synchronized index database. Rebuilds the SQLite index from filesystem on entry, ensuring all query_* methods return up-to-date results. Usage:: tix = Tix(dir_root=path) with tix.session(): stories = tix.query_stories() tasks = tix.query_tasks() :returns: Context manager yielding self """ self.rebuild_index_db() yield self
@cached_property def dir_stories(self) -> Path: return self.dir_root / "stories" # -------------------------------------------------------------------------- # Filesystem scan methods (iter_*) # --------------------------------------------------------------------------
[docs] def iter_stories(self): """ Iterate over all story folders and yield Story objects. Scans the stories directory and yields Story objects for each valid story folder found. :returns: Generator yielding Story objects """ if not self.dir_stories.exists(): return for folder in self.dir_stories.iterdir(): if folder.is_dir(): ticket = Ticket.from_folder(folder) if ticket is not None and ticket.type == WordsEnum.story.value: yield Story( id=ticket.id, date=ticket.date, title=ticket.title, path=str(folder), )
[docs] def iter_tasks(self): """ Iterate over all task folders and yield Task objects. Directly scans all task folders using glob pattern ``stories/*/tasks/*`` for better efficiency, avoiding per-story API calls. :returns: Generator yielding Task objects """ if not self.dir_stories.exists(): return for folder in self.dir_stories.glob( f"{WordsEnum.story.value}-*/{WordsEnum.tasks.value}/{WordsEnum.task.value}*" ): if folder.is_dir(): ticket = Ticket.from_folder(folder) if ticket is not None and ticket.type == WordsEnum.task.value: # Extract story_id from parent folder story_folder = folder.parent.parent story_ticket = Ticket.from_folder(story_folder) story_id = story_ticket.id if story_ticket else 0 yield Task( id=ticket.id, story_id=story_id, date=ticket.date, title=ticket.title, path=str(folder), )
[docs] def iter_stories_or_tasks(self): """ Iterate over all stories and tasks using a single rglob scan. Uses one ``rglob("*")`` call to scan all paths, then filters by folder name prefix (story- or task-). No is_dir() check needed since Ticket.from_folder() validates the naming pattern. Paths are sorted to ensure depth-first order: each story appears before its tasks (shorter paths come first when sorted). :returns: Generator yielding Story or Task objects """ if not self.dir_stories.exists(): return # Track story IDs for tasks story_id_map: dict[Path, int] = {} # Single rglob call for directories only, sorted for depth-first order for path in sorted(self.dir_stories.rglob("*/")): name = path.name # Quick prefix check before expensive Ticket parsing if not ( name.startswith(WordsEnum.story.value + "-") or name.startswith(WordsEnum.task.value + "-") ): continue ticket = Ticket.from_folder(path) if ticket is None: continue if ticket.type == WordsEnum.story.value: story_id_map[path] = ticket.id yield Story( id=ticket.id, date=ticket.date, title=ticket.title, path=str(path), ) elif ticket.type == WordsEnum.task.value: # Get story_id from parent folder story_folder = path.parent.parent story_id = story_id_map.get(story_folder, 0) yield Task( id=ticket.id, story_id=story_id, date=ticket.date, title=ticket.title, path=str(path), )
# -------------------------------------------------------------------------- # Index database methods # -------------------------------------------------------------------------- @cached_property def path_index_db(self) -> Path: return self.dir_root / "index.sqlite" @cached_property def engine(self) -> sa.Engine: return sa.create_engine(f"sqlite:///{self.path_index_db}")
[docs] def ensure_dir_root(self): """ Ensure the root directory exists. Creates the .tix directory if it doesn't exist. This is called automatically before any database operations. """ self.dir_root.mkdir(parents=True, exist_ok=True)
[docs] def rebuild_index_db(self): """ Rebuild the SQLite index database from filesystem. Scans all story and task folders, creates ORM objects, and writes them to the SQLite database. Existing data is cleared first. """ # Ensure directory exists before accessing database self.ensure_dir_root() # Create engine and tables engine = self.engine Base.metadata.drop_all(engine) Base.metadata.create_all(engine) with orm.Session(engine) as session: for story in self.iter_stories(): session.add( Story( id=story.id, date=story.date, title=story.title, path=story.path, ) ) for task in self.iter_tasks(): session.add( Task( id=task.id, story_id=task.story_id, date=task.date, title=task.title, path=task.path, ) ) session.commit()
[docs] def ensure_index_db(self): """ Ensure the index database exists, rebuilding if necessary. """ if not self.path_index_db.exists(): self.rebuild_index_db()
[docs] def get_next_id(self) -> int: """ Get the next available ID from the index database. Stories and tasks share the same global ID space. This method queries the database for max ID and returns max_id + 1. If no entities exist, returns 1. :returns: Next available global ID """ self.ensure_index_db() with orm.Session(self.engine) as session: max_story_id = session.query(sa.func.max(Story.id)).scalar() or 0 max_task_id = session.query(sa.func.max(Task.id)).scalar() or 0 return max(max_story_id, max_task_id) + 1
# -------------------------------------------------------------------------- # Database Query Methods (use within context manager) # --------------------------------------------------------------------------
[docs] def query_stories(self, limit: int = 20) -> list[Story]: """ Query all stories from the index database. Use within context manager to ensure database is synchronized. :param limit: Maximum number of stories to return :returns: List of all Story objects from database, sorted by ID descending """ with orm.Session(self.engine) as session: query = session.query(Story).order_by(Story.id.desc()).limit(limit) return [ Story(id=s.id, date=s.date, title=s.title, path=s.path) for s in query.all() ]
[docs] def query_tasks(self, limit: int = 20) -> list[Task]: """ Query all tasks from the index database. Use within context manager to ensure database is synchronized. :param limit: Maximum number of tasks to return :returns: List of all Task objects from database, sorted by ID descending """ with orm.Session(self.engine) as session: query = session.query(Task).order_by(Task.id.desc()).limit(limit) return [ Task( id=t.id, story_id=t.story_id, date=t.date, title=t.title, path=t.path, ) for t in query.all() ]
[docs] def query_story(self, id: int) -> Story | None: """ Query a single story by ID from the index database. :param id: Story ID to query :returns: Story object if found, None otherwise """ with orm.Session(self.engine) as session: s = session.get(Story, id) if s is None: return None return Story(id=s.id, date=s.date, title=s.title, path=s.path)
[docs] def query_task(self, id: int) -> Task | None: """ Query a single task by ID from the index database. :param id: Task ID to query :returns: Task object if found, None otherwise """ with orm.Session(self.engine) as session: t = session.get(Task, id) if t is None: return None return Task( id=t.id, story_id=t.story_id, date=t.date, title=t.title, path=t.path )
def _tokenize_title(self, title: str) -> set[str]: """ Tokenize a title string for search matching. Splits on spaces and special characters, converts to lowercase. :param title: Title string to tokenize :returns: Set of lowercase tokens """ # Replace non-alphanumeric characters with spaces, then split chars = [c.lower() if c.isalnum() else " " for c in title] return set("".join(chars).split()) def _title_matches(self, entity_title: str, search_tokens: set[str]) -> bool: """ Check if entity title matches any of the search tokens. :param entity_title: Title from the entity (Story/Task) :param search_tokens: Set of tokens to match against :returns: True if any token matches """ entity_tokens = self._tokenize_title(entity_title) return bool(entity_tokens & search_tokens)
[docs] def search_stories( self, title: str | None = None, date_lower: str | None = None, date_upper: str | None = None, id_lower: int | None = None, id_upper: int | None = None, status: list[StatusEnum] | None = None, limit: int = 20, ) -> list[Story]: """ Search stories by title, date range, ID range, and/or status. At least one parameter must be provided. Results are sorted by ID descending (newest first). Title matching: tokenizes the search string (splits on spaces and special characters, lowercases), matches if any token appears in the story title. Status matching: when status list is provided, only stories with status in the list are returned. This requires reading metadata.json for each candidate story. :param title: Search string to match against story titles :param date_lower: Minimum date (inclusive), format YYYY-MM-DD :param date_upper: Maximum date (inclusive), format YYYY-MM-DD :param id_lower: Minimum ID (inclusive) :param id_upper: Maximum ID (inclusive) :param status: List of status values to match (e.g., [StatusEnum.TODO, StatusEnum.IN_PROGRESS]) :param limit: Maximum number of stories to return :returns: List of matching Story objects, sorted by ID descending :raises ValueError: If all parameters are None """ if all(p is None for p in [title, date_lower, date_upper, id_lower, id_upper, status]): raise ValueError("At least one search parameter must be provided") search_tokens = self._tokenize_title(title) if title else None status_values = {s.value for s in status} if status else None with orm.Session(self.engine) as session: query = session.query(Story) if id_lower is not None: query = query.where(Story.id >= id_lower) if id_upper is not None: query = query.where(Story.id <= id_upper) if date_lower is not None: query = query.where(Story.date >= date_lower) if date_upper is not None: query = query.where(Story.date <= date_upper) # Sort by ID descending (newest first) query = query.order_by(Story.id.desc()) # Apply limit at SQL level only when status filter is not used if status is None: query = query.limit(limit) results = [] for s in query.all(): story = Story(id=s.id, date=s.date, title=s.title, path=s.path) # Apply title filter in Python (token matching) if search_tokens and not self._title_matches(s.title, search_tokens): continue # Apply status filter in Python (requires file read) if status_values and story.status not in status_values: continue results.append(story) if len(results) >= limit: break return results
[docs] def search_tasks( self, title: str | None = None, date_lower: str | None = None, date_upper: str | None = None, id_lower: int | None = None, id_upper: int | None = None, status: list[StatusEnum] | None = None, limit: int = 20, ) -> list[Task]: """ Search tasks by title, date range, ID range, and/or status. At least one parameter must be provided. Results are sorted by ID descending (newest first). Title matching: tokenizes the search string (splits on spaces and special characters, lowercases), matches if any token appears in the task title. Status matching: when status list is provided, only tasks with status in the list are returned. This requires reading metadata.json for each candidate task. :param title: Search string to match against task titles :param date_lower: Minimum date (inclusive), format YYYY-MM-DD :param date_upper: Maximum date (inclusive), format YYYY-MM-DD :param id_lower: Minimum ID (inclusive) :param id_upper: Maximum ID (inclusive) :param status: List of status values to match (e.g., [StatusEnum.TODO, StatusEnum.IN_PROGRESS]) :param limit: Maximum number of tasks to return :returns: List of matching Task objects, sorted by ID descending :raises ValueError: If all parameters are None """ if all(p is None for p in [title, date_lower, date_upper, id_lower, id_upper, status]): raise ValueError("At least one search parameter must be provided") search_tokens = self._tokenize_title(title) if title else None status_values = {s.value for s in status} if status else None with orm.Session(self.engine) as session: query = session.query(Task) if id_lower is not None: query = query.where(Task.id >= id_lower) if id_upper is not None: query = query.where(Task.id <= id_upper) if date_lower is not None: query = query.where(Task.date >= date_lower) if date_upper is not None: query = query.where(Task.date <= date_upper) # Sort by ID descending (newest first) query = query.order_by(Task.id.desc()) # Apply limit at SQL level only when status filter is not used if status is None: query = query.limit(limit) results = [] for t in query.all(): task = Task( id=t.id, story_id=t.story_id, date=t.date, title=t.title, path=t.path, ) # Apply title filter in Python (token matching) if search_tokens and not self._title_matches(t.title, search_tokens): continue # Apply status filter in Python (requires file read) if status_values and task.status not in status_values: continue results.append(task) if len(results) >= limit: break return results
# -------------------------------------------------------------------------- # Story CRUD # --------------------------------------------------------------------------
[docs] def create_story( self, title: str, description: str | None = None, ) -> Story: """ Create a new story with auto-generated ID. Automatically assigns the next available ID and updates the index database. :param title: Story title (only letters, digits, and spaces allowed) :param description: Optional story description :returns: Created Story object :raises TitleValidationError: If title contains invalid characters """ # Validate and encode title validate_title(title) encoded_title = encode_title(title) # Ensure index exists self.ensure_index_db() # Get next ID story_id = self.get_next_id() # Build folder name utc_now = datetime.now(timezone.utc) date_str = str(utc_now.date()) folder_name = build_folder_name( type=WordsEnum.story.value, date=date_str, id=story_id, sanitized_title=encoded_title, ) dir_root = self.dir_stories / folder_name # Create Story, write filesystem artifacts, and add to database with orm.Session(self.engine) as session: # Story.title stores the original title, not sanitized story = Story( id=story_id, date=date_str, title=title, path=str(dir_root), ) story.write_metadata() if description: story.write_description(description) session.add(story) session.commit() # Return a fresh detached Story object return Story( id=story_id, date=date_str, title=title, path=str(dir_root), )
[docs] def get_story(self, id: int) -> Story | None: """ Get a story by ID from the index database. :param id: Story ID to retrieve :returns: Story object if found, None otherwise """ return self.query_story(id)
[docs] def update_story( self, id: int, title: str | None = None, status: StatusEnum | None = None, description: str | None = None, report: str | None = None, ) -> Story | None: """ Update a story's metadata and content files. Supports updating title, status, description, and report. When title changes, the story folder is renamed accordingly. :param id: Story ID to update :param title: New title (optional, triggers folder rename) :param status: New status value (optional) :param description: New description content (optional) :param report: New report content (optional) :returns: Updated Story object, or None if story not found """ story = self.query_story(id) if story is None: return None new_title = title if title is not None else story.title new_path = story.path # Handle title change - requires folder rename if title is not None and title != story.title: # Validate and encode new title validate_title(title) encoded_title = encode_title(title) new_folder_name = build_folder_name( type=WordsEnum.story.value, date=story.date, id=story.id, sanitized_title=encoded_title, ) new_dir = self.dir_stories / new_folder_name is_folder_changed = new_dir != story.dir_root if is_folder_changed: shutil.move(str(story.dir_root), str(new_dir)) new_path = str(new_dir) # Update title (and path if folder changed) in database with orm.Session(self.engine) as session: if is_folder_changed: session.execute( sa.update(Story) .where(Story.id == id) .values(title=title, path=new_path) ) # Update Task.path for all tasks under this story old_story_path = str(story.dir_root) for task in session.query(Task).where(Task.story_id == id).all(): new_task_path = task.path.replace(old_story_path, new_path, 1) session.execute( sa.update(Task) .where(Task.id == task.id) .values(path=new_task_path) ) else: session.execute( sa.update(Story).where(Story.id == id).values(title=title) ) session.commit() # Create a temporary Story object to access filesystem methods temp_story = Story( id=story.id, date=story.date, title=new_title, path=new_path, ) # Update metadata file if status provided if status is not None: temp_story.write_metadata(status=status) # Update description file if provided if description is not None: temp_story.write_description(description) # Update report file if provided if report is not None: temp_story.write_report(report) # Return a fresh detached Story object return temp_story
[docs] def delete_story(self, id: int) -> bool: """ Delete a story by ID from filesystem and index database. Removes the story directory and all its tasks from filesystem, then removes the story from the index database. :param id: Story ID to delete :returns: True if deleted, False if story not found """ story = self.query_story(id) if story is None: return False # Delete from filesystem if story.dir_root.exists(): shutil.rmtree(story.dir_root) # Delete from database with orm.Session(self.engine) as session: # Delete tasks first (cascade) session.execute(sa.delete(Task).where(Task.story_id == id)) session.execute(sa.delete(Story).where(Story.id == id)) session.commit() return True
# -------------------------------------------------------------------------- # Task CRUD # --------------------------------------------------------------------------
[docs] def create_task( self, story_id: int, title: str, description: str | None = None, ) -> Task: """ Create a new task under a story with auto-generated ID. :param story_id: Parent story ID :param title: Task title (only letters, digits, and spaces allowed) :param description: Optional task description :returns: Created Task object :raises TitleValidationError: If title contains invalid characters :raises ValueError: If parent story not found """ # Validate and encode title validate_title(title) encoded_title = encode_title(title) # Ensure index exists self.ensure_index_db() # Verify parent story exists story = self.query_story(story_id) if story is None: raise ValueError(f"Story with ID {story_id} not found") # Get next ID task_id = self.get_next_id() # Build folder name utc_now = datetime.now(timezone.utc) date_str = str(utc_now.date()) folder_name = build_folder_name( type=WordsEnum.task.value, date=date_str, id=task_id, sanitized_title=encoded_title, ) dir_task = story.dir_root / "tasks" / folder_name # Create Task, write filesystem artifacts, and add to database with orm.Session(self.engine) as session: # Task.title stores the original title, not sanitized task = Task( id=task_id, story_id=story_id, date=date_str, title=title, path=str(dir_task), ) task.write_metadata() if description: task.write_description(description) session.add(task) session.commit() # Return a fresh detached Task object return Task( id=task_id, story_id=story_id, date=date_str, title=title, path=str(dir_task), )
[docs] def get_task(self, id: int) -> Task | None: """ Get a task by ID from the index database. :param id: Task ID to retrieve :returns: Task object if found, None otherwise """ return self.query_task(id)
[docs] def update_task( self, id: int, title: str | None = None, status: StatusEnum | None = None, description: str | None = None, report: str | None = None, ) -> Task | None: """ Update a task's metadata and content files. Supports updating title, status, description, and report. When title changes, the task folder is renamed accordingly. :param id: Task ID to update :param title: New title (optional, triggers folder rename) :param status: New status value (optional) :param description: New description content (optional) :param report: New report content (optional) :returns: Updated Task object, or None if task not found """ task = self.query_task(id) if task is None: return None new_title = title if title is not None else task.title new_path = task.path # Handle title change - requires folder rename if title is not None and title != task.title: # Validate and encode new title validate_title(title) encoded_title = encode_title(title) new_folder_name = build_folder_name( type=WordsEnum.task.value, date=task.date, id=task.id, sanitized_title=encoded_title, ) # Task folder is inside story/tasks/ new_dir = task.dir_root.parent / new_folder_name is_folder_changed = new_dir != task.dir_root if is_folder_changed: shutil.move(str(task.dir_root), str(new_dir)) new_path = str(new_dir) # Update title (and path if folder changed) in database with orm.Session(self.engine) as session: if is_folder_changed: session.execute( sa.update(Task) .where(Task.id == id) .values(title=title, path=new_path) ) else: session.execute( sa.update(Task).where(Task.id == id).values(title=title) ) session.commit() # Create a temporary Task object to access filesystem methods temp_task = Task( id=task.id, story_id=task.story_id, date=task.date, title=new_title, path=new_path, ) # Update metadata file if status provided if status is not None: temp_task.write_metadata(status=status) # Update description file if provided if description is not None: temp_task.write_description(description) # Update report file if provided if report is not None: temp_task.write_report(report) # Return a fresh detached Task object return temp_task
[docs] def delete_task(self, id: int) -> bool: """ Delete a task by ID from filesystem and index database. :param id: Task ID to delete :returns: True if deleted, False if task not found """ task = self.query_task(id) if task is None: return False # Delete from filesystem import shutil if task.dir_root.exists(): shutil.rmtree(task.dir_root) # Delete from database with orm.Session(self.engine) as session: session.execute(sa.delete(Task).where(Task.id == id)) session.commit() return True
[docs] def query_tasks_by_story(self, story_id: int) -> list[Task]: """ Query all tasks belonging to a specific story. :param story_id: Parent story ID :returns: List of Task objects belonging to the story, sorted by ID descending """ with orm.Session(self.engine) as session: return [ Task( id=t.id, story_id=t.story_id, date=t.date, title=t.title, path=t.path, ) for t in session.query(Task).where(Task.story_id == story_id).order_by(Task.id.desc()).all() ]