SQLAlchemy dynamic query building and filtering, including soft deletes

21 June 2019 by Peter

This post shows how to build a query builder for all your select queries.

Building on the previous post 'Flask, Jinja2 and SQLAlchemy many-to-many relationship with conditions', I was looking for a way to add filter conditions dynamically and, if possible, also find a solution for the soft delete pattern.

Soft delete means not removing records from a table but marking them as deleted. This means that every table must have a deleted flag and that every query must exclude the records marked as deleted. With an ORM the situation is even more complex, because we are dealing with objects rather than records. Implementing soft delete is hard, and the problem is not limited to soft deletes. Every class in my model also has a status field, which can be used to temporarily hide an object from non-admin visitors.
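
To make the pattern concrete, here is a minimal sketch that uses Python's built-in sqlite3 module instead of SQLAlchemy. The table name `page`, its rows, and the choice of 1 as the "enabled" status value are assumptions made for illustration only.

```python
import sqlite3

# Assumption for this sketch: status 1 means "visible to visitors".
STATUS_ENABLED = 1

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE page ("
    " id INTEGER PRIMARY KEY,"
    " name TEXT,"
    " deleted INTEGER NOT NULL DEFAULT 0,"
    " status INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO page (name, status) VALUES (?, ?)",
    [("home", STATUS_ENABLED), ("about", STATUS_ENABLED), ("draft", 0)],
)

# Soft delete: flag the row instead of issuing DELETE.
conn.execute("UPDATE page SET deleted = 1 WHERE name = ?", ("about",))

# Every read has to repeat both conditions, which is easy to forget.
visible = [
    row[0]
    for row in conn.execute(
        "SELECT name FROM page WHERE deleted = 0 AND status = ? ORDER BY name",
        (STATUS_ENABLED,),
    )
]
# The flagged row is still physically present in the table.
total = conn.execute("SELECT COUNT(*) FROM page").fetchone()[0]
print(visible, total)
```

The repeated WHERE clause is exactly what a query builder can add automatically.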

I think the best way to implement soft delete and/or status would be for SQLAlchemy itself to integrate this and offer it as a utility function. That is not the case, but some recipes are available (using the before_compile option).
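
For reference, a before_compile recipe can look roughly like the sketch below. It follows the pattern shown in the SQLAlchemy documentation for the `before_compile` query event; the `Note` model is hypothetical, and the listener is registered globally on `Query`, which may not be what you want in a real application.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, sessionmaker

Base = declarative_base()


class Note(Base):
    # hypothetical model, used only for this sketch
    __tablename__ = "note"

    id = Column(Integer, primary_key=True)
    text = Column(String)
    deleted = Column(Boolean, default=False, nullable=False)


@event.listens_for(Query, "before_compile", retval=True)
def exclude_deleted(query):
    # Add a "not deleted" condition for every selected Note entity.
    for desc in query.column_descriptions:
        if desc["type"] is Note:
            query = query.filter(desc["entity"].deleted == False)  # noqa: E712
    return query


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([Note(text="keep"), Note(text="gone", deleted=True)])
session.commit()

# The query itself does not mention the deleted column.
texts = [note.text for note in session.query(Note).order_by(Note.id)]
print(texts)
```

The advantage is that existing queries do not have to change; the drawback is that the filtering becomes implicit, which is one reason to prefer an explicit builder.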

Looking for a way to handle select queries, I decided to build my own query builder for select statements. The requirements were that it must be possible to pass more than one class, for example [Parent, Child], and/or columns, for example [(Parent, 'id'), Child], and that it must be possible to add filter conditions dynamically, including automatic conditions on the deleted column and the status column. Some references on dynamic query building are listed below.
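
These requirements can be illustrated without a database. The sketch below uses plain Python classes as stand-ins for mapped classes; the helper names `split_entities` and `add_default_filters` are hypothetical and exist only in this example, and the status value 1 is an assumption.

```python
class Parent:
    # stand-in for a mapped class; the attribute values are placeholders
    id = name = deleted = status = None


class Tag:
    # stand-in for a class without deleted or status columns
    id = name = None


def split_entities(model_class_list):
    """Split a list of classes and (class, key) tuples into classes and selections."""
    classes, selected = [], []
    for item in model_class_list:
        if isinstance(item, tuple):
            cls, key = item
            if not hasattr(cls, key):
                raise ValueError("unknown column: %s" % key)
            classes.append(cls)
            selected.append((cls, key))
        else:
            classes.append(item)
            selected.append(item)
    return classes, selected


def add_default_filters(classes, filter_by_list, deleted=False, status=1):
    """Return a new filter list with deleted/status conditions appended."""
    result = list(filter_by_list)  # do not modify the caller's list
    for cls in classes:
        if deleted is not None and hasattr(cls, "deleted"):
            result.append((cls, "deleted", "eq", deleted))
        if status is not None and hasattr(cls, "status"):
            result.append((cls, "status", "eq", status))
    return result


classes, selected = split_entities([(Parent, "id"), Tag])
filters = add_default_filters(classes, [(Tag, "name", "eq", "news")])
print(filters)
```

Only `Parent` receives the extra conditions here, because `Tag` has neither a deleted nor a status attribute.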

Of course I ran into other problems as well, such as: AttributeError: 'scoped_session' object has no attribute '_autoflush'. Fortunately someone has found a solution for this, see the references.
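
The fix used in the code further down is to call the scoped_session and pass the result to `Query.with_session()`. A small sketch of the difference between the two objects, assuming an in-memory SQLite engine:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, scoped_session, sessionmaker

engine = create_engine("sqlite://")
db = scoped_session(sessionmaker(bind=engine))

# scoped_session is a registry; calling it returns the current Session.
session = db()

is_session = isinstance(session, Session)
registry_is_session = isinstance(db, Session)
# Within one thread the registry keeps handing out the same Session.
same_session = db() is session
print(is_session, registry_is_session, same_session)

db.remove()
```

The registry forwards the usual public session methods, but it is not a Session itself, which would explain why code that expects a real Session fails on it.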

I then made a backup and started using it. If I run into problems, I can always add the FilteredQuery recipe.

Of course this takes away some of the 'fun' of writing SQLAlchemy queries, but let's build working apps!

In case you want to try this:

from sqlalchemy import (Table, Column, Integer, String, Boolean, BigInteger,
                        DateTime, ForeignKey, func, and_, or_, desc, asc,
                        create_engine, inspect, sql)
from sqlalchemy.orm import (relationship, Session, sessionmaker, scoped_session,
                            with_polymorphic, backref, contains_eager, Query)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import label

import os
import sys

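Base = declarative_base()

# Assumption: the definition of STATUS_ENABLED is missing from this listing.
# Any integer other than the column default 0 works for the demo.
STATUS_ENABLED = 1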

# many-to-many link table: parent - child
parent_mtm_child_table = Table('parent_mtm_child', Base.metadata,
    Column('parent_id', Integer, ForeignKey('parent.id')),
    Column('child_id', Integer, ForeignKey('child.id'))
)

class Parent(Base):
    __tablename__ = 'parent'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
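    # many-to-many relationship with child
    children = relationship(
        'Child',
        secondary=parent_mtm_child_table,
        back_populates='parents')

    def __repr__(self):
        return "%s(name=%r)" % \
            (self.__class__.__name__, self.name)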

class Child(Base):
    __tablename__ = 'child'

    id = Column(Integer, primary_key=True)
    deleted = Column(Boolean, default=False)
    status = Column(Integer, server_default='0', index=True)
    name = Column(String)
    age = Column(Integer)
    hair_color = Column(String)
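    # many-to-many relationship with parent
    parents = relationship(
        'Parent',
        secondary=parent_mtm_child_table,
        back_populates='children')

    def __repr__(self):
        return "%s(name=%r, age=%r, hair_color=%r)" % \
            (self.__class__.__name__, self.name,
            self.age, self.hair_color)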

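# show/hide sql
#engine = create_engine('sqlite://', echo=True)
engine = create_engine('sqlite://')
# create the tables in the in-memory database
Base.metadata.create_all(engine)

#session = sessionmaker()
#db = session()
# use scoped_session
db = scoped_session(sessionmaker(bind=engine))

# see: Attaching a pre-built query to a scoped_session in SQLAlchemy
# db() returns the actual Session, which Query.with_session() needs
db_local = db()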

# parents
john = Parent(name='John', status=STATUS_ENABLED)
mary = Parent(name='Mary', status=STATUS_ENABLED)
gina = Parent(name='Gina', status=STATUS_ENABLED)
ryan = Parent(name='Ryan', status=STATUS_ENABLED)
eric = Parent(name='Eric', status=STATUS_ENABLED)
# children
liam = Child(name='Liam', age=6, hair_color='brown', status=STATUS_ENABLED)
emma = Child(name='Emma', age=8, hair_color='blond', status=STATUS_ENABLED)
alex = Child(name='Alex', age=10, hair_color='blond', status=STATUS_ENABLED)
sara = Child(name='Sara', age=9, hair_color='blond', status=STATUS_ENABLED)
rose = Child(name='Rose', age=9, hair_color='blond', status=STATUS_ENABLED)
# assign children to parents
db.add_all([john, mary, gina, ryan, eric, liam, emma, alex, sara, rose])

# delete some
sara.deleted = True


# description: build a select query based on input,
# also filter on deleted and status attributes
#
# example 1: single table/object select
#
# qry = db_select(
#     model_class_list=[Parent],
#     order_by_list=[(Parent, 'name', 'asc')],
# )
#
# example 2: two tables with many-to-many select
#
# qry = db_select(
#     model_class_list=[Parent, Child],
#     filter_by_list=[
#         (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
#         (Child, 'id', 'eq', parent_mtm_child_table.c.child_id),
#         (Child, 'age', 'ge', 6),
#         (Parent, 'id', 'in', [3, 4, 5]),
#     ],
#     order_by_list=[
#         (Child, 'name', 'asc'),
#     ],
#     limit=10, offset=4,
# )
#
# example 3: select column attribute instead of object
#
# qry = db_select(
#     model_class_list=[(Parent, 'id'), Child],
#     filter_by_list=[
#         (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
#         (Child, 'id', 'eq', parent_mtm_child_table.c.child_id),
#         (Child, 'age', 'ge', 6),
#         (Parent, 'id', 'in', [3, 4, 5]),
#     ],
#     order_by_list=[
#         (Child, 'name', 'asc'),
#     ],
#     limit=10, offset=4,
# )



def db_select(model_class_list=None, filter_by_list=None, order_by_list=None, limit=None, offset=None, filter_deleted=False, filter_status=STATUS_ENABLED):
    fname = 'db_select'
    dbg_print = False

    if dbg_print:
        print(fname + ": len(model_class_list) = {}".format(len(model_class_list)))

    if filter_by_list is None:
        filter_by_list = []
    if order_by_list is None:
        order_by_list = []

    if not isinstance(model_class_list, list):
        raise Exception('model_class_list not list')
    if not isinstance(filter_by_list, list):
        raise Exception('filter_by_list not list')
    if not isinstance(order_by_list, list):
        raise Exception('order_by_list not list')

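    # collector for model_classes
    mcs = []
    # collector for columns
    columns = []
    for model_class_item in model_class_list:
        if isinstance(model_class_item, tuple):
            # (class, 'attribute') selects a single column
            m, key = model_class_item
            column = getattr(m, key, None)
            if column is None:
                raise Exception('Invalid select column: %s' % key)
            mcs.append(m)
            columns.append(column)
        else:
            # a plain class selects the whole object
            mcs.append(model_class_item)
            columns.append(model_class_item)
    query = Query(columns)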

    if dbg_print:
        print(fname + ": after creating query, query = {}".format(query))

    # add deleted filter if column deleted exists
    if not filter_deleted is None:
        for model_class in mcs:
            if 'deleted' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'deleted', 'eq', filter_deleted) )

    # add status filter if column status exists
    if not filter_status is None:
        for model_class in mcs:
            if 'status' in inspect(model_class).columns.keys():
                filter_by_list.append( (model_class, 'status', 'eq', filter_status) )

    if dbg_print:
        # filter_by_items
        for filter_by_item in filter_by_list:
            print(fname + ": filter_by_item = {}".format(filter_by_item))
        # order_by_items
        for order_by_item in order_by_list:
            print(fname + ": order_by_item = {}".format(order_by_item))

    for filter_by_item in filter_by_list:
        if dbg_print:
            print(fname + ": processing filter_by_item = {}".format(filter_by_item))
        try:
            model_class, key, op, value = filter_by_item
        except ValueError:
            raise Exception('Invalid filter_by_item: %s' % filter_by_item)

        if dbg_print:
           print(fname + ": processing key, op, value = {}, {}, {}".format(key, op, value))

        column = getattr(model_class, key, None)
        if column is None:
            raise Exception('Invalid filter column: %s' % key)

        if op == 'in':
            # accept a list or a comma-separated string of values
            if isinstance(value, list):
                filt = column.in_(value)
            else:
                filt = column.in_(value.split(','))

            if dbg_print:
                print(fname + ": if, filt = {}".format(filt))
        else:
            # find the column method for op, trying 'op', 'op_' and '__op__'
            try:
                attr = list(filter(
                    lambda e: hasattr(column, e % op),
                    ['%s', '%s_', '__%s__']
                ))[0] % op
            except IndexError:
                raise Exception('Invalid filter operator: %s' % op)

            if dbg_print:
                print(fname + ": processing filter_cond, attr = {}".format(attr))

            # the string 'null' stands for SQL NULL
            if isinstance(value, str) and value == 'null':
                value = None
            filt = getattr(column, attr)(value)

            if dbg_print:
                print(fname + ": else, filt = {}".format(filt))

        if dbg_print:
            print(fname + ": adding filt")
        query = query.filter(filt)

    for order_by_item in order_by_list:
        if dbg_print:
            print(fname + ": processing order_by_item = {}".format(order_by_item))

        try:
            model_class, key, op = order_by_item
        except ValueError:
            raise Exception('Invalid order_by_item: %s' % order_by_item)

        if dbg_print:
            print(fname + ": processing model_class = {}, key = {}, op = {}".format(model_class, key, op))

        column = getattr(model_class, key, None)
        column_sorted = getattr(column, op)()
        query = query.order_by(column_sorted)        

    if limit:
        if dbg_print:
            print(fname + ": processing limit = {}".format(limit))
        query = query.limit(limit)

    if offset:
        if dbg_print:
            print(fname + ": processing offset = {}".format(offset))
        query = query.offset(offset)

    if dbg_print:
        print(fname + ": after building query, query = {}".format(query))
    return query.with_session(db_local)


parents = db_select(
    model_class_list=[Parent],
    order_by_list=[
        (Parent, 'name', 'asc'),
    ],
)

for parent in parents:
    print("parent = {}".format(parent))

# get parent_ids for next query
parent_ids = [parent.id for parent in parents]
print("parent_ids = {}".format(parent_ids))


parent_child_tuples = db_select(
    model_class_list=[Parent, Child],
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id),
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'),
        (Child, 'name', 'asc'),
    ],
    limit=10, offset=0,
)

# show tuples
print("parent_child_tuples: {}".format(parent_child_tuples))	

# build list parent_id-children
from collections import defaultdict

parent_id2children = defaultdict(list)
for parent, child in parent_child_tuples:
    parent_id2children[parent.id].append(child)

# show parent_id2children
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	


parent_id_child_tuples = db_select(
    model_class_list=[(Parent, 'id'), Child],
    filter_by_list=[
        (Parent, 'id', 'eq', parent_mtm_child_table.c.parent_id),
        (Child, 'id', 'eq', parent_mtm_child_table.c.child_id),
        (Child, 'age', 'ge', 8),
        (Parent, 'id', 'in', parent_ids),
    ],
    order_by_list=[
        (Parent, 'name', 'asc'),
        (Child, 'name', 'asc'),
    ],
    limit=10, offset=0,
)

# show tuples
print("parent_id_child_tuples: {}".format(parent_id_child_tuples))	

# build list parent_id-children
parent_id2children = defaultdict(list)
for parent_id, child in parent_id_child_tuples:
    parent_id2children[parent_id].append(child)

# show parent_id2children
for parent_id in parent_id2children:
    print("parent: {}, children: {}".format(parent_id, parent_id2children[parent_id]))	

print("\nshow columns:")

# debug: show columns in parent 
for c in Parent.__table__.columns:
    print("parent table column c = {}".format(c))

# debug: show columns in parent using inspect
from sqlalchemy import inspect
mapper = inspect(Parent)
for column in mapper.attrs:
    print("column.key = {}".format(column.key))

for key in inspect(Parent).columns.keys():
    print("key = {}".format(key))

if 'deleted' in inspect(Parent).columns.keys():
    print("deleted found")
else:
    print("deleted NOT found")
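
The operator lookup inside db_select is easier to follow in isolation. The sketch below repeats the same three name patterns on a stand-in object; `FakeColumn` and `resolve_operator` are hypothetical names for this example, and the stand-in only mimics method names, not SQL generation.

```python
class FakeColumn:
    """Stand-in for a column attribute; compares plain Python values."""

    def __init__(self, value):
        self.value = value

    def __ge__(self, other):
        return self.value >= other

    def in_(self, values):
        return self.value in values

    def startswith(self, prefix):
        return str(self.value).startswith(prefix)


def resolve_operator(column, op):
    """Return the attribute name on column that implements op."""
    # Try 'op', then 'op_' (for keywords like 'in'), then '__op__'.
    for pattern in ("%s", "%s_", "__%s__"):
        name = pattern % op
        if hasattr(column, name):
            return name
    raise ValueError("Invalid filter operator: %s" % op)


col = FakeColumn(9)
names = [resolve_operator(col, op) for op in ("startswith", "in", "ge")]
print(names)

# The resolved name is then called with the filter value.
matches = getattr(col, resolve_operator(col, "ge"))(8)
print(matches)
```

This is why the filter tuples can use short names such as 'eq', 'ge' and 'in': they map onto `__eq__`, `__ge__` and `in_` of the column.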

Links / credits

Attaching a pre-built query to a scoped_session in SQLAlchemy

Dynamically constructing filters based on string input using SQLAlchemy

Dynamically constructing filters in SQLAlchemy


Implementing the "Soft Delete" Pattern with Flask and SQLAlchemy

method of iterating over sqlalchemy model's defined columns?

Python - SqlAlchemy: convert lists of tuples to list of atomic values [duplicate]