As indicated in the title, this small project fetches content from the SEC's EDGAR REST API. It was designed to automatically gather and refresh large datasets of public-company financial data on a Linux server.
For some additional context, I am starting a pivot from a pure financial services role (where I program 1-2 hours per day on average) into a role that involves more programming. I do not have any formal education in STEM, so I am looking for harsh critique on my code (conventions, project structure, etc.). My goal is to be able to write production-quality code upon starting.
Without further ado, please see: https://github.com/ryan-d-young/edgar
(demo at bottom of readme)
The files are posted roughly in the order in which the program's main flow uses them:
endpoint.py
import requests
import time
from typing import Callable, Optional
from .constants import (
USER_AGENT, LAST_PERIOD, REQUESTS_PER_SEC,
BUFFER_MS, TIMEOUT_SEC, DEFAULT_TAX, DEFAULT_UNIT)
class Limiter(object):
    """Throttle decorated request methods to ``REQUESTS_PER_SEC`` calls per second.

    Timestamps live in the class-level ``HISTORY`` list, so the limit is
    shared by every method decorated with :meth:`request` in this process.
    """

    # Start times (``time.time()``) of the most recent requests, oldest first.
    HISTORY = []

    @staticmethod
    def request(func: Callable):
        """Wrap ``func(self, url)`` so that it waits when the rate limit is hit.

        Before each call the wrapper looks at the ``REQUESTS_PER_SEC``-th most
        recent request; if it started less than one second ago, the wrapper
        sleeps for the rest of that second plus ``BUFFER_MS`` milliseconds.

        Args:
            func: A method taking ``(self, url)``.

        Returns:
            The wrapped method; it returns whatever ``func`` returns.
        """
        from functools import wraps

        @wraps(func)
        def wrapper(self, url: str):
            history = Limiter.HISTORY
            if len(history) >= REQUESTS_PER_SEC:
                elapsed = time.time() - history[-REQUESTS_PER_SEC]
                # Computed separately on purpose: ``x := 1 - elapsed > 0``
                # would bind the boolean result of the comparison, not the
                # number of seconds left to wait.
                remaining = 1 - elapsed
                if remaining > 0:
                    time.sleep(remaining + BUFFER_MS / 1000)
            # Record the moment the request is actually sent (after any wait)
            # and drop timestamps that can no longer affect the window.
            history.append(time.time())
            del history[:-REQUESTS_PER_SEC]
            return func(self, url)
        return wrapper
class Endpoint(object):
    """Client for the JSON endpoints served from ``data.sec.gov``.

    Every ``get_*`` method builds a URL and sends it through :meth:`_get`,
    which is rate limited by ``Limiter.request``. The methods return the raw
    ``requests.Response``; the HTTP status is not checked here.
    """

    def __init__(self):
        self._session = requests.Session()
        # update() keeps the session's own case-insensitive header mapping
        # (and its defaults) instead of replacing it with a plain dict.
        self._session.headers.update({
            'Host': 'data.sec.gov',
            'User-Agent': USER_AGENT,
            'Accept-Encoding': 'gzip, deflate'})

    @property
    def session(self):
        """The underlying ``requests.Session``."""
        return self._session

    @Limiter.request
    def _get(self, url: str) -> requests.Response:
        """Send a GET request for ``url`` with a ``TIMEOUT_SEC`` timeout.

        Raises:
            Exception: If the request itself fails; the original error is
                attached as ``__cause__``.
        """
        try:
            return self.session.get(url, timeout=TIMEOUT_SEC)
        except Exception as e:
            raise Exception("Error occurred making request...\n"
                            f" {e.__class__.__name__}: {e}") from e

    def get_submissions(
            self,
            cik: str = '0000320193'
    ) -> requests.Response:
        """Fetch the submissions document for the given CIK.

        Args:
            cik: CIK exactly as it should appear in the URL (the default is a
                10-digit, zero-padded string).
        """
        url = f'https://data.sec.gov/submissions/CIK{cik}.json'
        return self._get(url)

    def get_concept(
            self,
            cik: str = '0000320193',
            tag: str = 'Assets',
            taxonomy: str = DEFAULT_TAX
    ) -> requests.Response:
        """Fetch one company concept (``taxonomy``/``tag``) for the given CIK."""
        url = f"https://data.sec.gov/api/xbrl/companyconcept/CIK{cik}/{taxonomy}/{tag}.json"
        return self._get(url)

    def get_facts(
            self,
            cik: str = '0000320193'
    ) -> requests.Response:
        """Fetch the company-facts document for the given CIK."""
        url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik}.json"
        return self._get(url)

    def get_frame(
            self,
            tag: str = 'Assets',
            period: str = LAST_PERIOD,
            taxonomy: str = DEFAULT_TAX,
            unit: str = DEFAULT_UNIT,
    ) -> requests.Response:
        """Fetch one frame: a single ``tag`` in ``unit`` for ``period``."""
        url = f"https://data.sec.gov/api/xbrl/frames/{taxonomy}/{tag}/{unit}/{period}.json"
        return self._get(url)
