Add unit test suite with pytest configuration
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
11
tests/conftest.py
Normal file
11
tests/conftest.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Ensure project root is on the path so imports work
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
# Set FERNET_KEY for tests that import email models (EncryptedTextField needs it at import time)
|
||||
if "FERNET_KEY" not in os.environ:
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
os.environ["FERNET_KEY"] = Fernet.generate_key().decode()
|
||||
0
tests/unit/__init__.py
Normal file
0
tests/unit/__init__.py
Normal file
139
tests/unit/test_chunker.py
Normal file
139
tests/unit/test_chunker.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Tests for text preprocessing functions in utils/chunker.py."""
|
||||
|
||||
from utils.chunker import (
|
||||
remove_headers_footers,
|
||||
remove_special_characters,
|
||||
remove_repeated_substrings,
|
||||
remove_extra_spaces,
|
||||
preprocess_text,
|
||||
)
|
||||
|
||||
|
||||
class TestRemoveHeadersFooters:
|
||||
def test_removes_default_header(self):
|
||||
text = "Header Line\nActual content here"
|
||||
result = remove_headers_footers(text)
|
||||
assert "Header" not in result
|
||||
assert "Actual content here" in result
|
||||
|
||||
def test_removes_default_footer(self):
|
||||
text = "Actual content\nFooter Line"
|
||||
result = remove_headers_footers(text)
|
||||
assert "Footer" not in result
|
||||
assert "Actual content" in result
|
||||
|
||||
def test_custom_patterns(self):
|
||||
text = "PAGE 1\nContent\nCopyright 2024"
|
||||
result = remove_headers_footers(
|
||||
text,
|
||||
header_patterns=[r"^PAGE \d+$"],
|
||||
footer_patterns=[r"^Copyright.*$"],
|
||||
)
|
||||
assert "PAGE 1" not in result
|
||||
assert "Copyright" not in result
|
||||
assert "Content" in result
|
||||
|
||||
def test_no_match_preserves_text(self):
|
||||
text = "Just normal content"
|
||||
result = remove_headers_footers(text)
|
||||
assert result == "Just normal content"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert remove_headers_footers("") == ""
|
||||
|
||||
|
||||
class TestRemoveSpecialCharacters:
|
||||
def test_removes_special_chars(self):
|
||||
text = "Hello @world #test $100"
|
||||
result = remove_special_characters(text)
|
||||
assert "@" not in result
|
||||
assert "#" not in result
|
||||
assert "$" not in result
|
||||
|
||||
def test_preserves_allowed_chars(self):
|
||||
text = "Hello, world! How's it going? Yes-no."
|
||||
result = remove_special_characters(text)
|
||||
assert "," in result
|
||||
assert "!" in result
|
||||
assert "'" in result
|
||||
assert "?" in result
|
||||
assert "-" in result
|
||||
assert "." in result
|
||||
|
||||
def test_custom_pattern(self):
|
||||
text = "keep @this but not #that"
|
||||
result = remove_special_characters(text, special_chars=r"[#]")
|
||||
assert "@this" in result
|
||||
assert "#" not in result
|
||||
|
||||
def test_empty_string(self):
|
||||
assert remove_special_characters("") == ""
|
||||
|
||||
|
||||
class TestRemoveRepeatedSubstrings:
|
||||
def test_collapses_dots(self):
|
||||
text = "Item.....Value"
|
||||
result = remove_repeated_substrings(text)
|
||||
assert result == "Item.Value"
|
||||
|
||||
def test_single_dot_preserved(self):
|
||||
text = "End of sentence."
|
||||
result = remove_repeated_substrings(text)
|
||||
assert result == "End of sentence."
|
||||
|
||||
def test_custom_pattern(self):
|
||||
text = "hello---world"
|
||||
result = remove_repeated_substrings(text, pattern=r"-{2,}")
|
||||
# Function always replaces matched pattern with "."
|
||||
assert result == "hello.world"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert remove_repeated_substrings("") == ""
|
||||
|
||||
|
||||
class TestRemoveExtraSpaces:
|
||||
def test_collapses_multiple_blank_lines(self):
|
||||
text = "Line 1\n\n\n\nLine 2"
|
||||
result = remove_extra_spaces(text)
|
||||
# After collapsing newlines to \n\n, then \s+ collapses everything to single spaces
|
||||
assert "\n\n\n" not in result
|
||||
|
||||
def test_collapses_multiple_spaces(self):
|
||||
text = "Hello world"
|
||||
result = remove_extra_spaces(text)
|
||||
assert result == "Hello world"
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
text = " Hello world "
|
||||
result = remove_extra_spaces(text)
|
||||
assert result == "Hello world"
|
||||
|
||||
def test_empty_string(self):
|
||||
assert remove_extra_spaces("") == ""
|
||||
|
||||
|
||||
class TestPreprocessText:
|
||||
def test_full_pipeline(self):
|
||||
text = "Header Info\nHello @world... with spaces\nFooter Info"
|
||||
result = preprocess_text(text)
|
||||
assert "Header" not in result
|
||||
assert "Footer" not in result
|
||||
assert "@" not in result
|
||||
assert "..." not in result
|
||||
assert " " not in result
|
||||
|
||||
def test_preserves_meaningful_content(self):
|
||||
text = "The cat weighs 10 pounds."
|
||||
result = preprocess_text(text)
|
||||
assert "cat" in result
|
||||
assert "10" in result
|
||||
assert "pounds" in result
|
||||
|
||||
def test_empty_string(self):
|
||||
assert preprocess_text("") == ""
|
||||
|
||||
def test_already_clean(self):
|
||||
text = "Simple clean text here."
|
||||
result = preprocess_text(text)
|
||||
assert "Simple" in result
|
||||
assert "clean" in result
|
||||
91
tests/unit/test_crypto_service.py
Normal file
91
tests/unit/test_crypto_service.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Tests for encryption/decryption in blueprints/email/crypto_service.py."""
|
||||
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
|
||||
# Generate a valid key for testing
|
||||
TEST_FERNET_KEY = Fernet.generate_key().decode()
|
||||
|
||||
|
||||
class TestEncryptedTextField:
|
||||
@pytest.fixture
|
||||
def field(self):
|
||||
with patch.dict(os.environ, {"FERNET_KEY": TEST_FERNET_KEY}):
|
||||
from blueprints.email.crypto_service import EncryptedTextField
|
||||
|
||||
return EncryptedTextField()
|
||||
|
||||
def test_encrypt_decrypt_roundtrip(self, field):
|
||||
original = "my secret password"
|
||||
encrypted = field.to_db_value(original, None)
|
||||
decrypted = field.to_python_value(encrypted)
|
||||
assert decrypted == original
|
||||
assert encrypted != original
|
||||
|
||||
def test_none_passthrough(self, field):
|
||||
assert field.to_db_value(None, None) is None
|
||||
assert field.to_python_value(None) is None
|
||||
|
||||
def test_unicode_roundtrip(self, field):
|
||||
original = "Hello 世界 🐱"
|
||||
encrypted = field.to_db_value(original, None)
|
||||
decrypted = field.to_python_value(encrypted)
|
||||
assert decrypted == original
|
||||
|
||||
def test_empty_string_roundtrip(self, field):
|
||||
encrypted = field.to_db_value("", None)
|
||||
decrypted = field.to_python_value(encrypted)
|
||||
assert decrypted == ""
|
||||
|
||||
def test_long_text_roundtrip(self, field):
|
||||
original = "x" * 10000
|
||||
encrypted = field.to_db_value(original, None)
|
||||
decrypted = field.to_python_value(encrypted)
|
||||
assert decrypted == original
|
||||
|
||||
def test_different_encryptions_differ(self, field):
|
||||
"""Fernet includes a timestamp, so two encryptions of the same value differ."""
|
||||
e1 = field.to_db_value("same", None)
|
||||
e2 = field.to_db_value("same", None)
|
||||
assert e1 != e2 # Different ciphertexts
|
||||
assert field.to_python_value(e1) == field.to_python_value(e2) == "same"
|
||||
|
||||
def test_wrong_key_fails(self, field):
|
||||
encrypted = field.to_db_value("secret", None)
|
||||
|
||||
# Create a field with a different key
|
||||
other_key = Fernet.generate_key().decode()
|
||||
with patch.dict(os.environ, {"FERNET_KEY": other_key}):
|
||||
from blueprints.email.crypto_service import EncryptedTextField
|
||||
|
||||
other_field = EncryptedTextField()
|
||||
|
||||
with pytest.raises(Exception):
|
||||
other_field.to_python_value(encrypted)
|
||||
|
||||
|
||||
class TestValidateFernetKey:
|
||||
def test_valid_key(self):
|
||||
with patch.dict(os.environ, {"FERNET_KEY": TEST_FERNET_KEY}):
|
||||
from blueprints.email.crypto_service import validate_fernet_key
|
||||
|
||||
validate_fernet_key() # Should not raise
|
||||
|
||||
def test_missing_key(self):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
os.environ.pop("FERNET_KEY", None)
|
||||
from blueprints.email.crypto_service import validate_fernet_key
|
||||
|
||||
with pytest.raises(ValueError, match="not set"):
|
||||
validate_fernet_key()
|
||||
|
||||
def test_invalid_key(self):
|
||||
with patch.dict(os.environ, {"FERNET_KEY": "not-a-valid-key"}):
|
||||
from blueprints.email.crypto_service import validate_fernet_key
|
||||
|
||||
with pytest.raises(ValueError, match="validation failed"):
|
||||
validate_fernet_key()
|
||||
38
tests/unit/test_email_helpers.py
Normal file
38
tests/unit/test_email_helpers.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Tests for email helper functions in blueprints/email/helpers.py."""
|
||||
|
||||
from blueprints.email.helpers import generate_email_token, get_user_email_address
|
||||
|
||||
|
||||
class TestGenerateEmailToken:
|
||||
def test_returns_16_char_hex(self):
|
||||
token = generate_email_token("user-123", "my-secret")
|
||||
assert len(token) == 16
|
||||
assert all(c in "0123456789abcdef" for c in token)
|
||||
|
||||
def test_deterministic(self):
|
||||
t1 = generate_email_token("user-123", "my-secret")
|
||||
t2 = generate_email_token("user-123", "my-secret")
|
||||
assert t1 == t2
|
||||
|
||||
def test_different_users_different_tokens(self):
|
||||
t1 = generate_email_token("user-1", "secret")
|
||||
t2 = generate_email_token("user-2", "secret")
|
||||
assert t1 != t2
|
||||
|
||||
def test_different_secrets_different_tokens(self):
|
||||
t1 = generate_email_token("user-1", "secret-a")
|
||||
t2 = generate_email_token("user-1", "secret-b")
|
||||
assert t1 != t2
|
||||
|
||||
|
||||
class TestGetUserEmailAddress:
|
||||
def test_formats_correctly(self):
|
||||
addr = get_user_email_address("abc123", "example.com")
|
||||
assert addr == "ask+abc123@example.com"
|
||||
|
||||
def test_preserves_token(self):
|
||||
token = "deadbeef12345678"
|
||||
addr = get_user_email_address(token, "mail.test.org")
|
||||
assert token in addr
|
||||
assert addr.startswith("ask+")
|
||||
assert "@mail.test.org" in addr
|
||||
259
tests/unit/test_obsidian_service.py
Normal file
259
tests/unit/test_obsidian_service.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""Tests for ObsidianService markdown parsing and file operations."""
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Set vault path before importing so __init__ validation passes
|
||||
_test_vault_dir = None
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def vault_dir(tmp_path):
|
||||
"""Create a temporary vault directory with a sample .md file."""
|
||||
global _test_vault_dir
|
||||
_test_vault_dir = tmp_path
|
||||
|
||||
# Create a sample markdown file so vault validation passes
|
||||
sample = tmp_path / "sample.md"
|
||||
sample.write_text("# Sample\nHello world")
|
||||
|
||||
with patch.dict(os.environ, {"OBSIDIAN_VAULT_PATH": str(tmp_path)}):
|
||||
yield tmp_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service(vault_dir):
|
||||
from utils.obsidian_service import ObsidianService
|
||||
|
||||
return ObsidianService()
|
||||
|
||||
|
||||
class TestParseMarkdown:
|
||||
def test_extracts_frontmatter(self, service):
|
||||
content = "---\ntitle: Test Note\ntags: [cat, vet]\n---\n\nBody content"
|
||||
result = service.parse_markdown(content)
|
||||
assert result["metadata"]["title"] == "Test Note"
|
||||
assert result["metadata"]["tags"] == ["cat", "vet"]
|
||||
|
||||
def test_no_frontmatter(self, service):
|
||||
content = "Just body content with no frontmatter"
|
||||
result = service.parse_markdown(content)
|
||||
assert result["metadata"] == {}
|
||||
assert "Just body content" in result["content"]
|
||||
|
||||
def test_invalid_yaml_frontmatter(self, service):
|
||||
content = "---\n: invalid: yaml: [[\n---\n\nBody"
|
||||
result = service.parse_markdown(content)
|
||||
assert result["metadata"] == {}
|
||||
|
||||
def test_extracts_tags(self, service):
|
||||
content = "Some text with #tag1 and #tag2 here"
|
||||
result = service.parse_markdown(content)
|
||||
assert "tag1" in result["tags"]
|
||||
assert "tag2" in result["tags"]
|
||||
|
||||
def test_extracts_wikilinks(self, service):
|
||||
content = "Link to [[Other Note]] and [[Another Page]]"
|
||||
result = service.parse_markdown(content)
|
||||
assert "Other Note" in result["wikilinks"]
|
||||
assert "Another Page" in result["wikilinks"]
|
||||
|
||||
def test_extracts_embeds(self, service):
|
||||
content = "An embed [[!my_embed]] here"
|
||||
result = service.parse_markdown(content)
|
||||
assert "my_embed" in result["embeds"]
|
||||
|
||||
def test_cleans_wikilinks_from_content(self, service):
|
||||
content = "Text with [[link]] included"
|
||||
result = service.parse_markdown(content)
|
||||
assert "[[" not in result["content"]
|
||||
assert "]]" not in result["content"]
|
||||
|
||||
def test_filepath_passed_through(self, service):
|
||||
result = service.parse_markdown("text", filepath=Path("/vault/note.md"))
|
||||
assert result["filepath"] == "/vault/note.md"
|
||||
|
||||
def test_filepath_none_by_default(self, service):
|
||||
result = service.parse_markdown("text")
|
||||
assert result["filepath"] is None
|
||||
|
||||
def test_empty_content(self, service):
|
||||
result = service.parse_markdown("")
|
||||
assert result["metadata"] == {}
|
||||
assert result["tags"] == []
|
||||
assert result["wikilinks"] == []
|
||||
assert result["embeds"] == []
|
||||
|
||||
|
||||
class TestGetDailyNotePath:
|
||||
def test_formats_path_correctly(self, service):
|
||||
date = datetime(2026, 3, 15)
|
||||
path = service.get_daily_note_path(date)
|
||||
assert path == "journal/2026/2026-03-15.md"
|
||||
|
||||
def test_defaults_to_today(self, service):
|
||||
path = service.get_daily_note_path()
|
||||
today = datetime.now()
|
||||
assert today.strftime("%Y-%m-%d") in path
|
||||
assert path.startswith(f"journal/{today.strftime('%Y')}/")
|
||||
|
||||
|
||||
class TestWalkVault:
|
||||
def test_finds_markdown_files(self, service, vault_dir):
|
||||
(vault_dir / "note1.md").write_text("# Note 1")
|
||||
(vault_dir / "subdir").mkdir()
|
||||
(vault_dir / "subdir" / "note2.md").write_text("# Note 2")
|
||||
|
||||
files = service.walk_vault()
|
||||
filenames = [f.name for f in files]
|
||||
assert "sample.md" in filenames
|
||||
assert "note1.md" in filenames
|
||||
assert "note2.md" in filenames
|
||||
|
||||
def test_excludes_obsidian_dir(self, service, vault_dir):
|
||||
obsidian_dir = vault_dir / ".obsidian"
|
||||
obsidian_dir.mkdir()
|
||||
(obsidian_dir / "config.md").write_text("config")
|
||||
|
||||
files = service.walk_vault()
|
||||
filenames = [f.name for f in files]
|
||||
assert "config.md" not in filenames
|
||||
|
||||
def test_ignores_non_md_files(self, service, vault_dir):
|
||||
(vault_dir / "image.png").write_bytes(b"\x89PNG")
|
||||
|
||||
files = service.walk_vault()
|
||||
filenames = [f.name for f in files]
|
||||
assert "image.png" not in filenames
|
||||
|
||||
|
||||
class TestCreateNote:
|
||||
def test_creates_file(self, service, vault_dir):
|
||||
path = service.create_note("My Test Note", "Body content")
|
||||
full_path = vault_dir / path
|
||||
assert full_path.exists()
|
||||
|
||||
def test_sanitizes_title(self, service, vault_dir):
|
||||
path = service.create_note("Hello World! @#$", "Body")
|
||||
assert "hello-world" in path
|
||||
assert "@" not in path
|
||||
assert "#" not in path
|
||||
|
||||
def test_includes_frontmatter(self, service, vault_dir):
|
||||
path = service.create_note("Test", "Body", tags=["cat", "vet"])
|
||||
full_path = vault_dir / path
|
||||
content = full_path.read_text()
|
||||
assert "---" in content
|
||||
assert "created_by: simbarag" in content
|
||||
assert "cat" in content
|
||||
assert "vet" in content
|
||||
|
||||
def test_custom_folder(self, service, vault_dir):
|
||||
path = service.create_note("Test", "Body", folder="custom/subfolder")
|
||||
assert path.startswith("custom/subfolder/")
|
||||
assert (vault_dir / path).exists()
|
||||
|
||||
|
||||
class TestDailyNoteTasks:
|
||||
def test_get_tasks_from_daily_note(self, service, vault_dir):
|
||||
# Create a daily note with tasks
|
||||
date = datetime(2026, 1, 15)
|
||||
rel_path = service.get_daily_note_path(date)
|
||||
note_path = vault_dir / rel_path
|
||||
note_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
note_path.write_text(
|
||||
"---\nmodified: 2026-01-15\n---\n"
|
||||
"### tasks\n\n"
|
||||
"- [ ] Feed the cat\n"
|
||||
"- [x] Clean litter box\n"
|
||||
"- [ ] Buy cat food\n\n"
|
||||
"### log\n"
|
||||
)
|
||||
|
||||
result = service.get_daily_tasks(date)
|
||||
assert result["found"] is True
|
||||
assert len(result["tasks"]) == 3
|
||||
assert result["tasks"][0] == {"text": "Feed the cat", "done": False}
|
||||
assert result["tasks"][1] == {"text": "Clean litter box", "done": True}
|
||||
assert result["tasks"][2] == {"text": "Buy cat food", "done": False}
|
||||
|
||||
def test_get_tasks_no_note(self, service):
|
||||
date = datetime(2099, 12, 31)
|
||||
result = service.get_daily_tasks(date)
|
||||
assert result["found"] is False
|
||||
assert result["tasks"] == []
|
||||
|
||||
def test_add_task_creates_note(self, service, vault_dir):
|
||||
date = datetime(2026, 6, 1)
|
||||
result = service.add_task_to_daily_note("Walk the cat", date)
|
||||
assert result["success"] is True
|
||||
assert result["created_note"] is True
|
||||
|
||||
# Verify file was created with the task
|
||||
note_path = vault_dir / result["path"]
|
||||
content = note_path.read_text()
|
||||
assert "- [ ] Walk the cat" in content
|
||||
|
||||
def test_add_task_to_existing_note(self, service, vault_dir):
|
||||
date = datetime(2026, 6, 2)
|
||||
rel_path = service.get_daily_note_path(date)
|
||||
note_path = vault_dir / rel_path
|
||||
note_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
note_path.write_text(
|
||||
"---\nmodified: 2026-06-02\n---\n"
|
||||
"### tasks\n\n"
|
||||
"- [ ] Existing task\n\n"
|
||||
"### log\n"
|
||||
)
|
||||
|
||||
result = service.add_task_to_daily_note("New task", date)
|
||||
assert result["success"] is True
|
||||
assert result["created_note"] is False
|
||||
|
||||
content = note_path.read_text()
|
||||
assert "- [ ] Existing task" in content
|
||||
assert "- [ ] New task" in content
|
||||
|
||||
def test_complete_task_exact_match(self, service, vault_dir):
|
||||
date = datetime(2026, 6, 3)
|
||||
rel_path = service.get_daily_note_path(date)
|
||||
note_path = vault_dir / rel_path
|
||||
note_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
note_path.write_text("### tasks\n\n" "- [ ] Feed the cat\n" "- [ ] Buy food\n")
|
||||
|
||||
result = service.complete_task_in_daily_note("Feed the cat", date)
|
||||
assert result["success"] is True
|
||||
|
||||
content = note_path.read_text()
|
||||
assert "- [x] Feed the cat" in content
|
||||
assert "- [ ] Buy food" in content # Other task unchanged
|
||||
|
||||
def test_complete_task_partial_match(self, service, vault_dir):
|
||||
date = datetime(2026, 6, 4)
|
||||
rel_path = service.get_daily_note_path(date)
|
||||
note_path = vault_dir / rel_path
|
||||
note_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
note_path.write_text("### tasks\n\n- [ ] Feed the cat at 5pm\n")
|
||||
|
||||
result = service.complete_task_in_daily_note("Feed the cat", date)
|
||||
assert result["success"] is True
|
||||
|
||||
def test_complete_task_not_found(self, service, vault_dir):
|
||||
date = datetime(2026, 6, 5)
|
||||
rel_path = service.get_daily_note_path(date)
|
||||
note_path = vault_dir / rel_path
|
||||
note_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
note_path.write_text("### tasks\n\n- [ ] Feed the cat\n")
|
||||
|
||||
result = service.complete_task_in_daily_note("Walk the dog", date)
|
||||
assert result["success"] is False
|
||||
assert "not found" in result["error"]
|
||||
|
||||
def test_complete_task_no_note(self, service):
|
||||
date = datetime(2099, 12, 31)
|
||||
result = service.complete_task_in_daily_note("Something", date)
|
||||
assert result["success"] is False
|
||||
92
tests/unit/test_rate_limiting.py
Normal file
92
tests/unit/test_rate_limiting.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Tests for rate limiting logic in email and WhatsApp blueprints."""
|
||||
|
||||
import time
|
||||
|
||||
|
||||
class TestEmailRateLimit:
|
||||
def setup_method(self):
|
||||
"""Reset rate limit store before each test."""
|
||||
from blueprints.email import _rate_limit_store
|
||||
|
||||
_rate_limit_store.clear()
|
||||
|
||||
def test_allows_under_limit(self):
|
||||
from blueprints.email import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
assert _check_rate_limit("sender@test.com") is True
|
||||
|
||||
def test_blocks_at_limit(self):
|
||||
from blueprints.email import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
_check_rate_limit("sender@test.com")
|
||||
|
||||
assert _check_rate_limit("sender@test.com") is False
|
||||
|
||||
def test_different_senders_independent(self):
|
||||
from blueprints.email import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
_check_rate_limit("user1@test.com")
|
||||
|
||||
# user1 is at limit, but user2 should be fine
|
||||
assert _check_rate_limit("user1@test.com") is False
|
||||
assert _check_rate_limit("user2@test.com") is True
|
||||
|
||||
def test_window_expiry(self):
|
||||
from blueprints.email import (
|
||||
_check_rate_limit,
|
||||
_rate_limit_store,
|
||||
RATE_LIMIT_MAX,
|
||||
)
|
||||
|
||||
# Fill up the rate limit with timestamps in the past
|
||||
past = time.monotonic() - 999 # Well beyond any window
|
||||
_rate_limit_store["old@test.com"] = [past] * RATE_LIMIT_MAX
|
||||
|
||||
# Should be allowed because all timestamps are expired
|
||||
assert _check_rate_limit("old@test.com") is True
|
||||
|
||||
|
||||
class TestWhatsAppRateLimit:
|
||||
def setup_method(self):
|
||||
"""Reset rate limit store before each test."""
|
||||
from blueprints.whatsapp import _rate_limit_store
|
||||
|
||||
_rate_limit_store.clear()
|
||||
|
||||
def test_allows_under_limit(self):
|
||||
from blueprints.whatsapp import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
assert _check_rate_limit("whatsapp:+1234567890") is True
|
||||
|
||||
def test_blocks_at_limit(self):
|
||||
from blueprints.whatsapp import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
_check_rate_limit("whatsapp:+1234567890")
|
||||
|
||||
assert _check_rate_limit("whatsapp:+1234567890") is False
|
||||
|
||||
def test_different_numbers_independent(self):
|
||||
from blueprints.whatsapp import _check_rate_limit, RATE_LIMIT_MAX
|
||||
|
||||
for _ in range(RATE_LIMIT_MAX):
|
||||
_check_rate_limit("whatsapp:+1111111111")
|
||||
|
||||
assert _check_rate_limit("whatsapp:+1111111111") is False
|
||||
assert _check_rate_limit("whatsapp:+2222222222") is True
|
||||
|
||||
def test_window_expiry(self):
|
||||
from blueprints.whatsapp import (
|
||||
_check_rate_limit,
|
||||
_rate_limit_store,
|
||||
RATE_LIMIT_MAX,
|
||||
)
|
||||
|
||||
past = time.monotonic() - 999
|
||||
_rate_limit_store["whatsapp:+9999999999"] = [past] * RATE_LIMIT_MAX
|
||||
|
||||
assert _check_rate_limit("whatsapp:+9999999999") is True
|
||||
86
tests/unit/test_user_model.py
Normal file
86
tests/unit/test_user_model.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Tests for User model methods in blueprints/users/models.py."""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import bcrypt
|
||||
|
||||
|
||||
class TestUserModelMethods:
|
||||
"""Test User model methods without requiring a database connection.
|
||||
|
||||
We instantiate a mock object with the same methods as User
|
||||
to avoid Tortoise ORM initialization.
|
||||
"""
|
||||
|
||||
def _make_user(self, ldap_groups=None, password=None):
|
||||
"""Create a mock user with real method implementations."""
|
||||
from blueprints.users.models import User
|
||||
|
||||
user = MagicMock(spec=User)
|
||||
user.ldap_groups = ldap_groups
|
||||
user.password = password
|
||||
|
||||
# Bind real methods
|
||||
user.has_group = lambda group: group in (user.ldap_groups or [])
|
||||
user.is_admin = lambda: user.has_group("lldap_admin")
|
||||
|
||||
def set_password(plain):
|
||||
user.password = bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt())
|
||||
|
||||
user.set_password = set_password
|
||||
|
||||
def verify_password(plain):
|
||||
if not user.password:
|
||||
return False
|
||||
return bcrypt.checkpw(plain.encode("utf-8"), user.password)
|
||||
|
||||
user.verify_password = verify_password
|
||||
|
||||
return user
|
||||
|
||||
def test_has_group_true(self):
|
||||
user = self._make_user(ldap_groups=["lldap_admin", "users"])
|
||||
assert user.has_group("lldap_admin") is True
|
||||
assert user.has_group("users") is True
|
||||
|
||||
def test_has_group_false(self):
|
||||
user = self._make_user(ldap_groups=["users"])
|
||||
assert user.has_group("lldap_admin") is False
|
||||
|
||||
def test_has_group_empty_list(self):
|
||||
user = self._make_user(ldap_groups=[])
|
||||
assert user.has_group("anything") is False
|
||||
|
||||
def test_has_group_none(self):
|
||||
user = self._make_user(ldap_groups=None)
|
||||
assert user.has_group("anything") is False
|
||||
|
||||
def test_is_admin_true(self):
|
||||
user = self._make_user(ldap_groups=["lldap_admin"])
|
||||
assert user.is_admin() is True
|
||||
|
||||
def test_is_admin_false(self):
|
||||
user = self._make_user(ldap_groups=["users"])
|
||||
assert user.is_admin() is False
|
||||
|
||||
def test_is_admin_empty(self):
|
||||
user = self._make_user(ldap_groups=[])
|
||||
assert user.is_admin() is False
|
||||
|
||||
def test_set_and_verify_password(self):
|
||||
user = self._make_user()
|
||||
user.set_password("hunter2")
|
||||
assert user.password is not None
|
||||
assert user.verify_password("hunter2") is True
|
||||
assert user.verify_password("wrong") is False
|
||||
|
||||
def test_verify_password_no_password_set(self):
|
||||
user = self._make_user(password=None)
|
||||
assert user.verify_password("anything") is False
|
||||
|
||||
def test_password_is_hashed(self):
|
||||
user = self._make_user()
|
||||
user.set_password("mypassword")
|
||||
# The stored password should not be the plaintext
|
||||
assert user.password != b"mypassword"
|
||||
assert user.password != "mypassword"
|
||||
254
tests/unit/test_ynab_service.py
Normal file
254
tests/unit/test_ynab_service.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""Tests for YNAB service data formatting and filtering logic."""
|
||||
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _mock_category(
|
||||
name, budgeted, activity, balance, deleted=False, hidden=False, goal_type=None
|
||||
):
|
||||
cat = MagicMock()
|
||||
cat.name = name
|
||||
cat.budgeted = budgeted
|
||||
cat.activity = activity
|
||||
cat.balance = balance
|
||||
cat.deleted = deleted
|
||||
cat.hidden = hidden
|
||||
cat.goal_type = goal_type
|
||||
return cat
|
||||
|
||||
|
||||
def _mock_transaction(
|
||||
var_date, payee_name, category_name, amount, memo="", deleted=False, approved=True
|
||||
):
|
||||
txn = MagicMock()
|
||||
txn.var_date = var_date
|
||||
txn.payee_name = payee_name
|
||||
txn.category_name = category_name
|
||||
txn.amount = amount
|
||||
txn.memo = memo
|
||||
txn.deleted = deleted
|
||||
txn.approved = approved
|
||||
return txn
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ynab_service():
|
||||
"""Create a YNABService with mocked API client."""
|
||||
with patch.dict(
|
||||
os.environ, {"YNAB_ACCESS_TOKEN": "fake-token", "YNAB_BUDGET_ID": "test-budget"}
|
||||
):
|
||||
with patch("utils.ynab_service.ynab") as mock_ynab:
|
||||
# Mock the configuration and API client chain
|
||||
mock_ynab.Configuration.return_value = MagicMock()
|
||||
mock_ynab.ApiClient.return_value = MagicMock()
|
||||
mock_ynab.PlansApi.return_value = MagicMock()
|
||||
mock_ynab.TransactionsApi.return_value = MagicMock()
|
||||
mock_ynab.MonthsApi.return_value = MagicMock()
|
||||
mock_ynab.CategoriesApi.return_value = MagicMock()
|
||||
|
||||
from utils.ynab_service import YNABService
|
||||
|
||||
service = YNABService()
|
||||
yield service
|
||||
|
||||
|
||||
class TestGetBudgetSummary:
|
||||
def test_calculates_totals(self, ynab_service):
|
||||
categories = [
|
||||
_mock_category("Groceries", 500_000, -350_000, 150_000),
|
||||
_mock_category("Rent", 1_500_000, -1_500_000, 0),
|
||||
]
|
||||
|
||||
mock_month = MagicMock()
|
||||
mock_month.to_be_budgeted = 200_000
|
||||
|
||||
mock_budget = MagicMock()
|
||||
mock_budget.name = "My Budget"
|
||||
mock_budget.months = [mock_month]
|
||||
mock_budget.categories = categories
|
||||
mock_budget.currency_format = MagicMock(iso_code="USD")
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.budget = mock_budget
|
||||
ynab_service.plans_api.get_plan_by_id.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_budget_summary()
|
||||
|
||||
assert result["budget_name"] == "My Budget"
|
||||
assert result["to_be_budgeted"] == 200.0
|
||||
assert result["total_budgeted"] == 2000.0 # (500k + 1500k) / 1000
|
||||
assert result["total_activity"] == -1850.0
|
||||
assert result["currency_format"] == "USD"
|
||||
|
||||
def test_skips_deleted_and_hidden(self, ynab_service):
|
||||
categories = [
|
||||
_mock_category("Active", 100_000, -50_000, 50_000),
|
||||
_mock_category("Deleted", 999_000, -999_000, 0, deleted=True),
|
||||
_mock_category("Hidden", 999_000, -999_000, 0, hidden=True),
|
||||
]
|
||||
|
||||
mock_month = MagicMock()
|
||||
mock_month.to_be_budgeted = 0
|
||||
mock_budget = MagicMock()
|
||||
mock_budget.name = "Budget"
|
||||
mock_budget.months = [mock_month]
|
||||
mock_budget.categories = categories
|
||||
mock_budget.currency_format = None
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.budget = mock_budget
|
||||
ynab_service.plans_api.get_plan_by_id.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_budget_summary()
|
||||
assert result["total_budgeted"] == 100.0
|
||||
assert result["currency_format"] == "USD" # Default fallback
|
||||
|
||||
|
||||
class TestGetTransactions:
|
||||
def test_filters_by_date_range(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-05", "Store", "Groceries", -25_000),
|
||||
_mock_transaction("2026-01-15", "Gas", "Transport", -40_000),
|
||||
_mock_transaction(
|
||||
"2026-02-01", "Store", "Groceries", -30_000
|
||||
), # Out of range
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01", end_date="2026-01-31"
|
||||
)
|
||||
|
||||
assert result["count"] == 2
|
||||
assert result["total_amount"] == -65.0
|
||||
|
||||
def test_filters_by_category(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-05", "Store", "Groceries", -25_000),
|
||||
_mock_transaction("2026-01-06", "Gas", "Transport", -40_000),
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01",
|
||||
end_date="2026-01-31",
|
||||
category_name="groceries", # Case insensitive
|
||||
)
|
||||
|
||||
assert result["count"] == 1
|
||||
assert result["transactions"][0]["category"] == "Groceries"
|
||||
|
||||
def test_filters_by_payee(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-05", "Whole Foods", "Groceries", -25_000),
|
||||
_mock_transaction("2026-01-06", "Shell Gas", "Transport", -40_000),
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01",
|
||||
end_date="2026-01-31",
|
||||
payee_name="whole",
|
||||
)
|
||||
|
||||
assert result["count"] == 1
|
||||
assert result["transactions"][0]["payee"] == "Whole Foods"
|
||||
|
||||
def test_skips_deleted(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-05", "Store", "Groceries", -25_000),
|
||||
_mock_transaction("2026-01-06", "Deleted", "Other", -10_000, deleted=True),
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01", end_date="2026-01-31"
|
||||
)
|
||||
|
||||
assert result["count"] == 1
|
||||
|
||||
def test_converts_milliunits(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-05", "Store", "Groceries", -12_340),
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01", end_date="2026-01-31"
|
||||
)
|
||||
|
||||
assert result["transactions"][0]["amount"] == -12.34
|
||||
|
||||
def test_sorts_by_date_descending(self, ynab_service):
|
||||
transactions = [
|
||||
_mock_transaction("2026-01-01", "A", "Cat", -10_000),
|
||||
_mock_transaction("2026-01-15", "B", "Cat", -20_000),
|
||||
_mock_transaction("2026-01-10", "C", "Cat", -30_000),
|
||||
]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.transactions = transactions
|
||||
ynab_service.transactions_api.get_transactions.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_transactions(
|
||||
start_date="2026-01-01", end_date="2026-01-31"
|
||||
)
|
||||
|
||||
dates = [t["date"] for t in result["transactions"]]
|
||||
assert dates == sorted(dates, reverse=True)
|
||||
|
||||
|
||||
class TestGetCategorySpending:
|
||||
def test_month_format_normalization(self, ynab_service):
|
||||
"""Passing YYYY-MM should be normalized to YYYY-MM-01."""
|
||||
categories = [_mock_category("Food", 100_000, -50_000, 50_000)]
|
||||
|
||||
mock_month = MagicMock()
|
||||
mock_month.categories = categories
|
||||
mock_month.to_be_budgeted = 0
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.month = mock_month
|
||||
ynab_service.months_api.get_plan_month.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_category_spending("2026-03")
|
||||
|
||||
assert result["month"] == "2026-03"
|
||||
|
||||
def test_identifies_overspent(self, ynab_service):
|
||||
categories = [
|
||||
_mock_category("Dining", 200_000, -300_000, -100_000), # Overspent
|
||||
_mock_category("Groceries", 500_000, -400_000, 100_000), # Fine
|
||||
]
|
||||
|
||||
mock_month = MagicMock()
|
||||
mock_month.categories = categories
|
||||
mock_month.to_be_budgeted = 0
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.data.month = mock_month
|
||||
ynab_service.months_api.get_plan_month.return_value = mock_response
|
||||
|
||||
result = ynab_service.get_category_spending("2026-03")
|
||||
|
||||
assert len(result["overspent_categories"]) == 1
|
||||
assert result["overspent_categories"][0]["name"] == "Dining"
|
||||
assert result["overspent_categories"][0]["overspent_by"] == 100.0
|
||||
Reference in New Issue
Block a user