FastAPI et SQLAlchemy : un duo puissant mais attention aux transactions !
R&D FastAPI Python SQLAlchemy AsyncIO
rédigé par Sebastien GRIGNARD le 26/11/2021

FastAPI et SQLAlchemy mixins

Parfois, faire cohabiter des modules conçus avec des approches différentes peut causer des problèmes… intéressants. Voici une petite histoire détaillant une séance de détective à la recherche d'un bug assez coriace.

Le contexte : un client nous a demandé de l'aide pour corriger un bug dans son projet. Certaines écritures dans la base de données n'étaient pas systématiquement visibles avec certaines requêtes SQL. Il y avait également des blocages avec SQLite utilisé pour les tests automatiques (message « Database is locked »).

Dans ce projet, la pile technique est basée sur FastAPI pour l'application web et SQLAlchemy pour l'interaction avec la base de données. SQLAlchemy est la solution décrite dans le chapitre base de données SQL de FastAPI, c'est donc une combinaison classique. Un autre module, sqlalchemy_mixins, a été ajouté à ce duo pour simplifier les requêtes faites avec SQLAlchemy.

FastAPI propose un modèle basé sur asyncio, un module Python permettant d'écrire du code asynchrone, et ajoute par-dessus une gestion du code synchrone via un pool de thread. Tout ceci est assez transparent pour le développeur : il lui suffit de déclarer une route ou une de ses dépendances avec un def classique pour que cette fonction soit exécutée dans un thread, ce qui permet d'éviter de bloquer les autres requêtes pendant son exécution.

Le message « Database is locked » de SQLite est très souvent la manifestation d'une connexion à la base restant ouverte dans un état où une transaction est en cours. L'autre bug d'écritures en base parfois invisibles fait également penser à une transaction pas encore validée. Vu ces symptômes, nous avons d'abord exploré la piste d'une transaction pas toujours correctement terminée. Un peu de lecture du code nous amène à une implémentation via try+except dans un middleware (une fonction de traitement de requête) :

async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = ScopedSession
        response = await call_next(request)
        request.state.db.commit()
    except Exception:
        logging.exception("Unexpected exception during request")
        request.state.db.rollback()
    return response

Ça parait conforme à la documentation FastAPI… avec un petit écart : l'utilisation de scoped_session et donc la création d'une session par thread, ce qui n'est pas la méthode documentée qui utilise une session classique. C'est le module sqlalchemy_mixins qui a fait utiliser scoped_session à notre client. Ce module permet de passer la session SQLAlchemy directement par les classes de modèles :

bob = User.create(name='Bobby', age=1)

L'équivalent en SQLAlchemy :

bob = User(name='Bobby', age=1)
session.add(bob)
session.flush()

Cacher la session se fait en la configurant par une méthode de classe avant de faire des requêtes SQL :

BaseModel.set_session(session)

OK, ça parait correct à première vue : une requête API correspond à un thread et donc la session passée à sqlalchemy_mixins est unique par requête vu que scoped_session est utilisée. Ce n'est qu'après avoir trouvé le détail de l'implémentation du pool de threads FastAPI (dans un ticket github) qu'est apparu l'erreur : au contraire de frameworks plus anciens, FastAPI n'utilise pas un modèle « une requête HTTP correspond à un thread » mais un modèle « une requête HTTP correspond à une ou plusieurs dépendances », ces dépendances étant des fonctions exécutées par le pool de thread quand elles sont bloquantes. Et il n'y a pas de garantie que toutes les dépendances d'une requête soient traitées par un seul thread. Il peut donc parfaitement (et aléatoirement) arriver que des requêtes SQL de DML (insert/update/delete) soient exécutées par un thread avec une session A, et le COMMIT correspondant par un autre thread et donc une session B. Ce qui fait que les requêtes DML de la session A ne sont pas visibles à la fin de la requête HTTP, et que la session A reste dans un état où une transaction est en cours jusqu'à ce qu'une autre requête HTTP utilise le thread de cette session.