parse.py
import datetime
from requests import Response
from .constants import DFMT
def _parse_submissions(response: Response) -> list[dict]:
    """Extract the 10-K and 10-Q entries from a submissions response.

    Reads the column-oriented ``filings.recent`` section of the JSON body and
    returns one dict per 10-K/10-Q row, in the order they appear. Filing and
    report dates are parsed with ``DFMT`` into ``datetime.date`` objects.
    """
    recent = response.json()['filings']['recent']
    wanted_forms = ('10-K', '10-Q')

    def to_date(text):
        return datetime.datetime.strptime(text, DFMT).date()

    submissions = []
    for ix, form in enumerate(recent['form']):
        if form not in wanted_forms:
            continue
        submissions.append({
            'form': form,
            'accession_number': recent['accessionNumber'][ix],
            'filing_date': to_date(recent['filingDate'][ix]),
            'report_date': to_date(recent['reportDate'][ix]),
            'file_number': recent['fileNumber'][ix],
            'film_number': recent['filmNumber'][ix],
            'primary_document': recent['primaryDocument'][ix],
            'is_xbrl': bool(recent['isXBRL'][ix]),
        })
    return submissions
def _parse_concept(response: Response) -> list[dict]:
    """Flatten a company-concept response into one dict per reported record.

    Walks every unit under the body's ``units`` mapping and copies the
    fiscal year, fiscal period, form, value and accession number of each
    record, tagging it with the unit it was listed under.
    """
    payload = response.json()
    return [
        {
            'unit': unit,
            'fiscal_year': record['fy'],
            'fiscal_quarter': record['fp'],
            'form': record['form'],
            'value': record['val'],
            'accession_number': record['accn'],
        }
        for unit, records in payload['units'].items()
        for record in records
    ]
def _parse_facts(response: Response) -> list[dict]:
    """Flatten a company-facts response into one dict per reported record.

    The body nests records as ``facts -> taxonomy -> line item -> units ->
    unit -> [records]``. Each output dict carries the taxonomy, line item,
    unit, the line item's label and description, and the record's own fields.
    ``end`` is parsed with ``DFMT`` into a ``datetime.date``; ``filed`` is
    passed through unchanged.

    Each dict includes the reported number under ``'value'`` (``record['val']``),
    matching the concept and frame parsers.
    """
    payload = response.json()
    res = []
    for taxonomy, line_items in payload['facts'].items():
        for line_item, facts in line_items.items():
            for unit, records in facts['units'].items():
                for record in records:
                    res.append({
                        'taxonomy': taxonomy,
                        'line_item': line_item,
                        'unit': unit,
                        'label': facts['label'],
                        'description': facts['description'],
                        'end': datetime.datetime.strptime(record['end'], DFMT).date(),
                        'value': record['val'],
                        'accession_number': record['accn'],
                        'fiscal_year': record['fy'],
                        'fiscal_period': record['fp'],
                        'form': record['form'],
                        'filed': record['filed'],
                    })
    return res
def _parse_frame(response: Response) -> list[dict]:
    """Flatten a frames response into one dict per entity record.

    Frame-level fields (taxonomy, tag, ``ccp``, ``uom``, label, description)
    are repeated on every row next to the record's own fields. ``end`` is
    parsed with ``DFMT`` into a ``datetime.date``.
    """
    payload = response.json()
    return [
        {
            'taxonomy': payload['taxonomy'],
            'line_item': payload['tag'],
            'frame': payload['ccp'],
            'unit': payload['uom'],
            'label': payload['label'],
            'description': payload['description'],
            'accession_number': row['accn'],
            'cik': row['cik'],
            'entity_name': row['entityName'],
            'location': row['loc'],
            'end': datetime.datetime.strptime(row['end'], DFMT).date(),
            'value': row['val'],
        }
        for row in payload['data']
    ]
