Framework Integration¶
For web applications, it is common to open a connection when a request is received, and to close the connection when the response is delivered. This document describes how to add hooks to your web app to ensure the database connection is handled properly.
These steps will ensure that regardless of whether you’re using a simple
SqliteDatabase or a PooledPostgresqlDatabase, peewee will
handle the connections correctly.
The pattern is always the same:
# On request start:
db.connect()
# On request end (success or error):
if not db.is_closed():
db.close()
Every framework exposes hooks for this. The sections below show the idiomatic approach for each.
Note
Applications that handle significant traffic should use a connection pool to avoid the overhead of establishing a new connection per request. Pooled databases can be used as drop-in replacements for their non-pooled counterparts.
Flask¶
For a complete Flask + Peewee application example, including authentication
and other common webapp functionality, see Example app. There is also a
full blog app
and an analytics app
in the project examples/ directory.
The minimal Flask integration ensures that database connection lifecycles
are tied to the request/response cycle via before_request and teardown_request
hooks:
from flask import Flask
from peewee import *
db = SqliteDatabase('my_app.db')
app = Flask(__name__)
@app.before_request
def _db_connect():
db.connect()
@app.teardown_request
def _db_close(exc):
if not db.is_closed():
db.close()
teardown_request is called regardless of whether the request succeeded or
raised an exception, making it the correct hook for cleanup.
For applications that receive a large number of requests, a connection pool is recommended:
from flask import Flask
from playhouse.pool import PooledPostgresqlDatabase
db = PooledPostgresqlDatabase('app', host='10.8.0.1', user='postgres')
app = Flask(__name__)
# Note that when using the pooled implementation the hooks are the exact
# same. Opening and closing the connection simply acquires and releases it
# from the pool for the lifetime of the request.
@app.before_request
def _db_connect():
db.connect()
@app.teardown_request
def _db_close(exc):
if not db.is_closed():
db.close()
See also
The Flask Utilities extension provides helpers for common tasks like declarative database configuration, object retrieval and pagination.
FastAPI¶
FastAPI is an async framework and can be used with Peewee’s Async Support integration or synchronously. Peewee also provides Pydantic Integration support, which works well with FastAPI.
Quick note on SQLModel¶
FastAPI advocates using SQLModel for database access. SQLModel combines SQLAlchemy and Pydantic into a single class, which may work well for simple examples. There are a few things to watch out for, though:
SQLModel’s official tutorial uses synchronous endpoints exclusively, which FastAPI runs on a threadpool. Async usage is listed as an “advanced” topic and is undocumented currently.
Lazy-loading often breaks in async contexts. SQLAlchemy’s implicit lazy-loading of relationships can trigger
MissingGreenleterrors when used with async sessions. This can also occur with Peewee, but it’s straightforward to avoid by selecting joined relations.Because SQLModel uses synchronous drivers for DDL and certain operations, you typically need both a sync AND async driver installed, along with separate engine configurations.
SQLModel uses inheritance to manage input, output and table schemas. In practice a single database table often requires three or four model classes, e.g.
UserBase,User,UserCreateandUserRead- all of this is managed through inheritance.
Peewee may provide a simpler experience - there is a single database to manage
with built-in pooling, fewer implicit lazy-load gotcha’s, and the Pydantic
schemas generated with to_pydantic() can be
configured to include/exclude fields without inheritance. Field metadata is
captured automatically: choice enums, default values, descriptions, titles and
type information are captured in the JSON schema and OpenAPI docs.
Peewee requires far less machinery to provide real asyncio database access, and of course works equally well for synchronous FastAPI endpoints.
Async Example using Pydantic¶
Below is a full example FastAPI application demonstrating dependency-injection style hooks, fully async query execution, and pydantic integration:
# example.py
from fastapi import Depends, FastAPI, HTTPException
from contextlib import asynccontextmanager
from peewee import *
from playhouse.pwasyncio import AsyncPostgresqlDatabase
from playhouse.pydantic_utils import to_pydantic
db = AsyncPostgresqlDatabase('peewee_test')
class User(Model):
name = CharField(verbose_name='Full Name', help_text='Display name')
email = CharField(unique=True)
status = IntegerField(default=1, choices=(
(1, 'Active'),
(2, 'Inactive'),
(3, 'Deleted')))
class Meta:
database = db
# Generate pydantic schemas suitable for create and responses.
# Schemas will include metadata from verbose_name, help_text, choices, and
# default settings.
UserCreate = to_pydantic(User, model_name='UserCreate')
UserResponse = to_pydantic(User, exclude_autofield=False, model_name='UserResponse')
async def get_db():
async with db:
yield db
@asynccontextmanager
async def lifespan(app):
# Create tables (if they don't exist) at application startup.
async with db:
await db.acreate_tables([User])
yield
await db.close_pool() # Shut-down pool and exit.
app = FastAPI(lifespan=lifespan)
@app.get('/users', response_model=list[UserResponse])
async def list_users(db=Depends(get_db)):
rows = await db.list(User.select().dicts())
return [UserResponse(**row) for row in rows]
@app.post('/users', response_model=UserResponse)
async def create_user(data: UserCreate, db=Depends(get_db)):
user = await db.run(User.create, **data.model_dump())
return UserResponse.model_validate(user)
@app.get('/users/{user_id}', response_model=UserResponse)
async def get_user(user_id: int, db=Depends(get_db)):
try:
user = await db.get(User.select().where(User.id == user_id))
except User.DoesNotExist:
raise HTTPException(status_code=404, detail='User not found')
return UserResponse.model_validate(user)
Run the example:
$ fastapi dev example.py
Populate and query data:
$ curl -X POST http://localhost:8000/users \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "email": "alice@example.com"}'
{"id":1,"name":"Alice","email":"alice@example.com","status":1}
$ curl -X POST http://localhost:8000/users \
-H "Content-Type: application/json" \
-d '{"name": "Bob", "email": "bob@example.com", "status": 2}'
{"id":2,"name":"Bob","email":"bob@example.com","status":2}
$ curl http://localhost:8000/users
[{"id":1,"name":"Alice","email":"alice@example.com","status":1},
{"id":2,"name":"Bob","email":"bob@example.com","status":2}]
$ curl http://localhost:8000/users/1
{"id":1,"name":"Alice","email":"alice@example.com","status":1}
We can also verify that the pydantic schemas captured our Peewee model metadata:
>>> UserCreate.model_json_schema()
{'properties': {
'name': {
'description': 'Display name',
'title': 'Full Name',
'type': 'string'},
'email': {
'title': 'Email',
'type': 'string'},
'status': {
'default': 1,
'description': 'Choices: 1 = Active, 2 = Inactive, 3 = Deleted',
'enum': [1, 2, 3],
'title': 'Status',
'type': 'integer'}},
'required': ['name', 'email'],
'title': 'UserCreate',
'type': 'object'}
See also
Dependency injection¶
The following is a minimal example demonstrating:
Ensure connection is opened and closed automatically for endpoints that use the database.
Create tables/resources when app server starts.
Shut-down connection pool when app server exits.
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI
from peewee import *
from playhouse.pwasyncio import *
app = FastAPI()
db = AsyncPostgresqlDatabase('peewee_test', host='10.8.0.1', user='postgres')
async def get_db():
async with db:
yield db
@asynccontextmanager
async def lifespan(app):
async with db:
await db.acreate_tables([User])
yield
await db.close_pool()
app = FastAPI(lifespan=lifespan)
@app.get('/users')
async def list_users(db=Depends(get_db)):
return await db.list(User.select().dicts())
Middleware and startup hooks¶
The following example demonstrates how to use middleware and startup hooks instead of dependency injection.
Ensure connection is opened and closed for each request.
Create tables/resources when app server starts.
Shut-down connection pool when app server exits.
from fastapi import FastAPI
from peewee import *
from playhouse.pwasyncio import *
app = FastAPI()
db = AsyncPostgresqlDatabase('peewee_test', host='10.8.0.1', user='postgres')
@app.middleware('http')
async def database_connection(request, call_next):
async with db: # Obtain connection from connection pool.
response = await call_next(request)
return response
@app.on_event('startup')
async def on_startup():
async with db:
await db.acreate_tables([Model1, Model2, Model3, ...])
@app.on_event('shutdown')
async def on_shutdown():
await db.close_pool()
# Async queries.
@app.get('/users')
async def list_users():
return await db.list(User.select().dicts())
@app.post('/users')
async def create_user(name: str):
user = await db.run(User.create, name=name)
return {'id': user.id, 'name': user.name}
Synchronous FastAPI¶
If you are using synchronous endpoints with FastAPI, you can use the synchronous Peewee database implementations. Here is the above “Full Example” implemented using sync Peewee:
from fastapi import Depends, FastAPI, HTTPException
from contextlib import asynccontextmanager
from peewee import *
from playhouse.pydantic_utils import to_pydantic
db = PostgresqlDatabase('peewee_test')
class User(Model):
name = CharField(verbose_name='Full Name', help_text='Display name')
email = CharField(unique=True)
status = IntegerField(default=1, choices=(
(1, 'Active'),
(2, 'Inactive'),
(3, 'Deleted')))
class Meta:
database = db
# Generate pydantic schemas suitable for create and responses.
UserCreate = to_pydantic(User, model_name='UserCreate')
UserResponse = to_pydantic(User, exclude_autofield=False, model_name='UserResponse')
def get_db():
with db.connection_context():
yield db
@asynccontextmanager
async def lifespan(app):
with db:
db.create_tables([User])
yield
app = FastAPI(lifespan=lifespan)
@app.get('/users', response_model=list[UserResponse])
def list_users(database=Depends(get_db)):
rows = User.select().dicts()
return [UserResponse(**row) for row in rows]
@app.post('/users', response_model=UserResponse)
def create_user(data: UserCreate, database=Depends(get_db)):
user = User.create(**data.model_dump())
return UserResponse.model_validate(user)
@app.get('/users/{user_id}', response_model=UserResponse)
def get_user(user_id: int, database=Depends(get_db)):
try:
user = User.get(User.id == user_id)
except User.DoesNotExist:
raise HTTPException(status_code=404, detail='User not found')
return UserResponse.model_validate(user)
See also
Django¶
Add a middleware that opens the connection before the view runs and closes it
after the response is prepared. Place it first in MIDDLEWARE so it wraps
all other middleware:
# myproject/middleware.py
from myproject.db import database
def PeeweeConnectionMiddleware(get_response):
def middleware(request):
database.connect()
try:
response = get_response(request)
finally:
if not database.is_closed():
database.close()
return response
return middleware
# settings.py
MIDDLEWARE = [
'myproject.middleware.PeeweeConnectionMiddleware',
# ... rest of middleware ...
]
Bottle¶
Use the before_request and after_request hooks:
from bottle import hook
from peewee import *
db = SqliteDatabase('my_app.db')
@hook('before_request')
def _connect_db():
db.connect()
@hook('after_request')
def _close_db():
if not db.is_closed():
db.close()
Falcon¶
Add a middleware component:
import falcon
from peewee import *
db = SqliteDatabase('my_app.db')
class DatabaseMiddleware:
def process_request(self, req, resp):
db.connect()
def process_response(self, req, resp, resource, req_succeeded):
if not db.is_closed():
db.close()
app = falcon.App(middleware=[DatabaseMiddleware()])
Pyramid¶
Set up a custom Request factory:
from pyramid.request import Request
from peewee import *
db = SqliteDatabase('my_app.db')
class MyRequest(Request):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
db.connect()
self.add_finished_callback(self._close_db)
def _close_db(self, request):
if not db.is_closed():
db.close()
# In your application factory:
def main(global_settings, **settings):
config = Configurator(settings=settings)
config.set_request_factory(MyRequest)
Sanic¶
Sanic is an async framework and can be used with Peewee’s Async Support integration.
from sanic import Sanic
from peewee import *
from playhouse.pwasyncio import *
app = Sanic('PeeweeApp')
db = AsyncPostgresqlDatabase('peewee_test', host='10.8.0.1', user='postgres')
@app.on_request
async def open_connection(request):
await db.aconnect() # Obtain connection from connection pool.
@app.on_response
async def close_connection(request, response):
await db.aclose() # Return connection to pool.
@app.before_server_start
async def setup_db(app):
async with db:
await db.acreate_tables([Model1, Model2, Model3, ...])
@app.before_server_stop
async def shutdown_db(app):
await db.close_pool()
Example demonstrating executing an async query:
from sanic import json
@app.get('/message/')
async def message(request):
# Get the latest message from the database.
message = await db.get(Message.select().order_by(Message.id.desc()))
return json({'content': message.content, 'id': message.id})
See also
CherryPy¶
Subscribe to the engine’s before/after request events:
import cherrypy
from peewee import *
db = SqliteDatabase('my_app.db')
def _db_connect():
db.connect()
def _db_close():
if not db.is_closed():
db.close()
cherrypy.engine.subscribe('before_request', _db_connect)
cherrypy.engine.subscribe('after_request', _db_close)
General Pattern for Any Framework¶
If your framework is not listed here, the integration follows the same structure:
Find the hook that runs before every request handler.
Call
db.connect()there.Find the hook that runs after every request (success and error both).
Call
db.close()there if the connection is open.
Any WSGI or ASGI middleware that wraps the application callable can also manage this:
class PeeweeMiddleware:
def __init__(self, app, database):
self.app = app
self.db = database
def __call__(self, environ, start_response):
self.db.connect()
try:
return self.app(environ, start_response)
finally:
if not self.db.is_closed():
self.db.close()
# Wrap your WSGI app:
application = PeeweeMiddleware(application, db)