Maintenant que la source du problème est identifiée, comment s'y prendre pour corriger ça ? Pas mal d'options s'offraient à nous :

  • ne pas utiliser sqlalchemy_mixins : plus idiomatique car sans variable statique pour la session, mais la refactorisation (passer la session partout) était conséquente…
  • un autre module sqlalchemy-mixins-for-starlette : prévu pour Starlette (et FastAPI), mais il demande au final la même refactorisation que la solution précédente.
  • arriver à garder une même session tout au long d'une requête HTTP : ça veut donc dire que le set_session() de sqlalchemy_mixins doit être appelé avec une session spécifique à chaque requête. C'est là qu'un module Python récent est venu à notre rescousse : contextvars.

Ce module a pour but de généraliser le thread local storage (utilisé par ScopedSession) aux tâches asyncio. Et FastAPI utilise asyncio… Il reste à vérifier que l'implémentation du pool de thread de FastAPI passe bien le contexte de contextvars à ses threads. Un peu de lecture de code plus tard (merci le logiciel libre !) nous a permis de constater que c'est bien le cas. Pour les curieux, voici la partie du code transmettant le contexte aux threads :

  async def run_in_threadpool(
func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any

) -> T: loop = asyncio.geteventloop() if contextvars is not None: # pragma: no cover # Ensure we run in the same context child = functools.partial(func, *args, **kwargs) context = contextvars.copycontext() func = context.run args = (child,) elif kwargs: # pragma: no cover # loop.runinexecutor doesn't accept 'kwargs', so bind them in here func = functools.partial(func, **kwargs) return await loop.runin_executor(None, func, *args)

L'implémentation du correctif ressemblant finalement à ça :

import contextvars

 _session = contextvars.ContextVar("_session", default=None)


class BaseModel(Base, AllFeaturesMixin):
    __abstract__ = True

    @classmethod
    def set_session(cls, session):
        """
        This class method allow to set a thread-local session
        which is also associated to current asyncio task
        :type session: Session
        """
        _threadLocal.session = session
        _session.set(session)

    @classproperty
    def session(cls):
        """
        :rtype: Session
        """
        try:
            if _session.get() is None:
                raise AttributeError
            return _session.get()
        except (AttributeError, LookupError):
            raise Exception('Cant get session.'
                            'Please, call BaseModel.set_session()')


@contextlib.contextmanager
def get_session():
    """Return a context manager in charge of session start and close."""
    try:
        session = DbSession()
        BaseModel.set_session(session)
        yield session
        session.commit()
        logging.info("Session commited: {}".format(id(session)))
    except Exception:
        session.rollback()
        logging.info("Session rollback-ed: {}".format(id(session)))
        raise
    finally:
        BaseModel.set_session(None)
        logging.info("Session closed: {}".format(id(session)))
        session.close()


async def get_session_as_middleware(request, call_next):

    The FastAPI middleware is in charge to instanciate (and close) the database session which is:

    - thread local
    - associated with the currently running asyncio task

    The mechanism is implemented in get_session context manager and DbSession Class.
    This is the context manager which is in charge of commit / rollabck the current database session.
    """
    with get_session():
        return await call_next(request)

Avec méthode et patience, nous avons pu passer de l'incompréhension totale à une compréhension fine du problème et une correction robuste.

Nous tirons plusieurs enseignements de ce périple. Tout d'abord, la concurrence et l'asynchrone causent rapidement des problèmes subtils et difficiles à comprendre. Ensuite, il est difficile de repérer des problèmes de compatibilité insoupçonnés entre différents modules conçus différemment ou avec des hypothèses de travail différentes. Une connaissance pointue du fonctionnement des différentes technologies est nécessaire pour déboguer ce genre de situation. L'accès au code des briques logicielles libres est essentiel et il permet de mener à bien les investigations sur ce genre de problèmes. C'est une des nombreuses raisons de pourquoi il est si important de s'appuyer sur des technologies ouvertes et libres.

Retrouvez nous sur Github : (https://github.com/algoo/) et (https://github.com/tracim/)