def parse_response(response: Response):
    """Pick the parser that matches the response's URL and return its output.

    The URL is split on ``/`` and the parser is chosen by which endpoint name
    appears as a whole path segment: ``submissions``, ``companyconcept``,
    ``companyfacts`` or ``frames``.

    Returns:
        The list of dicts produced by the selected parser.

    Raises:
        Exception: If no parser matches or the selected parser fails. The
            original error is attached as ``__cause__``.
    """
    segments = response.url.split('/')
    try:
        if "submissions" in segments:
            return _parse_submissions(response)
        elif "companyconcept" in segments:
            return _parse_concept(response)
        elif "companyfacts" in segments:
            return _parse_facts(response)
        elif "frames" in segments:
            return _parse_frame(response)
        # The two literals used to run together as "...formaturl: ...".
        raise ValueError("Unrecognized response format; "
                         f"url: {response.url}")
    except Exception as e:
        raise Exception("Error occurred during parsing...\n"
                        f" {e.__class__.__name__}: {e}") from e
cli.py
import argparse
# Command-line interface: one positional endpoint name plus two optional
# flags. Every option uses nargs, so parsed values arrive as lists (or None
# when an optional flag is not given).
ARG_PARSER = argparse.ArgumentParser(description='programmatic access to SEC EDGAR API')

# Which endpoint to query; parsed as a one-element list.
ARG_PARSER.add_argument(
    'endpoint',
    type=str,
    nargs=1,
    choices=['facts', 'concept', 'frame', 'submissions'],
    metavar='endpoint',
)

# Zero or more free-form string arguments.
ARG_PARSER.add_argument('--args', type=str, nargs="*", metavar='args')

# Destination given by the user; parsed as a one-element list.
ARG_PARSER.add_argument('--output', type=str, nargs=1, metavar='dest')
__main__.py
import os
import json
from pathlib import Path
from pprint import pprint
from .cli import ARG_PARSER
from .endpoint import Endpoint
from .constants import MODNAME
from .map_tickers import sanitize_cik
from .parse import parse_response
# Ticker -> CIK lookup table. The path is resolved relative to the current
# working directory, so the program has to be started from the directory
# that contains the MODNAME folder.
with open(Path(os.getcwd()) / MODNAME / 'mappings' / 'ticker_to_cik_txt.json') as fp:
    MAP = json.load(fp)

endpoint = Endpoint()
namespace = ARG_PARSER.parse_args()
# e.g. "facts" -> Endpoint.get_facts
func = getattr(endpoint, f'get_{namespace.endpoint[0]}')

args = []
if namespace.args:
    # For company-specific endpoints the first argument is a ticker, which
    # is translated into a zero-padded CIK before the call.
    if namespace.endpoint[0] in ['submissions', 'concept', 'facts']:
        ticker = namespace.args.pop(0)
        cik = MAP[ticker.lower()][0]
        args += [sanitize_cik(cik),]
    args += namespace.args

response = func(*args)
if response.status_code != 200:
    raise Exception(response.text)

processed = parse_response(response)
pprint(processed)

if namespace.output:
    def _json_default(value):
        """Serialise date-like objects (anything with ``isoformat``) as ISO strings."""
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    # Serialise before opening: mode 'x' creates the file, and a failed dump
    # should not leave an empty file behind. (The previous code wrote an
    # undefined name, ``result``.)
    serialized = json.dumps(processed, default=_json_default, indent=2)
    with open(Path(namespace.output[0]), 'x') as fp:
        fp.write(serialized)
The following module runs separately from the main flow:
map_tickers.py
import json
import requests
from pathlib import Path
from .constants import TIMEOUT_SEC, USER_AGENT, MODNAME
def sanitize_cik(cik: int | str) -> str:
cik = str(cik)
cik = '0' * (10 - len(cik)) + cik
return cik
def _fetch_raw_json_mapping() -> requests.Response:
    """Download ``company_tickers.json`` from www.sec.gov.

    Returns the raw response; the status code is not checked here.
    """
    headers = {
        'Host': 'www.sec.gov',
        'User-Agent': USER_AGENT,
        'Accept-Encoding': 'gzip, deflate'}
    return requests.get(
        'https://www.sec.gov/files/company_tickers.json',
        headers=headers,
        timeout=TIMEOUT_SEC)
def _fetch_raw_txt_mapping() -> requests.Response:
    """Download ``ticker.txt`` from www.sec.gov.

    Returns the raw response; the status code is not checked here.
    """
    headers = {
        'Host': 'www.sec.gov',
        'User-Agent': USER_AGENT,
        'Accept-Encoding': 'gzip, deflate'}
    return requests.get(
        'https://www.sec.gov/include/ticker.txt',
        headers=headers,
        timeout=TIMEOUT_SEC)
