
As indicated in the title, this small project fetches content from the SEC's Edgar REST API. It was designed with the intention of automatically gathering/refreshing large datasets of public company financial data on a Linux server.

For some additional context, I am starting a pivot from a pure financial services role (where I program 1-2 hours per day on average) into a role that involves more programming. I do not have any formal education in STEM, so I am looking for harsh critique on my code (conventions, project structure, etc.). My goal is to be able to write production-quality code upon starting.

Without further ado, please see: https://github.com/ryan-d-young/edgar

(demo at bottom of readme)

Posted in rough order of the program's primary loop:

endpoint.py

import requests
import time
from typing import Callable, Optional

from .constants import (
    USER_AGENT, LAST_PERIOD, REQUESTS_PER_SEC,
    BUFFER_MS, TIMEOUT_SEC, DEFAULT_TAX, DEFAULT_UNIT)


class Limiter(object):
    HISTORY = []

    @staticmethod
    def request(func: Callable):
        def wrapper(self, url: str):
            now = time.time()
            Limiter.HISTORY.append(now)
            if len(Limiter.HISTORY) > REQUESTS_PER_SEC:
                elapsed = now - Limiter.HISTORY[-REQUESTS_PER_SEC]
                if remaining := 1 - elapsed > 0:
                    time.sleep(remaining + BUFFER_MS/1000)
            return func(self, url)
        return wrapper


class Endpoint(object):
    def __init__(self):
        self._session = requests.Session()
        self._session.headers = {
            'Host': 'data.sec.gov',
            'User-Agent': USER_AGENT,
            'Accept-Encoding': 'gzip, deflate'}

    @property
    def session(self):
        return self._session

    @Limiter.request
    def _get(self, url: str):
        try:
            return self.session.get(url, timeout=TIMEOUT_SEC)
        except Exception as e:
            raise Exception("Error occurred making request...\n"
                            f"    {e.__class__.__name__}: {e}")

    def get_submissions(
            self,
            cik: str = '0000320193'
    ) -> requests.Response:
        url = f'https://data.sec.gov/submissions/CIK{cik}.json'
        return self._get(url)

    def get_concept(
            self,
            cik: str = '0000320193',
            tag: str = 'Assets',
            taxonomy: str = DEFAULT_TAX
    ) -> requests.Response:
        url = f"https://data.sec.gov/api/xbrl/companyconcept/CIK{cik}/{taxonomy}/{tag}.json"
        return self._get(url)

    def get_facts(
            self,
            cik: str = '0000320193'
    ) -> requests.Response:
        url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json"
        return self._get(url)

    def get_frame(
            self,
            tag: str = 'Assets',
            period: str = LAST_PERIOD,
            taxonomy: str = DEFAULT_TAX,
            unit: str = DEFAULT_UNIT,
    ) -> requests.Response:
        url = f"https://data.sec.gov/api/xbrl/frames/{taxonomy}/{tag}/{unit}/{period}.json"
        return self._get(url)

parse.py

import datetime
from requests import Response

from .constants import DFMT


def _parse_submissions(response: Response) -> list[dict]:
    response = response.json()
    filings = response['filings']['recent']
    res = []

    for ix in range(len(filings['form'])):
        if (filings['form'][ix] == '10-K') or (filings['form'][ix] == '10-Q'):
            submission = {
                'form': filings['form'][ix],
                'accession_number': filings['accessionNumber'][ix],
                'filing_date': datetime.datetime.strptime(filings['filingDate'][ix], DFMT).date(),
                'report_date': datetime.datetime.strptime(filings['reportDate'][ix], DFMT).date(),
                'file_number': filings['fileNumber'][ix],
                'film_number': filings['filmNumber'][ix],
                'primary_document': filings['primaryDocument'][ix],
                'is_xbrl': bool(filings['isXBRL'][ix])}
            res.append(submission)

    return res


def _parse_concept(response: Response) -> list[dict]:
    response = response.json()
    res = []

    for unit in response['units'].keys():
        for record in response['units'][unit]:
            concept = {
                'unit': unit,
                'fiscal_year': record['fy'],
                'fiscal_quarter': record['fp'],
                'form': record['form'],
                'value': record['val'],
                'accession_number': record['accn']}
            res.append(concept)

    return res


def _parse_facts(response: Response) -> list[dict]:
    response = response.json()
    res = []

    for taxonomy in response['facts'].keys():
        for line_item in response['facts'][taxonomy].keys():
            facts = response['facts'][taxonomy][line_item]
            units = facts['units']
            for unit, records in units.items():
                for record in records:
                    fact = {
                        'taxonomy': taxonomy,
                        'line_item': line_item,
                        'unit': unit,
                        'label': facts['label'],
                        'description': facts['description'],
                        'end': datetime.datetime.strptime(record['end'], DFMT).date(),
                        'accession_number': record['accn'],
                        'fiscal_year': record['fy'],
                        'fiscal_period': record['fp'],
                        'form': record['form'],
                        'filed': record['filed']}
                    res.append(fact)

    return res


def _parse_frame(response: Response) -> list[dict]:
    response = response.json()
    res = []

    for record in response['data']:
        frame = {
            'taxonomy': response['taxonomy'],
            'line_item': response['tag'],
            'frame': response['ccp'],
            'unit': response['uom'],
            'label': response['label'],
            'description': response['description'],
            'accession_number': record['accn'],
            'cik': record['cik'],
            'entity_name': record['entityName'],
            'location': record['loc'],
            'end': datetime.datetime.strptime(record['end'], DFMT).date(),
            'value': record['val']}
        res.append(frame)

    return res


def parse_response(response: Response):
    url = response.url.split('/')
    try:
        if "submissions" in url:
            return _parse_submissions(response)
        elif "companyconcept" in url:
            return _parse_concept(response)
        elif "companyfacts" in url:
            return _parse_facts(response)
        elif "frames" in url:
            return _parse_frame(response)
        raise ValueError("Unrecognized response format\n"
                         f"    url: {response.url}")
    except Exception as e:
        raise Exception("Error occurred during parsing...\n"
                        f"    {e.__class__.__name__}: {e}")

cli.py

import argparse


ARG_PARSER = argparse.ArgumentParser(
    description='programmatic access to SEC EDGAR API')
ARG_PARSER.add_argument(
    'endpoint',
    metavar='endpoint',
    nargs=1,
    type=str,
    choices=['facts', 'concept', 'frame', 'submissions'])
ARG_PARSER.add_argument(
    '--args',
    metavar='args',
    nargs="*",
    type=str)
ARG_PARSER.add_argument(
    '--output',
    metavar='dest',
    nargs=1,
    type=str)

__main__.py

import os
import json
from pathlib import Path
from pprint import pprint

from .cli import ARG_PARSER
from .endpoint import Endpoint
from .constants import MODNAME
from .map_tickers import sanitize_cik
from .parse import parse_response


with open(Path(os.getcwd()) / MODNAME / 'mappings' / 'ticker_to_cik_txt.json') as fp:
    MAP = json.load(fp)

endpoint = Endpoint()
namespace = ARG_PARSER.parse_args()
func = getattr(endpoint, f'get_{namespace.endpoint[0]}')

args = []
if namespace.args:
    if namespace.endpoint[0] in ['submissions', 'concept', 'facts']:
        ticker = namespace.args.pop(0)
        cik = MAP[ticker.lower()][0]
        args += [sanitize_cik(cik),]
    args += namespace.args

response = func(*args)

if response.status_code != 200:
    raise Exception(response.text)

processed = parse_response(response)
pprint(processed)

if namespace.output:
    with open(Path(namespace.output[0]), 'x') as fp:
        fp.write(json.dumps(processed, default=str))

This runs outside of the main loop:

map_tickers.py

import json
import requests
from pathlib import Path

from .constants import TIMEOUT_SEC, USER_AGENT, MODNAME


def sanitize_cik(cik: int | str) -> str:
    cik = str(cik)
    cik = '0' * (10 - len(cik)) + cik
    return cik


def _fetch_raw_json_mapping() -> requests.Response:
    url = 'https://www.sec.gov/files/company_tickers.json'
    request = requests.get(
        url,
        headers={
            'Host': 'www.sec.gov',
            'User-Agent': USER_AGENT,
            'Accept-Encoding': 'gzip, deflate'},
        timeout=TIMEOUT_SEC)
    return request


def _fetch_raw_txt_mapping() -> requests.Response:
    url = 'https://www.sec.gov/include/ticker.txt'
    request = requests.get(
        url,
        headers={
            'Host': 'www.sec.gov',
            'User-Agent': USER_AGENT,
            'Accept-Encoding': 'gzip, deflate'},
        timeout=TIMEOUT_SEC)
    return request


def _load_raw_json_mapping(root: str) -> str:
    fp = Path(root) / MODNAME / 'mappings' / 'company_tickers.json'
    with open(fp) as file:
        file = file.read()
    return file


def _load_raw_txt_mapping(root: str) -> str:
    fp = Path(root) / MODNAME / 'mappings' / 'ticker.txt'
    with open(fp) as file:
        file = file.read()
    return file


def _load_json_mapping(root: str, index_on_cik: bool = True):
    fp = Path(root) / MODNAME / 'mappings'
    fp = fp / 'cik_to_ticker_json.json' if index_on_cik else fp / 'ticker_to_cik_json.json'
    if fp.exists():
        with open(fp) as file:
            file = file.read()
            res = json.loads(file)
    else:
        file = _load_raw_json_mapping(root)
        res = _process_json_mapping(file, index_on_cik)
    return res


def _load_txt_mapping(root: str, index_on_cik: bool = True):
    fp = Path(root) / MODNAME / 'mappings'
    fp = fp / 'cik_to_ticker_txt.json' if index_on_cik else fp / 'ticker_to_cik_txt.json'
    if fp.exists():
        with open(fp) as file:
            file = file.read()
            res = json.loads(file)
    else:
        file = _load_raw_txt_mapping(root)
        res = _process_txt_mapping(file.encode(), index_on_cik)
    return res


def _save_json_mapping(mapping: dict, root: str, index_on_cik: bool = True):
    fp = Path(root) / MODNAME / 'mappings'
    fp = fp / 'cik_to_ticker_json.json' if index_on_cik else fp / 'ticker_to_cik_json.json'
    mapping = json.dumps(mapping)
    with open(fp, mode='w') as file:
        file.write(mapping)


def _save_txt_mapping(mapping: dict, root: str, index_on_cik: bool = True):
    fp = Path(root) / MODNAME / 'mappings'
    fp = fp / 'cik_to_ticker_txt.json' if index_on_cik else fp / 'ticker_to_cik_txt.json'
    mapping = json.dumps(mapping)
    with open(fp, mode='w') as file:
        file.write(mapping)


def _process_txt_mapping(raw: bytes, index_on_cik: bool = True) -> dict[str, list[str]]:
    res = {}
    pairs = raw.decode().split('\n')

    def cik_to_ticker(res: dict, pairs: list[str]) -> dict[str, list[str]]:
        for pair in pairs:
            ticker, cik = pair.split('\t')
            if cik not in res.keys():
                res[cik] = [ticker,]
            else:
                res[cik] += [ticker,]
        return res
    
    def ticker_to_cik(res: dict, pairs: list[str]) -> dict[str, list[str]]:
        for pair in pairs:
            ticker, cik = pair.split('\t')
            if ticker not in res.keys():
                res[ticker] = [cik,]
            else:
                res[ticker] += [cik,]
        return res
            
    return cik_to_ticker(res, pairs) if index_on_cik else ticker_to_cik(res, pairs)


def _process_json_mapping(raw: bytes, index_on_cik: bool = True) -> dict[str, dict[str, str]]:
    res = {}
    raw = json.loads(raw)
    
    def cik_to_ticker(res: dict, raw: dict) -> dict[str, dict[str, str]]:
        for record in raw.values():
            if record['cik_str'] not in res.keys():
                res[record['cik_str']] = {}
                res[record['cik_str']]['name'] = [record['title'],]
                res[record['cik_str']]['ticker'] = [record['ticker'],]
            else:
                res[record['cik_str']]['name'] += [record['title'],]
                res[record['cik_str']]['ticker'] += [record['ticker'],]
        return res
    
    def ticker_to_cik(res: dict, raw: dict) -> dict[str, dict[str, str]]:
        for record in raw.values():
            if record['ticker'] not in res.keys():
                res[record['ticker']] = {}
                res[record['ticker']]['name'] = [record['title'],]
                res[record['ticker']]['cik'] = [record['cik_str'],]
            else:
                res[record['ticker']]['name'] += [record['title'],]
                res[record['ticker']]['cik'] += [record['cik_str'],]
        return res
    
    return cik_to_ticker(res, raw) if index_on_cik else ticker_to_cik(res, raw)


def main():
    import os
    root = os.getcwd()
    
    json_mapping = _fetch_raw_json_mapping().content
    json_mapping_cik_ix = _process_json_mapping(json_mapping)
    json_mapping_ticker_ix = _process_json_mapping(json_mapping, index_on_cik=False)
    _save_json_mapping(json_mapping_cik_ix, root)
    _save_json_mapping(json_mapping_ticker_ix, root, index_on_cik=False)

    txt_mapping = _fetch_raw_txt_mapping().content
    txt_mapping_cik_ix = _process_txt_mapping(txt_mapping)
    txt_mapping_ticker_ix = _process_txt_mapping(txt_mapping, index_on_cik=False)
    _save_txt_mapping(txt_mapping_cik_ix, root)
    _save_txt_mapping(txt_mapping_ticker_ix, root, index_on_cik=False)


if __name__ == '__main__':
    main()

Lastly,

__init__.py

from .endpoint import Endpoint

and,

constants.py

import os
import dotenv
import datetime
from math import ceil


MODNAME = "edgar"
DFMT = "%Y-%m-%d"
dotenv.load_dotenv()  # populate os.environ from a local .env file, if present
USER_AGENT = os.environ['EDGAR_USER_AGENT']
DEFAULT_CIK = "0000320193"  # AAPL
DEFAULT_TAX = 'us-gaap'
DEFAULT_UNIT = 'USD'
REQUESTS_PER_SEC = 10
BUFFER_MS = 100
TIMEOUT_SEC = 5
TODAY = datetime.date.today()

if TODAY.month - 3 < 0:
    LAST_PERIOD = f"CY{TODAY.year - 1}Q4I"
else:
    LAST_PERIOD = f"CY{TODAY.year}Q{ceil(4 * (TODAY.month/12))}I"
3 Answers


Optional

from typing import ..., Optional

Type annotations have been evolving. In recently authored source code, prefer e.g. x: int | None over x: Optional[int].

Also, please run isort *.py so your imports line up as PEP 8 asks.

It would be slightly more convenient to
from time import sleep, time.

inheritance

class Limiter(object):

No.

Writing class Limiter: suffices. Yes, we know it's an object. It was created by the python interpreter. It is definitely an object. Use that notation when app-specific inheritance details matter for the behavior of your class.

Similarly for Endpoint.

manifest constant

    HISTORY = []

This is not a manifest constant.

It is a class variable, and we will be mutating it.

Please spell it history, so as not to mislead anyone.

    @staticmethod
    def request(func: Callable):
        ...
            Limiter.HISTORY.append(now)

This seems slightly inconvenient. Consider using @classmethod, so you can refer to cls.history.

obscure interaction among globals

            if len(Limiter.HISTORY) > REQUESTS_PER_SEC:
                elapsed = now - Limiter.HISTORY[-REQUESTS_PER_SEC]
                if remaining := 1 - elapsed > 0:
                    time.sleep(remaining + BUFFER_MS/1000)

This is obscure. It does something, some kind of rate limiting, but it's not clear just what limit it enforces. Leaky bucket? Cite a reference. It seems to have a 1000 msec scheduling interval hardcoded in it. We need to see a """docstring""" for this. Do not merge this code down to main as-is.
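For what it's worth, here is one hedged sketch of what a documented sliding-window limiter might look like, using the @classmethod suggestion from above. The 10-per-second figure and the names are assumptions, not the OP's exact API:

```python
import time
from collections import deque
from typing import Callable


class Limiter:
    """Sliding-window rate limiter: at most 10 requests per rolling second.

    Keeps timestamps of the last 10 requests; if the oldest of them is
    less than one second old, sleep until the window has passed.
    """
    history: deque = deque(maxlen=10)  # mutable class state, lowercase name

    @classmethod
    def request(cls, func: Callable) -> Callable:
        def wrapper(self, url: str):
            if len(cls.history) == cls.history.maxlen:
                elapsed = time.time() - cls.history[0]
                if elapsed < 1.0:
                    time.sleep(1.0 - elapsed)
            cls.history.append(time.time())
            return func(self, url)
        return wrapper
```

A deque with maxlen also drops old timestamps automatically, so the history can never grow without bound the way a plain list does.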

magic string

class Endpoint ...
    def __init__(self):
        ...
        self._session.headers = {
            'Host': 'data.sec.gov',

This is pretty OK, as stated.

Prefer this ctor signature:

    def __init__(self, host='data.sec.gov'):
        ...
        self._session.headers = {
            'Host': host,

Then a unit test can override with e.g. a 'data.example.com' server.

app-specific exception

            raise Exception("Error occurred making request...

Prefer to define a TimeoutError or something similar, and raise that.

As written, you have made it very difficult for caller to focus on catching a specific error that it anticipates and knows how to recover from.

As a separate matter, please wrap the exception e, rather than discarding it. You're throwing away details such as source line of the caller who triggered the exception, things a maintenance engineer will need to know when diagnosing a failure. Always "wrap" when re-raising, rather than "replace".

Anyway, re-raising is definitely a good call here.
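A minimal sketch of both points together, an app-specific exception plus raise ... from (EdgarRequestError is a hypothetical name):

```python
import requests


class EdgarRequestError(Exception):
    """App-specific error for failed EDGAR requests (hypothetical name)."""


class Endpoint:
    def __init__(self):
        self.session = requests.Session()

    def _get(self, url: str) -> requests.Response:
        try:
            return self.session.get(url, timeout=5)
        except requests.RequestException as e:
            # "from e" chains the original exception rather than discarding
            # it, so the root cause and its traceback survive the re-raise
            raise EdgarRequestError(f"request failed for {url}") from e
```

Callers can now catch EdgarRequestError specifically, and e.__cause__ still points at the underlying requests exception.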

magic constant

    def get_submissions(
            self,
            cik: str = '0000320193'

This is obscure. Your keyboard has a # hash character, useful for writing comments. Give us a hint.

Also, it looks like we want to define a MANIFEST_CONSTANT, which will be used again by get_concept and get_facts.

EDIT

Oh, look at that:

DEFAULT_CIK = "0000320193"  # AAPL

Much better.

date parse

In _parse_submissions I really like how DFMT uniformly turns Y-m-d strings into a sensible date type.

        if (filings['form'][ix] == '10-K') or (filings['form'][ix] == '10-Q'):

nit: More simply this would be

        if filings['form'][ix] in ('10-K', '10-Q'):

And in _parse_concept:

    for unit in response['units'].keys():

it is fine as-is, but you don't need to ask for .keys(). Iterating over a dict will iterate over its keys.

Similarly in _parse_facts.

leading zeros

def sanitize_cik( ...
    cik = '0' * (10 - len(cik)) + cik

nit: Usual idiom would be to format an int with: f"{cik:010d}"

redundant headers

I don't understand what's happening in _fetch_raw_json_mapping.

    url = 'https://www.sec.gov/files/company_tickers.json'
    request = requests.get(
        url,
        headers={
            'Host': 'www.sec.gov',

Surely requests will synthesize a Host: www.sec.gov header, no? Based on the input url?

Similarly in _fetch_raw_txt_mapping.

read_text()

    with open(fp) as file:
        file = file.read()
    ...
    with open(fp) as file:
        file = file.read()

Certainly these work.

But it would suffice to return fp.read_text().

non-constant manifest constant

TODAY = datetime.date.today()

Prefer a spelling of today, as this computed value is clearly not a constant.

Similarly for last_period.


Don't inherit from object in Python 3.

Limiter has been designed at the incorrect level of instantiation. Its HISTORY should be a member variable on a regular instance, and Limiter should be constructed to an instance on (my guess is) Endpoint, assuming that Edgar's rate limitation happens at the session level.
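A sketch of that instance-level design, with history as a member variable and the limiter composed into Endpoint (the names and the 10-per-second window are assumptions):

```python
import time
from collections import deque


class Limiter:
    """Rate-limiter state owned by an instance, not the class."""

    def __init__(self, max_per_sec: int = 10):
        self.history = deque(maxlen=max_per_sec)

    def wait(self) -> None:
        if len(self.history) == self.history.maxlen:
            elapsed = time.time() - self.history[0]
            if elapsed < 1.0:
                time.sleep(1.0 - elapsed)
        self.history.append(time.time())


class Endpoint:
    def __init__(self):
        self.limiter = Limiter()  # one limiter per session/endpoint

    def _get(self, url: str) -> str:
        self.limiter.wait()
        return url  # placeholder for the real session.get(url)
```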

As a polite REST participant, you should be setting an Accept: application/json header since that appears to be what you expect from all responses.

_get carries a collection of problems related to error handling. Don't catch the over-broad Exception, don't throw the over-broad Exception, never rethrow without also setting the cause via the from keyword, and don't silently assume that the inner get is going to succeed. Instead, after getting the response from get(), perform a raise_for_status, and let all exceptions fall through without a try/except at this level. Your error message did not add any information. This will also replace your

if response.status_code != 200:
    raise Exception(response.text)
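A sketch of the suggested shape, assuming a session-like object (get_checked is a hypothetical name, not from the original code):

```python
import requests


def get_checked(session, url: str, timeout: float = 5) -> requests.Response:
    """Fetch url; let any 4xx/5xx surface as requests.HTTPError."""
    response = session.get(url, timeout=timeout)
    response.raise_for_status()  # replaces the manual status_code != 200 check
    return response
```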

You've tried (some) typehints - that's good! Run mypy, and fill out the rest of them, including for _get().

Depending on a lot of things, parse.py may benefit from being rewritten with Pandas. This will offer better indexing, in-built JSON-like dictionary traversal, in-built datetime parsing, etc. It will also offer better structure than your submission dictionary. Don't use dictionaries for in-application data representation. If you don't use Pandas, at least use NamedTuple or @dataclass class instances.

In _parse_submissions, again if you don't use Pandas, res = []; res.append() can be cleaned up by rewriting this function to be a generator and yielding instead of appending.
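Combining both suggestions, a sketch with a frozen dataclass and a generator (the field list is abbreviated, and parse_submissions here takes the already-decoded JSON payload rather than a Response):

```python
import datetime
from dataclasses import dataclass
from typing import Iterator


@dataclass(frozen=True)
class Submission:
    """Typed record replacing the raw dict (fields abbreviated)."""
    form: str
    accession_number: str
    filing_date: datetime.date


def parse_submissions(payload: dict) -> Iterator[Submission]:
    """Generator version: yields records instead of building a list."""
    filings = payload['filings']['recent']
    for ix, form in enumerate(filings['form']):
        if form in ('10-K', '10-Q'):
            yield Submission(
                form=form,
                accession_number=filings['accessionNumber'][ix],
                filing_date=datetime.datetime.strptime(
                    filings['filingDate'][ix], "%Y-%m-%d").date(),
            )
```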

Never overwrite a variable with a different type, as you do with response = response.json().

When you type-hint dicts, always fill in the inner types, even if they're [Any, Any]. The return type from _parse_frame is probably list[dict[str, Any]].

It's not a particularly good idea for parse_response to vary its behaviour implicitly based on the URL. Instead, write four separate functions, each with a fixed URL and fixed call to its appropriate parse method.

It's good that you're using argparse. The argument definition for endpoint can be enhanced by passing type= to be an Enum class of your endpoint names, and passing choices to be a list comprehension of the values from that same class. Among other benefits, namespace.endpoint[0] in ['submissions', 'concept', 'facts'] (which should be testing membership against a set literal and not a list literal) can move that logic to an instance method on the Enum class, something like (guessing) uses_cik(self) -> bool.
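A sketch of that Enum-based argument definition (EndpointName and uses_cik are guessed names, as noted above):

```python
import argparse
from enum import Enum


class EndpointName(Enum):
    SUBMISSIONS = 'submissions'
    CONCEPT = 'concept'
    FACTS = 'facts'
    FRAME = 'frame'

    def uses_cik(self) -> bool:
        # endpoints whose first positional argument is a CIK
        return self in {EndpointName.SUBMISSIONS,
                        EndpointName.CONCEPT,
                        EndpointName.FACTS}


parser = argparse.ArgumentParser()
parser.add_argument('endpoint', type=EndpointName,
                    choices=list(EndpointName))
```

argparse calls EndpointName('facts') for you, so namespace.endpoint is already an Enum member and the membership logic lives on the class.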

For argparse positional arguments, omit nargs=1 and omit the following [0].

It's good that you're using an actual module with an actual __main__.py. That file should still enclose most of its code in a function and call that function, to make explicit that most of the symbols there are in limited scope.

func = getattr is dangerous. What if your Endpoint also defined a get_password, or get_result_of_file_deletion, etc.? You should explicitly define a dictionary of endpoint names to methods.
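A self-contained sketch of the explicit dispatch table, with a stand-in Endpoint (the real one would supply all four methods):

```python
class Endpoint:
    """Stand-in with the same method names as the real Endpoint."""

    def get_submissions(self, cik: str):
        return ('submissions', cik)

    def get_facts(self, cik: str):
        return ('facts', cik)


endpoint = Endpoint()

# Explicit allow-list: only these names can ever be dispatched, unlike
# getattr(endpoint, f'get_{name}'), which would happily find anything
DISPATCH = {
    'submissions': endpoint.get_submissions,
    'facts': endpoint.get_facts,
}


def call(name: str, *args):
    return DISPATCH[name](*args)  # KeyError for unknown endpoint names
```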

Replace

open(Path(os.getcwd()) / MODNAME / 'mappings' / 'ticker_to_cik_txt.json')

with

(Path.cwd() / MODNAME / 'mappings' / 'ticker_to_cik_txt.json').open()

Replace all of sanitize_cik with one format string. The str() call is implicit when passing in {cik} as a formatting parameter, and the 0-padding can be expressed in the format specification mini-language. Once you've properly designed the format string, you can replace the function with a member binding like

sanitize_cik = '{cik:010d}'.format
# ...
sanitize_cik(cik=354)

If you need to throttle access against rate-limited resources or implement retry on failure you can use urllib3 in conjunction with requests like this:

Source: Example: Automatic Retries

from urllib3.util import Retry
from requests import Session
from requests.adapters import HTTPAdapter

s = Session()
retries = Retry(
    total=3,
    backoff_factor=0.1,
    status_forcelist=[502, 503, 504],
    allowed_methods={'POST'},
)
s.mount('https://', HTTPAdapter(max_retries=retries))

It's quite probable that a specific HTTP code (typically 429 Too Many Requests) is returned when you are consuming access faster than the remote host permits. You can then handle that specific code and set up some sort of sliding scale by introducing varying delays.

That gives your code a chance to recover gracefully in case of transient errors, and keep running. It is irritating to have a breakdown in the middle of a long scrape loop.
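As a hedged sketch, a helper that retries on 429 and honors a Retry-After header if the server sends one (get_with_backoff is a hypothetical name, and EDGAR may not send that header):

```python
import time
import requests


def get_with_backoff(session, url: str, attempts: int = 3) -> requests.Response:
    """Retry on 429, sleeping per Retry-After or exponential backoff."""
    for attempt in range(attempts):
        response = session.get(url, timeout=5)
        if response.status_code != 429:
            return response
        delay = float(response.headers.get('Retry-After', 2 ** attempt))
        time.sleep(delay)
    return response  # still 429 after all attempts; let the caller decide
```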

It's good that you are using a Session. The benefits of doing so are described in technical detail on the same page.
