10

[Edit: Read the accepted answer first. The long investigation below stems from a subtle blunder in the timing measurement.]

I often need to process extremely large (100GB+) text/CSV-like files containing highly redundant data that cannot practically be stored on disk uncompressed. I rely heavily on external compressors like lz4 and zstd, which produce stdout streams approaching 1GB/s.

As such, I care a lot about the performance of Unix shell pipelines. But large shell scripts are difficult to maintain, so I tend to construct pipelines in Python, stitching commands together with careful use of shlex.quote().

This process is tedious and error-prone, so I'd like a "Pythonic" way to achieve the same end, managing the stdin/stdout file descriptors in Python without offloading to /bin/sh. However, I've never found a method of doing this without greatly sacrificing performance.

Python 3's documentation recommends replacing shell pipelines with the communicate() method on subprocess.Popen. I've adapted this example into the following test script, which pipes 3GB of /dev/zero into a useless grep that outputs nothing:

#!/usr/bin/env python3
from shlex import quote
from subprocess import Popen, PIPE
from time import perf_counter

BYTE_COUNT = 3_000_000_000
UNQUOTED_HEAD_CMD = ["head", "-c", str(BYTE_COUNT), "/dev/zero"]
UNQUOTED_GREP_CMD = ["grep", "Arbitrary string which will not be found."]

QUOTED_SHELL_PIPELINE = " | ".join(
    " ".join(quote(s) for s in cmd)
    for cmd in [UNQUOTED_HEAD_CMD, UNQUOTED_GREP_CMD]
)

perf_counter()
proc = Popen(QUOTED_SHELL_PIPELINE, shell=True)
proc.wait()
print(f"Time to run using shell pipeline: {perf_counter()} seconds")

perf_counter()
p1 = Popen(UNQUOTED_HEAD_CMD, stdout=PIPE)
p2 = Popen(UNQUOTED_GREP_CMD, stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()
p2.communicate()
print(f"Time to run using subprocess.PIPE: {perf_counter()} seconds")

Output:

Time to run using shell pipeline: 2.412427189 seconds
Time to run using subprocess.PIPE: 4.862174164 seconds

The subprocess.PIPE approach is more than twice as slow as /bin/sh. If we raise the input size to 90GB (BYTE_COUNT = 90_000_000_000), we confirm this is not a constant-time overhead:

Time to run using shell pipeline: 88.796322932 seconds
Time to run using subprocess.PIPE: 183.734968687 seconds

My assumption up to now was that subprocess.PIPE is simply a high-level abstraction for connecting file descriptors, and that data is never copied into the Python process itself. As expected, when running the above test, head uses 100% CPU while subproc_test.py (the script above) uses near-zero CPU and RAM.

Given that, why is my pipeline so slow? Is this an intrinsic limitation of Python's subprocess? If so, what does /bin/sh do differently under the hood that makes it twice as fast?

More generally, are there better methods for building large, high-performance subprocess pipelines in Python?

  • Is this really the kind of data that should be processed locally? It sounds like it needs some kind of cluster technology
    – roganjosh
    Commented Oct 18, 2018 at 18:28
  • In most cases, the time to upload the data to another server is immensely greater than the runtime of the shell scripts I'm replacing. Any DBMS/Hadoop-like/"big data" tool I know of would take much longer still to ingest/ETL the data, let alone do the processing of my scripts. The tasks I'm considering are all perfectly viable on a single laptop today, and could be written in Bash. I just prefer Python control flow and would like to avoid shelling out if possible.
    – goodside
    Commented Oct 18, 2018 at 18:44
  • That's fair enough, I was curious whether the whole project should be hoisted into the cloud so the infrastructure already exists :)
    – roganjosh
    Commented Oct 18, 2018 at 18:45
  • BTW, the use of shell=True here is... unfortunate. If your substring_which_will_never_be_found contained $(rm -rf ~) in it, or -- worse -- $(rm -rf ~)'$(rm -rf ~)', you'd have a very bad day. (Relying on shlex.split() isn't good form either -- if you have a name with a space, you want to keep it as one name; populate an array or tuple by hand, and you don't need to worry about your content being munged). Commented Oct 18, 2018 at 19:22
  • ...moving towards the topic -- yes, subprocess.PIPE is a high-level abstraction for connecting file descriptors; no, the data isn't copied into the Python process's namespace. Why you're seeing a difference here is a good question -- I'd need to dig in; wouldn't be surprised if it were related to buffering settings on the file descriptors. Commented Oct 18, 2018 at 19:24

3 Answers

3

You're timing it wrong. Your perf_counter() calls don't start and stop a timer; they just return a number of seconds since some arbitrary starting point. That starting point probably happens to be the first perf_counter() call here, but it could be any point, even one in the future.

The actual time taken by the subprocess.PIPE method is 4.862174164 - 2.412427189 = 2.449746975 seconds, not 4.862174164 seconds. This timing does not show a measurable performance penalty from subprocess.PIPE.
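
For reference, a minimal sketch of how the subprocess.PIPE branch could be timed correctly, taking the difference between two perf_counter() readings around the same commands used in the question (the start/elapsed names are just illustrative):

from subprocess import Popen, PIPE
from time import perf_counter

BYTE_COUNT = 3_000_000_000
HEAD_CMD = ["head", "-c", str(BYTE_COUNT), "/dev/zero"]
GREP_CMD = ["grep", "Arbitrary string which will not be found."]

start = perf_counter()                    # first reading
p1 = Popen(HEAD_CMD, stdout=PIPE)
p2 = Popen(GREP_CMD, stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()                         # allow head to receive SIGPIPE if grep exits first
p2.communicate()
elapsed = perf_counter() - start          # only the difference between two readings is meaningful
print(f"Time to run using subprocess.PIPE: {elapsed:.3f} seconds")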

  • Wow, I am dumb. Thank you.
    – goodside
    Commented Oct 18, 2018 at 19:41
0

Also, take into account this note from the Popen documentation:

Changed in version 3.3.1: bufsize now defaults to -1 to enable buffering by default to match the behavior that most code expects. In versions prior to Python 3.2.4 and 3.3.1 it incorrectly defaulted to 0 which was unbuffered and allowed short reads. This was unintentional and did not match the behavior of Python 2 as most code expected.
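
If you want to rule buffering out explicitly, you can pass bufsize yourself when building the pipeline. A minimal sketch reusing the commands from the question; note that bufsize only configures the Python-side file objects wrapping the pipe ends held by the parent, not the kernel pipe the two children share:

from subprocess import Popen, PIPE

# bufsize=-1 (the default since 3.3.1) buffers the parent's pipe wrappers;
# bufsize=0 would make them unbuffered. Data flowing directly from head to
# grep never passes through these wrappers.
p1 = Popen(["head", "-c", "3000000000", "/dev/zero"], stdout=PIPE, bufsize=-1)
p2 = Popen(["grep", "Arbitrary string which will not be found."],
           stdin=p1.stdout, stdout=PIPE, bufsize=-1)
p1.stdout.close()
p2.communicate()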

-1

In Python 3 there is "the Python way" and "the one we don't mention". (Though it pains me to abuse RAM, there does seem to be rather a lot of it available these days.)

#!/usr/bin/env python3
# how you are "meant" to do it: chain Popen objects, feeding each stage's stdout to the next
# (this pipeline extracts the machine's 192.x.x.x address from `ip a`)
import subprocess
ps = subprocess.Popen(('ip', 'a'), stdout=subprocess.PIPE)
pt = subprocess.Popen(('grep', '192'), stdin=ps.stdout, stdout=subprocess.PIPE)
pu = subprocess.Popen(('awk', '{print $2}'), stdin=pt.stdout, stdout=subprocess.PIPE)
pv = subprocess.Popen(('sed', 's;/.*;;'), stdin=pu.stdout, stdout=subprocess.PIPE)
# close the parent's copies of the intermediate pipe ends so each upstream
# command can receive SIGPIPE if a downstream command exits early
ps.stdout.close()
pt.stdout.close()
pu.stdout.close()
output = pv.communicate()[0]
print(output.decode('utf-8').rstrip())

# OR (the 1 we don't mention)
import os
print(os.popen('ip a|grep 192|awk \'{print $2}\'|sed \'s;/.*;;\'').read().rstrip())

# or (the 1 we don't mention, pretending to be PEP 8 compliant)
cmd="ip a|grep 192|awk '{print $2}'|sed 's;/.*;;'"
print(os.popen(cmd).read().rstrip())
  • Seems some pythonistas like to bite at information that might be helpful without explaining their objection. Commented Oct 18, 2018 at 22:39
  • I think the downvote (not by me) is likely due to your answer being bad in multiple ways. First, you do not answer the question of the OP; second, you do not explain what you are doing; third, you are using a deprecated function (os.popen) which, in Python 3, is implemented using subprocess.Popen anyway, as can be seen in stackoverflow.com/a/41678241/7738328; and fourth, it comes with an unexplained assertion that it somehow abuses RAM.
    – JohanL
    Commented Oct 19, 2018 at 17:12
