24

I am writing a web application which would do some heavy work. With that in mind I thought of making the tasks as background tasks(non blocking) so that other requests are not blocked by the previous ones.

I went with demonizing the thread so that it doesn't exit once the main thread (since I am using threaded=True) is finished, Now if a user sends a request my code will immediately tell them that their request is in progress, it'll be running in the background, and the application is ready to serve other requests.

My current application code looks something like this:

from flask import Flask
from flask import request
import threading

class threadClass:

    def __init__(self):
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True                       # Daemonize thread
        thread.start()                             # Start the execution

    def run(self):

         #
         # This might take several minutes to complete
         someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
    try:
        begin = threadClass()
    except:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0',threaded=True)

main()

I just want it to be able to handle a few concurrent requests (it's not gonna be used in production)

Could I have done this better? Did I miss anything? I was going through python's multi-threading package and found this

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

Can I demonize a process using multi-processing? How can I achieve better than what I have with threading module?

##EDIT

I went through the multi-processing package of python, it is similar to threading.

from flask import Flask
from flask import request
from multiprocessing import Process

class processClass:

    def __init__(self):
        p = Process(target=self.run, args=())
        p.daemon = True                       # Daemonize it
        p.start()                             # Start the execution

    def run(self):

         #
         # This might take several minutes to complete
         someHeavyFunction()

app = Flask(__name__)

@app.route('/start', methods=['POST'])
    try:
        begin = processClass()
    except:
        abort(500)

    return "Task is in progress"

def main():
    """
    Main entry point into program execution

    PARAMETERS: none
    """
    app.run(host='0.0.0.0',threaded=True)

main()

Does the above approach looks good?

4
  • 1
    Flask doc: Celery Based Background Tasks
    – furas
    Commented Dec 6, 2016 at 8:05
  • 1
    i know about celery, since it needs database server for backend support i didn't want to use that.
    – Bad_Coder
    Commented Dec 6, 2016 at 8:55
  • 1
    flask application with background threads
    – metmirr
    Commented Dec 6, 2016 at 9:10
  • 1
    If you've got some time have a look at Miguel Grinbergs keynote at FlaskCon (youtu.be/tdIIJuPh3SI). At 1:25:38 he begins to describe how to handle async requests and turn routes in async request handlers by using an async decorator.
    – MrLeeh
    Commented Dec 6, 2016 at 18:26

2 Answers 2

18

Best practice

The best way to implement background tasks in flask is with Celery as explained in this SO post. A good starting point is the official Flask documentation and the Celery documentation.

Crazy way: Build your own decorator

As @MrLeeh pointed out in a comment, Miguel Grinberg presented a solution in his Pycon 2016 talk by implementing a decorator. I want to emphasize that I have the highest respect for his solution; he called it a "crazy solution" himself. The below code is a minor adaptation of his solution.

Warning!!!

Don't use this in production! The main reason is that this app has a memory leak by using the global tasks dictionary. Even if you fix the memory leak issue, maintaining this sort of code is hard. If you just want to play around or use this in a private project, read on.

Minimal example

Assume you have a long running function call in your /foo endpoint. I mock this with a 10 second sleep timer. If you call the enpoint three times, it will take 30 seconds to finish.

Miguel Grinbergs decorator solution is implemented in flask_async. It runs a new thread in a Flask context which is identical to the current Flask context. Each thread is issued a new task_id. The result is saved in a global dictionary tasks[task_id]['result'].

With the decorator in place you only need to decorate the endpoint with @flask_async and the endpoint is asynchronous - just like that!

import threading
import time
import uuid
from functools import wraps

from flask import Flask, current_app, request, abort
from werkzeug.exceptions import HTTPException, InternalServerError

app = Flask(__name__)
tasks = {}


def flask_async(f):
    """
    This decorator transforms a sync route to asynchronous by running it in a background thread.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        def task(app, environ):
            # Create a request context similar to that of the original request
            with app.request_context(environ):
                try:
                    # Run the route function and record the response
                    tasks[task_id]['result'] = f(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['result'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['result'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task': threading.Thread(
            target=task, args=(current_app._get_current_object(), request.environ))}
        tasks[task_id]['task'].start()

        # Return a 202 response, with an id that the client can use to obtain task status
        return {'TaskId': task_id}, 202

    return wrapped


@app.route('/foo')
@flask_async
def foo():
    time.sleep(10)
    return {'Result': True}


@app.route('/foo/<task_id>', methods=['GET'])
def foo_results(task_id):
    """
        Return results of asynchronous task.
        If this request returns a 202 status code, it means that task hasn't finished yet.
        """
    task = tasks.get(task_id)
    if task is None:
        abort(404)
    if 'result' not in task:
        return {'TaskID': task_id}, 202
    return task['result']


if __name__ == '__main__':
    app.run(debug=True)

However, you need a little trick to get your results. The endpoint /foo will only return the HTTP code 202 and the task id, but not the result. You need another endpoint /foo/<task_id> to get the result. Here is an example for localhost:

import time
import requests

task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
            for _ in range(2)]
time.sleep(11)
results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
           for task_id in task_ids]
# [{'Result': True}, {'Result': True}]
0

I know this question is a bit old, but it still has some relevance nowadays, I developed a simple solution to achieve this, feel free to check it:

https://tiagohorta1995.medium.com/python-flask-api-background-task-96bf1120a855

2
  • Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.
    – Community Bot
    Commented Aug 1, 2023 at 5:29
  • While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - From Review Commented Aug 1, 2023 at 6:33

Not the answer you're looking for? Browse other questions tagged or ask your own question.