diff --git a/intake/source.py b/intake/source.py index 10edaa7..aa45194 100644 --- a/intake/source.py +++ b/intake/source.py @@ -204,7 +204,7 @@ class LocalSource: yield Item(self, json.loads(filepath.read_text(encoding="utf8"))) -def _read_stdout(process: Popen, output: list) -> None: +def _read_stdout(process: Popen, source_name: str, output: list) -> None: """ Read the subprocess's stdout into memory. This prevents the process from blocking when the pipe fills up. @@ -212,13 +212,23 @@ def _read_stdout(process: Popen, output: list) -> None: while True: data = process.stdout.readline() if data: - print(f"[stdout] {data.rstrip()}", file=sys.stderr) + try: + item = json.loads(data) + if "id" not in item: + print(f"[{source_name}:out] {data.rstrip()}", file=sys.stderr) + else: + item_id = f"id: {item['id']}" + attrs = [item_id] + [k for k in item.keys() if k != "id"] + print(f"[{source_name}:item] {{ {', '.join(attrs)} }}", file=sys.stderr) + except json.JSONDecodeError: + print(f"[{source_name}:out] {data.rstrip()}", file=sys.stderr) + output.append(data) if process.poll() is not None: break -def _read_stderr(process: Popen) -> None: +def _read_stderr(process: Popen, source_name: str) -> None: """ Read the subprocess's stderr stream and pass it to logging. This prevents the process from blocking when the pipe fills up. @@ -226,7 +236,7 @@ def _read_stderr(process: Popen) -> None: while True: data = process.stderr.readline() if data: - print(f"[stderr] {data.rstrip()}", file=sys.stderr) + print(f"[{source_name}:log] {data.rstrip()}", file=sys.stderr) if process.poll() is not None: break @@ -274,9 +284,9 @@ def _execute_source_action( # Kick off monitoring threads output = [] - t_stdout: Thread = Thread(target=_read_stdout, args=(process, output), daemon=True) + t_stdout: Thread = Thread(target=_read_stdout, args=(process, source.source_name, output), daemon=True) t_stdout.start() - t_stderr: Thread = Thread(target=_read_stderr, args=(process,), daemon=True) + t_stderr: Thread = Thread(target=_read_stderr, args=(process, source.source_name), daemon=True) t_stderr.start() # Send input to the process, if provided