Asynchronous APIs Using Flask, Celery, and Redis

As applications grow, so does their complexity. One common source of complexity is managing long-running, asynchronous tasks behind an API. Suppose your application needs to run heavy background calculations. Instead of making a user wait in front of an empty UI, an asynchronous API endpoint can run the job in the background and inform the user when it is complete.

Asynchronous processing not only improves the user experience, it also helps you manage server load. Imagine a different scenario: a giant web app built on a standard REST API with no multi-threading, no async handling, and no task queue. What happens when the application suddenly has 50,000 users, all asking the system to perform complex, lengthy operations? Without the right plumbing, the application could easily experience downtime.

By managing communication asynchronously, you can improve the user experience, schedule jobs, and handle a large number of concurrent requests. Of course, asynchronous APIs aren’t always suitable, for example when the client needs an immediate result or when tasks must run strictly in sequence.

Building an asynchronous API can also be challenging. So, what do we need to create one? We need something that can queue tasks, run them in worker processes, and report back on their status. This tutorial demonstrates how to build an asynchronous API in Python with Flask and a few additional technologies: Celery, RabbitMQ, and Redis.

  • Celery: Celery is an asynchronous task queue that lets you run and manage jobs outside the request/response cycle. It is mostly used for real-time jobs but also lets you schedule jobs. There are three main components in Celery: the worker, the broker, and the task queue. Celery needs a message broker (such as RabbitMQ or Redis) to pass tasks to the workers, and it can use a result backend (such as Redis or RPC) to store task results; see the minimal sketch after this list.
  • RabbitMQ: RabbitMQ is the message broker we’ll use to carry messages between the Flask app and the Celery workers. It is a general-purpose broker, so it can also be used on its own outside of Celery.
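
To see how these pieces fit together, here is a minimal, self-contained sketch of a Celery task. The module name, broker URL, and task are placeholders for illustration; adjust them for your own setup:

# sketch.py -- a minimal Celery app (the broker URL assumes a local RabbitMQ).
from celery import Celery

app = Celery('sketch', broker='pyamqp://guest@localhost//', backend='rpc://')

@app.task
def add(x, y):
    # Runs inside a worker process, not inside your web request.
    return x + y

if __name__ == "__main__":
    # .delay() only puts a message on the queue and returns immediately;
    # a worker started with `celery -A sketch worker` picks the job up and runs it.
    print(add.delay(2, 3).id)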

Now let’s set up our asynchronous API using Flask:

Step 1: Prerequisites

  • Knowledge of Python
  • RabbitMQ
  • Redis Server

Step 2: Environment Setup and Software Installation

Now open the terminal, create a new folder called flaskelry, and move into it:

mkdir flaskelry
cd flaskelry

Once you’re in the folder, create a virtual environment and activate it:

virtualenv .
source bin/activate

Once that is done, open two more terminal tabs. You’ll need three separate tabs because we’ll be running three different processes: RabbitMQ, the Celery worker, and the Flask server.

Now let’s install our first main player, RabbitMQ:

brew update
brew install rabbitmq

Note: The above command only works on macOS. On other operating systems, follow the installation instructions in the RabbitMQ documentation.

Once that is done, let’s also install Celery and Flask:

python3 -m pip install celery
python3 -m pip install flask

We’re done with the setup, so let’s start coding now:

Step 3: Setting up the API

Now create a file api.py and paste the below code in the file:

from flask import Flask, request, jsonify
import json
import os
import sys
from celery import Celery, Task, current_app
import fns as cad

api = Flask(__name__)
# Connect Celery to RabbitMQ as the broker and use RPC as the result backend.
celerymq = Celery('tasks', backend='rpc://', broker='pyamqp://guest@localhost//')
class CeleryApiTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    def on_success(self, retval, task_id, args, kwargs):
        print('{0!r} success!: {1!r}'.format(task_id, retval))

@celerymq.task(base=CeleryApiTask)
def async_cron_alert():
    output = cad.alert_daily()
    return output

@api.before_request
def before_request():
    if request.method=='OPTIONS':
        return jsonify({}), 200
  
@api.errorhandler(500)
def internal_error(error):
    return jsonify({"msg": "Internal Server Error", "status": 500}), 500

@api.errorhandler(400)
def bad_request(error):
    return jsonify({"msg": "Bad Request", "status": 400}), 400

@api.errorhandler(404)
def not_found(error):
    return jsonify({"msg": "Not Found", "status": 404}), 404
    
@api.route("/api/cron_alert_daily/", methods=['GET'])
def route_alert_daily():
    task = async_cron_alert.delay()
    return jsonify({'msg': 'process started', 'task_id': task.id, 'status': 202}), 202
    
if __name__ == "__main__":
    # Only used when running `python api.py` directly; `flask run` uses its own port (5000 by default).
    api.run(host='0.0.0.0', port=8124)

Code Explanation:

In the first few lines, we import the required packages, like Flask, Celery, Task, json, os, and sys, along with our helper module fns (aliased as cad). After that, we create a Flask app and initialize the Celery connection to RabbitMQ.

Then, we define a CeleryApiTask class with hooks that log whether a job succeeded or failed. After that, we define async_cron_alert(), the task that Celery will execute asynchronously.

To handle server errors, we also define a few error handler functions.

Finally, we have an API endpoint, /api/cron_alert_daily/, which queues the task we created above with .delay() and immediately responds with the task ID and a 202 status code.
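
The response above only tells the client that the job was accepted. If you also want clients to poll for the result using the returned task_id, you could add a route along the lines of the sketch below. This route is not part of the tutorial code, and the path /api/task_status/ is just an illustration. Note that polling results from a separate request works best with a persistent result backend such as Redis (for example backend='redis://localhost:6379/0'), since the rpc:// backend is designed to deliver results back to the client that queued the task.

from celery.result import AsyncResult

@api.route("/api/task_status/<task_id>/", methods=['GET'])
def route_task_status(task_id):
    # Look up the task in the result backend using the id returned earlier.
    result = AsyncResult(task_id, app=celerymq)
    payload = {'task_id': task_id, 'state': result.state}
    if result.successful():
        payload['result'] = result.get()
    return jsonify(payload), 200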

Now create one more file, fns.py, and paste the below code:

import time

def alert_daily():
    # Simulate a long-running job; api.py calls this as cad.alert_daily().
    time.sleep(50)
    return "New Job"

Code Explanation:

This is a very simple helper that simulates a long-running job by sleeping for 50 seconds before returning.

Step 4: Testing the API

Remember the extra terminal tabs we asked you to open? We’ll use them here. Go to the other two tabs and run the commands below, one set per tab:

On one tab, run this command:

/usr/local/sbin/rabbitmq-server

On another tab, run this command:

source bin/activate
celery -A api worker --loglevel=INFO

Now go to the first terminal tab and use the below command to run the Flask server:

export FLASK_APP=api.py
flask run

Now, once everything is running, open your favorite API testing tool and hit this URL:

http://127.0.0.1:5000/api/cron_alert_daily/

If everything is fine, you should get a response like this:

{
    "msg": "a new process started",
    "status": 202,
    "task_id": "4845e5f9-7521-43f0-b457-b8b90c73827f"
}

Tada! Your API is now ready.

Final Words:

You can use this pattern to create asynchronous API endpoints, but I would not suggest making every endpoint asynchronous. There are situations when you need an instant response from the API; you don’t want to add a delay to a login form, for example.

Celery can also help you schedule jobs, which is useful in situations like sending bulk emails: you can queue the emails as tasks and send them one by one. There’s a lot more to cover in asynchronous APIs, and this is just a starting point. Once you understand how the pieces fit together, you can easily build more complex applications on top of this pattern.
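
As an illustration of the scheduling side (not part of the tutorial code above; the module and task names are made up), Celery’s beat scheduler can run a task periodically, which is handy for something like a daily email digest:

# emails.py -- a sketch of a periodic task using Celery beat.
from celery import Celery
from celery.schedules import crontab

app = Celery('emails', broker='pyamqp://guest@localhost//')

@app.task
def send_daily_digest():
    # Hypothetical task: fetch queued emails and send them one by one.
    pass

# Run send_daily_digest every morning at 07:30. Start the scheduler with
# `celery -A emails beat` alongside a normal `celery -A emails worker`.
app.conf.beat_schedule = {
    'daily-digest': {
        'task': 'emails.send_daily_digest',
        'schedule': crontab(hour=7, minute=30),
    },
}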