def _load_raw_json_mapping(root: str) -> str:
    """Read ``<root>/<MODNAME>/mappings/company_tickers.json`` and return its text."""
    source = Path(root) / MODNAME / 'mappings' / 'company_tickers.json'
    with open(source) as handle:
        return handle.read()
def _load_raw_txt_mapping(root: str) -> str:
    """Read ``<root>/<MODNAME>/mappings/ticker.txt`` and return its text."""
    source = Path(root) / MODNAME / 'mappings' / 'ticker.txt'
    with open(source) as handle:
        return handle.read()
def _load_json_mapping(root: str, index_on_cik: bool = True):
    """Load the JSON-derived ticker/CIK mapping, building it if not cached.

    Args:
        root: Directory that contains the ``MODNAME`` folder.
        index_on_cik: ``True`` for the CIK-keyed mapping, ``False`` for the
            ticker-keyed one.

    Returns:
        The mapping read from the processed file under
        ``<root>/<MODNAME>/mappings`` when it exists; otherwise the mapping
        built from the raw ``company_tickers.json`` in the same folder.

    NOTE(review): keys read back from the cached file are JSON strings,
    while a freshly built CIK-keyed mapping uses whatever type ``cik_str``
    has in the raw data — confirm callers can handle both.
    """
    mappings_dir = Path(root) / MODNAME / 'mappings'
    name = 'cik_to_ticker_json.json' if index_on_cik else 'ticker_to_cik_json.json'
    cached = mappings_dir / name
    if cached.exists():
        with open(cached) as file:
            return json.load(file)
    raw = _load_raw_json_mapping(root)
    # The helper is private; the previous call used a name without the
    # leading underscore, which is not defined in this module.
    return _process_json_mapping(raw, index_on_cik)
def _load_txt_mapping(root: str, index_on_cik: bool = True):
    """Load the ticker.txt-derived ticker/CIK mapping, building it if not cached.

    Args:
        root: Directory that contains the ``MODNAME`` folder.
        index_on_cik: ``True`` for the CIK-keyed mapping, ``False`` for the
            ticker-keyed one.

    Returns:
        The mapping read from the processed file under
        ``<root>/<MODNAME>/mappings`` when it exists; otherwise the mapping
        built from the raw ``ticker.txt`` in the same folder.
    """
    mappings_dir = Path(root) / MODNAME / 'mappings'
    # Same file names that _save_txt_mapping writes (JSON content, ``.json``
    # suffix); the previous ``.txt`` names never matched a saved file.
    name = 'cik_to_ticker_txt.json' if index_on_cik else 'ticker_to_cik_txt.json'
    cached = mappings_dir / name
    if cached.exists():
        with open(cached) as file:
            return json.load(file)
    raw = _load_raw_txt_mapping(root)
    # _load_raw_txt_mapping returns text, while _process_txt_mapping decodes
    # its input, so hand it bytes.
    return _process_txt_mapping(raw.encode(), index_on_cik)
def _save_json_mapping(mapping: dict, root: str, index_on_cik: bool = True):
    """Write ``mapping`` as JSON under ``<root>/<MODNAME>/mappings``.

    The file is ``cik_to_ticker_json.json`` when ``index_on_cik`` is true and
    ``ticker_to_cik_json.json`` otherwise; an existing file is overwritten.
    """
    filename = 'cik_to_ticker_json.json' if index_on_cik else 'ticker_to_cik_json.json'
    target = Path(root) / MODNAME / 'mappings' / filename
    # Serialise first so a failure cannot truncate an existing file.
    payload = json.dumps(mapping)
    target.write_text(payload)
def _save_txt_mapping(mapping: dict, root: str, index_on_cik: bool = True):
    """Write ``mapping`` as JSON under ``<root>/<MODNAME>/mappings``.

    The file is ``cik_to_ticker_txt.json`` when ``index_on_cik`` is true and
    ``ticker_to_cik_txt.json`` otherwise; an existing file is overwritten.
    """
    filename = 'cik_to_ticker_txt.json' if index_on_cik else 'ticker_to_cik_txt.json'
    target = Path(root) / MODNAME / 'mappings' / filename
    # Serialise first so a failure cannot truncate an existing file.
    payload = json.dumps(mapping)
    target.write_text(payload)
