Add basic chunk reading and subprocess execution

This commit is contained in:
Tim Van Baak 2020-02-10 23:16:22 -08:00
parent 7421e2e009
commit f749606f57
1 changed files with 162 additions and 1 deletions

View File

@ -1,2 +1,163 @@
# Standard library imports
from argparse import ArgumentParser, FileType
import logging
import shlex
from signal import signal, SIGPIPE, SIG_DFL
import subprocess
import sys
# Configure SIGPIPE signal handler
signal(SIGPIPE,SIG_DFL)
def parse_args():
"""
Helper function for argument parsing.
"""
parser = ArgumentParser(description="Pass chunks of a stream to another "
"command invocation")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose "
"logging.")
parser.add_argument("--chunk", type=int, default=1024, help="Chunk size in bytes.")
parser.add_argument("--bufsize", type=int, default=1024, help="Buffer "
"size for reading from stdin.")
parser.add_argument("--exec", required=True, help="Command to pipe chunks to.")
# parser.add_argument("--inc", help="Pattern to find/replace with the "
# "chunk number.")
# parser.add_argument("--inc-width", type=int, default=3, help="Number of "
# "digits to pad the increment with.")
parser.add_argument("--skip", type=int, default=0, help="Skip a number of "
"chunks at the start of the input stream.")
args = parser.parse_args()
return args
def initialize_logging(verbose):
"""
Sets up logging.
"""
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
if not verbose:
logger.setLevel(logging.INFO)
handler.setLevel(logging.INFO)
formatter = logging.Formatter("{message}", style="{")
else:
logger.setLevel(logging.DEBUG)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("[{asctime} {levelname}] {funcName}: {message}", style="{")
handler.setFormatter(formatter)
return logger
def chunk_buffers(stream, chunk_size, buf_size):
"""
A generator that yields buffers from the given stream of size
buf_size, totaling chunk_size bytes.
"""
logger = logging.getLogger(__name__)
logger.debug(f"ENTER ({stream.name}, {chunk_size}, {buf_size})")
total_bytes_read = 0
while total_bytes_read < chunk_size:
# Read up to chunk_size bytes
bytes_to_read = min(chunk_size - total_bytes_read, buf_size)
logger.debug(f"Read {total_bytes_read} of {chunk_size} bytes, reading {bytes_to_read} bytes")
buf = stream.read(bytes_to_read)
# Terminate if no more bytes are available
if not buf:
logger.debug(f"EXIT Terminated after {total_bytes_read} of {chunk_size} bytes")
return
# Otherwise, update how many bytes have been read and yield them
bytes_read = len(buf)
total_bytes_read += bytes_read
yield buf
logger.debug(f"EXIT Completed after {total_bytes_read} of {chunk_size} bytes")
return
def chunk_and_first_buf(buf, chunk):
"""
Generator wrapper to allow peeking without losing the first buffer.
"""
yield buf
for chunk_buf in chunk:
yield chunk_buf
return
def next_chunk(stream, chunk_size, buf_size):
"""
Returns a chunk_buffers generator if there are buffers left to be
read in the input stream, otherwise returns None.
"""
logger = logging.getLogger(__name__)
logger.debug(f"ENTER ({stream.name}, {chunk_size}, {buf_size})")
try:
chunk = chunk_buffers(stream, chunk_size, buf_size)
buf = next(chunk)
if buf:
logger.debug("EXIT Returning nonempty chunk")
return chunk_and_first_buf(buf, chunk)
except:
# Control reaches here when next(chunk) throws StopIteration
logger.debug("EXIT No next chunk")
raise
def input_chunks(stream, chunk_size, buf_size):
"""
Provides a generator for each chunk in the given stream.
"""
logger = logging.getLogger(__name__)
logger.debug(f"ENTER ({stream.name}, {chunk_size}, {buf_size})")
chunk = next_chunk(stream, chunk_size, buf_size)
while chunk:
yield chunk
chunk = next_chunk(stream, chunk_size, buf_size)
yield chunk
logger.debug("EXIT No more chunks in input stream")
return
def pipe_chunk_to_command(command, chunk):
"""
Executes the given command, passing the chunk to its stdin via buf_size buffers.
"""
logger = logging.getLogger(__name__)
logger.info(f"Piping chunk | {command}")
out = b""
with subprocess.Popen(command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) as process:
for buf in chunk:
outs, errs = process.communicate(input=buf)
out += outs
logger.info(out.decode('utf8'))
logger.debug(f"Process returned {process.returncode}")
return process
def main(): def main():
print("oh lawd he comin") """
Entry function.
"""
# Parse command line arguments
args = parse_args()
logger = initialize_logging(args.verbose)
# Skip chunks to get to the beginning of the operating section
logger.debug(f"Skipping {args.skip} chunks")
for i in range(args.skip):
for buf in chunk_buffers(sys.stdin.buffer, args.chunk, args.bufsize):
pass
# Process each chunk in the section of the stream to operate on
logger.debug("Reading chunks")
for chunk in input_chunks(sys.stdin.buffer, args.chunk, args.bufsize):
pipe_chunk_to_command(shlex.split(args.exec), chunk)