Update type annotations

This commit is contained in:
Tim Van Baak 2024-11-06 19:16:21 -08:00
parent c903947f00
commit d0780a9fd1

View File

@ -3,7 +3,7 @@ from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread
from time import time as current_time
from typing import List
from typing import Generator
import json
import os
import os.path
@ -168,7 +168,7 @@ class LocalSource:
def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> List[str]:
def get_item_ids(self) -> list[str]:
return [
filepath.name[:-5]
for filepath in self.source_path.iterdir()
@ -193,7 +193,7 @@ class LocalSource:
def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id))
def get_all_items(self) -> List[Item]:
def get_all_items(self) -> Generator[Item, None, None]:
for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"):
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
@ -228,7 +228,7 @@ def _read_stderr(process: Popen) -> None:
def _execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta
) -> List[str]:
) -> list[str]:
"""
Execute the action from a given source. If stdin is specified, pass it
along to the process. Returns lines from stdout.
@ -296,13 +296,13 @@ def _execute_source_action(
return output
def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]:
def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]:
"""
Execute the feed source and return the current feed items.
Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed.
"""
items: List[Item] = []
items: list[Item] = []
output = _execute_source_action(source, "fetch", None, timedelta(timeout))
@ -338,7 +338,7 @@ def execute_action(
raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items: List[Item]):
def update_items(source: LocalSource, fetched_items: list[Item]):
"""
Update the source with a batch of new items, doing creations, updates, and
deletions as necessary.
@ -350,8 +350,8 @@ def update_items(source: LocalSource, fetched_items: List[Item]):
print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
# Determine which items are new and which are updates.
new_items: List[Item] = []
upd_items: List[Item] = []
new_items: list[Item] = []
upd_items: list[Item] = []
for item in fetched_items:
if source.item_exists(item["id"]):
upd_items.append(item)