def _process_txt_mapping(raw: bytes, index_on_cik: bool = True) -> dict[str, list[str]]:
res = {}
pairs = raw.decode().split('\n')
def cik_to_ticker(res: dict, pairs: list[str]) -> dict[str, list[str]]:
for pair in pairs:
ticker, cik = pair.split('\t')
if cik not in res.keys():
res[cik] = [ticker,]
else:
res[cik] += [ticker,]
return res
def ticker_to_cik(res: dict, pairs: list[str]) -> dict[str, list[str]]:
for pair in pairs:
ticker, cik = pair.split('\t')
if ticker not in res.keys():
res[ticker] = [cik,]
else:
res[ticker] += [cik,]
return res
return cik_to_ticker(res, pairs) if index_on_cik else ticker_to_cik(res, pairs)
def _process_json_mapping(raw: bytes, index_on_cik: bool = True) -> dict[str, dict[str, str]]:
    """Build a ticker/CIK lookup from the raw ``company_tickers.json`` content.

    The decoded JSON is treated as a mapping whose values are records with
    ``cik_str``, ``ticker`` and ``title`` fields.

    Args:
        raw: JSON document to decode.
        index_on_cik: ``True`` to key the result on ``cik_str`` (entries hold
            ``name`` and ``ticker`` lists); ``False`` to key it on ``ticker``
            (entries hold ``name`` and ``cik`` lists).

    Returns:
        A dict of dicts; each inner list collects, in input order, the values
        of every record that shares the key.
    """
    records = json.loads(raw).values()
    if index_on_cik:
        key_field, source_field, target_field = 'cik_str', 'ticker', 'ticker'
    else:
        key_field, source_field, target_field = 'ticker', 'cik_str', 'cik'

    mapping = {}
    for record in records:
        entry = mapping.setdefault(record[key_field], {'name': [], target_field: []})
        entry['name'].append(record['title'])
        entry[target_field].append(record[source_field])
    return mapping
def main():
    """Download both SEC ticker files and save the processed mappings.

    For each source (JSON first, then text) the raw content is fetched once,
    processed into a CIK-keyed and a ticker-keyed mapping, and both are saved
    under the current working directory.
    """
    import os
    root = os.getcwd()

    pipelines = (
        (_fetch_raw_json_mapping, _process_json_mapping, _save_json_mapping),
        (_fetch_raw_txt_mapping, _process_txt_mapping, _save_txt_mapping),
    )
    for fetch, process, save in pipelines:
        raw = fetch().content
        by_cik = process(raw)
        by_ticker = process(raw, index_on_cik=False)
        save(by_cik, root)
        save(by_ticker, root, index_on_cik=False)


if __name__ == '__main__':
    main()
Lastly,
__init__.py
from .endpoint import Endpoint
and,
constants.py
import os
import json
import dotenv
import datetime
from math import ceil
# Package directory name; used as a path component when locating the
# 'mappings' folder.
MODNAME = "edgar"
# strptime format used to parse date strings in API responses.
DFMT = "%Y-%m-%d"
# Sent as the User-Agent header on every request. Importing this module
# raises KeyError when EDGAR_USER_AGENT is not set.
# NOTE(review): dotenv is imported but load_dotenv() is not called in the
# code shown here — confirm whether loading a .env file was intended.
USER_AGENT = os.environ['EDGAR_USER_AGENT']
DEFAULT_CIK = "0000320193" # AAPL
# Default taxonomy and unit used when building concept/frame URLs.
DEFAULT_TAX = 'us-gaap'
DEFAULT_UNIT = 'USD'
# Rate-limiter settings: calls allowed per one-second window, and extra
# milliseconds added to each wait.
REQUESTS_PER_SEC = 10
BUFFER_MS = 100
# Timeout, in seconds, passed to every HTTP request.
TIMEOUT_SEC = 5
def _last_completed_period(today: datetime.date) -> str:
    """Return the frame period label for the last fully elapsed calendar quarter.

    The label has the form ``CY<year>Q<quarter>I``. During January-March no
    quarter of the current year has finished yet, so the result is Q4 of the
    previous year.
    """
    # Number of quarters of this year that are already over: 0 for Jan-Mar,
    # 1 for Apr-Jun, 2 for Jul-Sep, 3 for Oct-Dec.
    completed = (today.month - 1) // 3
    if completed == 0:
        return f"CY{today.year - 1}Q4I"
    return f"CY{today.year}Q{completed}I"


# Evaluated once, when the module is imported.
TODAY = datetime.date.today()
LAST_PERIOD = _last_completed_period(TODAY)