I'm having trouble streaming a response in flask-smorest. I'm following the Flask streaming guide at https://flask.palletsprojects.com/en/2.3.x/patterns/streaming/ to stream responses from my flask-smorest application. Below is a minimal reproducible example (MRE) of my code. Suppose my application fetches foreign exchange rates for the past 1000 days for any currency the end user requests.

This is the non-streaming version. It works perfectly and returns a list of JSON objects:

from flask import request, Response
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from marshmallow import Schema, fields
import asyncio

class CurrencySchema(Schema):
    name = fields.Str()
    rate = fields.Str()
    date = fields.Str()
    source = fields.Str()

blp = Blueprint("test",__name__, description="test")

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        return results

When I run this, it returns a list of JSON objects in my browser, like:

[{'name': 'USD', 'rate': '1.2333', 'date': 'Mar 21, 2024', 'source': 'currency.com'}, 
 {'name': 'USD', 'rate': '1.2121', 'date': 'Mar 22, 2024', 'source': 'currency.com'}, 
 .................so on and so forth up to 1000 jsons]

Now comes the part where I try to stream the response. I make the following changes to my code:

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')        
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        def generate_rates():
             batch_size = 100
             for i in range(0, len(results), batch_size):
                  yield results[i:i+batch_size]
        return generate_rates()

Strangely, this returns a list of 50 empty JSON objects:

[{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {},
 {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {},
 {}, {}, {}, {}, {}, {}, {}, {}]

I also tried the version below, which wraps the generator in a Response. The result was the same list of empty JSON objects, and this time I also got the following error:

AssertionError: applications must write bytes

The error appears to be raised from werkzeug's serving.py.

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')        
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries
        def generate_rates():
             batch_size = 100
             for i in range(0, len(results), batch_size):
                  yield results[i:i+batch_size]
        return Response(generate_rates(), mimetype = 'application/json')

My entire application is ready and this is the last piece giving me trouble. I want to stream the response, and something in flask-smorest seems to be getting in the way. I would really appreciate your help. Thanks!

3 Answers

I was finally able to resolve this. The root cause was the @blp.response decorator, which does not accept generator responses. I ended up removing the decorator and serializing the results manually. Here's the final code:

from flask import Response, request, stream_with_context

@blp.route("/test")
class Test(MethodView):
    # @blp.response(200, CurrencySchema(many=True))  # decorator removed: it does not accept a generator
    def get(self):
        currency = request.args.get('currency')
        schema = CurrencySchema()

        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries

        @stream_with_context
        def generate_results():
            # Emit one JSON array piece by piece; schema.dumps returns a str.
            yield '['
            for i, result in enumerate(results):
                if i:
                    yield ', '  # separator before every item except the first
                yield schema.dumps(result)
            yield ']'

        return Response(generate_results(), mimetype='application/json')
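
The comma-joining pattern can also be tried outside Flask. This is a minimal sketch using only the standard library: `stream_json_array` is a hypothetical helper name, and `json.dumps` stands in for `schema.dumps`. It emits the separator before every item except the first, so an empty input still yields a valid `[]`.

```python
import json
from typing import Any, Callable, Iterable, Iterator


def stream_json_array(
    items: Iterable[Any],
    dumps: Callable[[Any], str] = json.dumps,
) -> Iterator[str]:
    """Yield the pieces of one JSON array, one string chunk at a time."""
    yield "["
    first = True
    for item in items:
        if not first:
            yield ", "  # separator goes before every item except the first
        yield dumps(item)
        first = False
    yield "]"


# Sample records taken from the question's example output.
rates = [
    {"name": "USD", "rate": "1.2333", "date": "Mar 21, 2024", "source": "currency.com"},
    {"name": "USD", "rate": "1.2121", "date": "Mar 22, 2024", "source": "currency.com"},
]
body = "".join(stream_json_array(rates))
```

In the view, passing `stream_json_array(results, schema.dumps)` to `Response(..., mimetype='application/json')` should produce the same body for a non-empty list, since `Response` accepts an iterable of strings.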
0
+50

You need to serialize the response to strings and stream those to the client. You can do this by modifying the view:

@blp.route("/test")
class Test(MethodView):
    @blp.response(200, CurrencySchema(many=True))
    def get(self):
        currency = request.args.get('currency')
        results = asyncio.run(func_that_fetches_currency_rates_from_three_APIs(
            currency))  # returns a list of dictionaries

        def generate_rates():
            batch_size = 100
            schema = CurrencySchema(many=True)
            yield '['

            # Serialize each batch to a JSON array, then strip its outer brackets.
            previous_val = schema.dumps(results[0:batch_size])[1:-1]

            for i in range(batch_size, len(results), batch_size):
                next_val = schema.dumps(results[i:i+batch_size])[1:-1]
                yield previous_val + ', '
                previous_val = next_val
            yield previous_val  # last batch, no trailing comma
            yield ']'

        return Response(generate_rates(), status=200, content_type='application/json')
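
The batching idea can be checked without Flask. This is a minimal sketch, assuming `json.dumps` in place of `CurrencySchema(many=True).dumps`. The name `stream_in_batches` and the sample data are made up for illustration. Each batch is serialized as its own JSON array, its outer brackets are stripped with `[1:-1]`, and the fragments are joined by commas inside one outer array.

```python
import json
from typing import Any, Iterator, Sequence


def stream_in_batches(results: Sequence[Any], batch_size: int = 100) -> Iterator[str]:
    """Yield one JSON array as string chunks, one chunk per batch of items."""
    yield "["
    for start in range(0, len(results), batch_size):
        batch = results[start:start + batch_size]
        fragment = json.dumps(batch)[1:-1]  # drop the batch's own "[" and "]"
        # Every fragment after the first is prefixed with a separator.
        yield fragment if start == 0 else ", " + fragment
    yield "]"


# Placeholder records, not real exchange rates.
rates = [{"name": "USD", "rate": str(i)} for i in range(250)]
chunks = list(stream_in_batches(rates))
```

Joining the chunks and parsing the result with `json.loads` gives back the original list, which is a quick way to confirm that the bracket stripping and separators line up.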
0

Solution: Change your last line to this:

return Response(json.dumps(next(generate_rates())),mimetype='application/json')

Explanation:

  • Response accepts Iterable[bytes], bytes, Iterable[str], or str. Our generator yields lists rather than strings or bytes, which will not work.
  • To get a value out of a generator you call next(), which returns the next item from the iterator. Here that is the first batch only.
  • json.dumps then converts that item to a JSON-formatted string, which is returned to the browser as JSON via the mimetype.
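
What `next()` does to the generator from the question can be seen in isolation. This is a small sketch with made-up data (1000 placeholder records, not real rates). A single `next()` call returns only the first batch, so the suggested line sends the first 100 records as one ordinary, non-streamed response.

```python
import json

# Placeholder data standing in for the fetched rates.
results = [{"name": "USD", "rate": str(i)} for i in range(1000)]


def generate_rates(batch_size=100):
    """Same shape as the question's generator: yields lists of records."""
    for i in range(0, len(results), batch_size):
        yield results[i:i + batch_size]


first_batch = next(generate_rates())  # one call, one batch
payload = json.dumps(first_batch)     # a plain str, sent in one piece
```

Sending all 1000 records this way would mean consuming the whole generator before building the string, at which point nothing is streamed.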
