diff --git a/intake/source.py b/intake/source.py index c1e05d2..3bf4d04 100644 --- a/intake/source.py +++ b/intake/source.py @@ -3,7 +3,7 @@ from pathlib import Path from subprocess import Popen, PIPE, TimeoutExpired from threading import Thread from time import time as current_time -from typing import List +from typing import Generator import json import os import os.path @@ -168,7 +168,7 @@ class LocalSource: def get_item_path(self, item_id: dict) -> Path: return self.source_path / f"{item_id}.item" - def get_item_ids(self) -> List[str]: + def get_item_ids(self) -> list[str]: return [ filepath.name[:-5] for filepath in self.source_path.iterdir() @@ -193,7 +193,7 @@ class LocalSource: def delete_item(self, item_id) -> None: os.remove(self.get_item_path(item_id)) - def get_all_items(self) -> List[Item]: + def get_all_items(self) -> Generator[Item, None, None]: for filepath in self.source_path.iterdir(): if filepath.name.endswith(".item"): yield Item(self, json.loads(filepath.read_text(encoding="utf8"))) @@ -228,7 +228,7 @@ def _read_stderr(process: Popen) -> None: def _execute_source_action( source: LocalSource, action: str, input: str, timeout: timedelta -) -> List[str]: +) -> list[str]: """ Execute the action from a given source. If stdin is specified, pass it along to the process. Returns lines from stdout. @@ -296,13 +296,13 @@ def _execute_source_action( return output -def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]: +def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]: """ Execute the feed source and return the current feed items. Returns a list of feed items on success. Throws SourceUpdateException if the feed source update failed. """ - items: List[Item] = [] + items: list[Item] = [] output = _execute_source_action(source, "fetch", None, timedelta(timeout)) @@ -338,7 +338,7 @@ def execute_action( raise SourceUpdateException("invalid json") -def update_items(source: LocalSource, fetched_items: List[Item]): +def update_items(source: LocalSource, fetched_items: list[Item]): """ Update the source with a batch of new items, doing creations, updates, and deletions as necessary. @@ -350,8 +350,8 @@ def update_items(source: LocalSource, fetched_items: List[Item]): print(f"Found {len(prior_ids)} prior items", file=sys.stderr) # Determine which items are new and which are updates. - new_items: List[Item] = [] - upd_items: List[Item] = [] + new_items: list[Item] = [] + upd_items: list[Item] = [] for item in fetched_items: if source.item_exists(item["id"]): upd_items.append(item)