Experiment with terser stdout logging

This commit is contained in:
Tim Van Baak 2025-01-16 06:48:53 -08:00
parent bbe3f53cd4
commit db5b748a4e

View File

@ -204,7 +204,7 @@ class LocalSource:
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
def _read_stdout(process: Popen, output: list) -> None:
def _read_stdout(process: Popen, source_name: str, output: list) -> None:
"""
Read the subprocess's stdout into memory.
This prevents the process from blocking when the pipe fills up.
@ -212,13 +212,23 @@ def _read_stdout(process: Popen, output: list) -> None:
while True:
data = process.stdout.readline()
if data:
print(f"[stdout] {data.rstrip()}", file=sys.stderr)
try:
item = json.loads(data)
if "id" not in item:
print(f"[{source_name}:out] {data.rstrip()}", file=sys.stderr)
else:
item_id = f"id: {item['id']}"
attrs = [item_id] + [k for k in item.keys() if k != "id"]
print(f"[{source_name}:item] {{ {', '.join(attrs)} }}", file=sys.stderr)
except json.JSONDecodeError:
print(f"[{source_name}:out] {data.rstrip()}", file=sys.stderr)
output.append(data)
if process.poll() is not None:
break
def _read_stderr(process: Popen) -> None:
def _read_stderr(process: Popen, source_name: str) -> None:
"""
Read the subprocess's stderr stream and pass it to logging.
This prevents the process from blocking when the pipe fills up.
@ -226,7 +236,7 @@ def _read_stderr(process: Popen) -> None:
while True:
data = process.stderr.readline()
if data:
print(f"[stderr] {data.rstrip()}", file=sys.stderr)
print(f"[{source_name}:log] {data.rstrip()}", file=sys.stderr)
if process.poll() is not None:
break
@ -274,9 +284,9 @@ def _execute_source_action(
# Kick off monitoring threads
output = []
t_stdout: Thread = Thread(target=_read_stdout, args=(process, output), daemon=True)
t_stdout: Thread = Thread(target=_read_stdout, args=(process, source.source_name, output), daemon=True)
t_stdout.start()
t_stderr: Thread = Thread(target=_read_stderr, args=(process,), daemon=True)
t_stderr: Thread = Thread(target=_read_stderr, args=(process, source.source_name), daemon=True)
t_stderr.start()
# Send input to the process, if provided