Postgresql

The playhouse.postgres_ext module exposes Postgresql-specific field types and features that are not available in the standard PostgresqlDatabase.

Getting Started

To get started, import the playhouse.postgres_ext module and use the PostgresqlExtDatabase database class:

from playhouse.postgres_ext import *

db = PostgresqlExtDatabase('peewee_test', user='postgres')

class BaseExtModel(Model):
    class Meta:
        database = db

PostgresqlExtDatabase

class PostgresqlExtDatabase(database, server_side_cursors=False, register_hstore=False, prefer_psycopg3=False, **kwargs)

Extends PostgresqlDatabase and is required in order to use the extension features described in this document.

Parameters:
  • database (str) – Name of database to connect to.

  • server_side_cursors (bool) – Whether SELECT queries should utilize server-side cursors.

  • register_hstore (bool) – Register the hstore extension.

  • prefer_psycopg3 (bool) – If both psycopg2 and psycopg3 are installed, instruct Peewee to prefer psycopg3.

When using server_side_cursors be sure to wrap your queries with ServerSide().
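
For example, a minimal sketch enabling server-side cursors at construction time (PageView is a hypothetical model; see the Server-Side Cursors section below for details):

from playhouse.postgres_ext import PostgresqlExtDatabase, ServerSide

db = PostgresqlExtDatabase('my_app', server_side_cursors=True)

# Rows are streamed from the server rather than buffered in memory.
with db.atomic():
    for page_view in ServerSide(PageView.select()):
        pass  # Process each row as it is fetched.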

class PooledPostgresqlExtDatabase(database, **kwargs)

Connection-pooling variant of PostgresqlExtDatabase.

class Psycopg3Database(database, **kwargs)

Same as PostgresqlExtDatabase but specifies prefer_psycopg3=True.

class PooledPsycopg3Database(database, **kwargs)

Connection-pooling variant of Psycopg3Database.
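
A minimal construction sketch for the pooled variant (in peewee the pooled classes live in playhouse.pool; max_connections and stale_timeout are pool options):

from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'peewee_test',
    user='postgres',
    max_connections=8,    # Maximum number of open connections.
    stale_timeout=300)    # Seconds before an idle connection is recycled.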

JSON Support

Peewee provides two JSON field types for Postgresql:

  • BinaryJSONField - stores JSON in the efficient binary jsonb format. Supports key/item access, containment operations.

  • JSONField - stores JSON as text, supports key/item access.

Most applications will want to use BinaryJSONField (JSONB):

  • Faster Queries: direct access to data elements without parsing the entire JSON document each time.

  • Index Support: jsonb columns can be indexed using GIN indexes.

  • Faster Updates: modifications do not require rewriting the entire document.

The only time JSONField is preferable is when you must store the exact JSON data verbatim (whitespace, object key ordering).

from playhouse.postgres_ext import PostgresqlExtDatabase, BinaryJSONField

db = PostgresqlExtDatabase('my_app')

class Event(Model):
    data = BinaryJSONField()
    class Meta:
        database = db

# Store data:
Event.create(data={
    'type': 'login',
    'user_id': 42,
    'request': {'ip': '1.2.3.4'},
    'success': True})

# Filter using a nested key:
query = (Event
         .select()
         .where(Event.data['request']['ip'] == '1.2.3.4'))

# Select, group and order-by JSON values.
query = (Event
         .select(Event.data['user_id'],
                 fn.COUNT(Event.id))
         .group_by(Event.data['user_id'])
         .order_by(Event.data['user_id'])
         .tuples())

# Retrieve JSON objects.
query = (Event
         .select(Event.data['request'].as_json().alias('request'))
         .where(Event.data['user_id'] == 42))
for event in query:
    print(event.request['ip'])

Tip

Refer to the Postgresql JSON documentation for in-depth discussion and examples of using JSON and JSONB.

BinaryJSONField and JSONField

class BinaryJSONField(dumps=None, *args, **kwargs)
Parameters:

dumps – custom implementation of json.dumps

Extends JSONField for the jsonb type.

By default BinaryJSONField will use a GIN index. To disable this, initialize the field with index=False.
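
For example (the model name is illustrative):

class APIResponse(Model):
    # Opt out of the default GIN index on the jsonb column.
    payload = BinaryJSONField(index=False)

    class Meta:
        database = db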

as_json()

Deserialize and return the JSON value at the given path.

concat(data)

Concatenate the field value with data. Note this is a shallow operation and does not deep-merge nested objects.

Example:

# Add object - if "result" key existed before it is overwritten.
(Event
 .update(data=Event.data.concat({'result': {'success': True}}))
 .execute())

Nested data can also use concat():

# Select the result subkey and merge with additional data:
# {'ip': '1.2.3.4'} --> {'ip': '1.2.3.4', 'status': 'ok'}

Event.select(Event.data['result'].concat({'status': 'ok'}))

contains(other)

Test whether this field's value contains other (i.e. whether other is a subset of the stored value). other may be a partial dict, list, or scalar value. Useful for matching against a partial JSON object or checking whether an item is in an array.

Event.create(data={
    'type': 'rename',
    'name': 'new name',
    'metadata': {'old_name': 'the old name'},
    'tags': ['t1', 't2', 't3']})

# These queries match the above row:

# Search by partial object:
Event.select().where(Event.data.contains({'type': 'rename'}))

# Partial object and partial array:
Event.select().where(Event.data.contains({
    'type': 'rename',
    'tags': ['t2', 't1'],
}))

# Partial array, irrespective of ordering:
Event.select().where(Event.data['tags'].contains(['t2', 't1']))

# Search array by individual item:
Event.select().where(Event.data['tags'].contains('t1'))

To test whether a key simply exists, use has_key():

Event.select().where(Event.data.has_key('name'))

# Or search a sub-key.
Event.select().where(Event.data['metadata'].has_key('old_name'))

contains_any(*keys)

Test whether any of keys is present in the JSON value.

Event.create(data={
    'type': 'rename',
    'name': 'new name',
    'metadata': {'old_name': 'the old name'},
    'tags': ['t1', 't2', 't3']})

# These queries match the above row:

Event.select().where(Event.data.contains_any('name', 'other'))

# Search a nested object:
Event.select().where(
    Event.data['metadata'].contains_any('old_name', 'old_status'))

# Search nested object for items in an array:
Event.select().where(Event.data['tags'].contains_any('t3', 'tx'))

contains_all(*keys)

Test whether all of keys are present in the JSON value.

Event.create(data={
    'type': 'rename',
    'name': 'new name',
    'metadata': {'old_name': 'the old name'},
    'tags': ['t1', 't2', 't3']})

# These queries match the above row:

Event.select().where(Event.data.contains_all('name', 'tags'))

# Search nested object for items in an array:
Event.select().where(Event.data['tags'].contains_all('t3', 't2'))

contained_by(other)

Test whether this field’s value is a subset of other.

Event.create(data={
    'type': 'login',
    'result': {'success': True}})

Event.create(data={
    'type': 'rename',
    'name': 'new name',
    'metadata': {'old_name': 'the old name'},
    'tags': ['t1', 't2', 't3']})

# Matches the login row.
(Event
 .select()
 .where(Event.data.contained_by({
     'type': 'login',
     'result': {'success': True, 'message': 'OK'}})))

# Match events that have a result w/success=True and/or
# error=False:
Event.select().where(Event.data['result'].contained_by({
    'success': True,
    'error': False}))

# Check that tags are subset of the popular tags (matches rename row).
popular_tags = ['t3', 't2', 't1', 'tx', 'ty']
Event.select().where(Event.data['tags'].contained_by(popular_tags))

has_key(key)

Test whether key exists.

Event.select().where(Event.data.has_key('result'))

Event.select().where(Event.data['result'].has_key('success'))

remove(*keys)

Remove one or more keys from the JSON object.

# Atomically remove key:
Event.update(data=Event.data.remove('result')).execute()

# Equivalent to above:
Event.update(data=Event.data['result'].remove()).execute()

# Remove deeply-nested item:
Event.update(data=Event.data['metadata']['prior'].remove()).execute()

length()

Return the length of the JSON array at the given path.

Event.select().where(Event.data['tags'].length() > 3)

extract(*path)

Extract the JSON data at the given path.

Event.select().where(Event.data.extract('tags', 0) == 'first_tag')

Event.select().where(Event.data.extract('result', 'success') == True)

# Equivalent to above.
Event.select().where(Event.data['result'].extract('success') == True)

class JSONField(dumps=None, *args, **kwargs)
Parameters:

dumps – custom implementation of json.dumps

Field that stores and retrieves JSON data. Supports __getitem__ key access for filtering and sub-object retrieval.

Consider using the BinaryJSONField instead as it offers better performance and more powerful querying options.

as_json()

Deserialize and return the JSON value at the given path.
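
See BinaryJSONField.as_json() for example usage.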

concat(data)

Concatenate the field value with data. Note this is a shallow operation and does not deep-merge nested objects.

See BinaryJSONField.concat() for example usage.

length()

Return the length of the JSON array at the given path.

See BinaryJSONField.length() for example usage.

extract(*path)

Extract the JSON data at the given path.

See BinaryJSONField.extract() for example usage.

HStore

Postgresql’s hstore extension stores arbitrary key/value pairs in a single column. Enable it by passing register_hstore=True when initializing the database:

db = PostgresqlExtDatabase('my_app', register_hstore=True)

class Event(Model):
    data = HStoreField()
    class Meta:
        database = db

HStoreField supports the following operations:

  • Store and retrieve arbitrary dictionaries

  • Filter by key(s) or partial dictionary

  • Update/add one or more keys to an existing dictionary

  • Delete one or more keys from an existing dictionary

  • Select keys, values, or zip keys and values

  • Retrieve a slice of keys/values

  • Test for the existence of a key

  • Test that a key has a non-NULL value

Example:

# Create a record with arbitrary attributes:
Event.create(data={
    'type': 'register',
    'ip': '1.2.3.4',
    'email': 'charles@example.com',
    'result': 'success',
    'referrer': 'google.com'})

Event.create(data={
    'type': 'login',
    'ip': '1.2.3.4',
    'email': 'charles@example.com',
    'result': 'success'})

# Look up values by key:
Event.select().where(Event.data['type'] == 'login')

# Filter by a key/value pair:
Event.select().where(Event.data.contains({'result': 'success'}))

# Filter by key existence:
Event.select().where(Event.data.exists('referrer'))

# Atomic update - adds new keys, updates existing ones:
new_data = Event.data.update({
    'result': 'ok',
    'status': 'success'})
(Event
 .update(data=new_data)
 .where(Event.data['result'] == 'success')
 .execute())

# Atomic key deletion:
(Event
 .update(data=Event.data.delete('referrer'))
 .where(Event.data['referrer'] == 'google.com')
 .execute())

# Retrieve keys or values as a list:
for event in Event.select(Event.id, Event.data.keys().alias('k')):
    print(event.id, event.k)

# Prints:
# 1 ['ip', 'type', 'email', 'result', 'status']

# Retrieve a subset of data:
query = (Event
         .select(Event.id,
                 Event.data.slice('ip', 'email').alias('source'))
         .order_by(Event.data['ip']))
for event in query:
    print(event.id, event.source)

# Prints:
# 1 {'ip': '1.2.3.4', 'email': 'charles@example.com'}

HStoreField API

class HStoreField

By default HStoreField will use a GiST index. To disable this, initialize the field with index=False.
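
For example (a hypothetical model):

class Session(Model):
    # Opt out of the default GiST index on the hstore column.
    data = HStoreField(index=False)

    class Meta:
        database = db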

__getitem__(key)
Parameters:

key (str) – get value at given key.

Example:

Event.select().where(Event.data['type'] == 'login')

contains(value)
Parameters:

value (dict, list, tuple, or str) – value to search for.

Test whether the HStore data contains the given dict (match keys and values), list/tuple (match keys), or str key.

Example:

# Contains key/value pairs:
Event.select().where(Event.data.contains({'result': 'success'}))

# Contains a list of keys:
Event.select().where(Event.data.contains(['result', 'status']))

# Contains a single key:
Event.select().where(Event.data.contains('result'))

contains_any(*keys)

Test whether the HStore contains any of the given keys.
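
For example:

# Rows whose hstore data contains a 'referrer' and/or 'status' key:
Event.select().where(Event.data.contains_any('referrer', 'status'))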

exists(key)

Test whether key exists in data.

defined(key)

Test whether key is non-NULL in data.
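
For example:

# Rows where the 'referrer' key is present (its value may be NULL):
Event.select().where(Event.data.exists('referrer'))

# Rows where 'referrer' is present and has a non-NULL value:
Event.select().where(Event.data.defined('referrer'))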

update(__data=None, **data)
Parameters:
  • __data (dict) – Specify update as a dict.

  • data – Specify update as keyword arguments.

Perform an in-place, atomic update.

# Atomic update - adds new keys, updates existing ones:
new_data = Event.data.update({
    'result': 'ok',
    'status': 'success'})
(Event
 .update(data=new_data)
 .where(Event.data['result'] == 'success')
 .execute())

delete(*keys)
Parameters:

keys – one or more keys to delete from data.

# Atomic key deletion:
(Event
 .update(data=Event.data.delete('referrer'))
 .where(Event.data['referrer'] == 'google.com')
 .execute())

slice(*keys)
Parameters:

keys (str) – keys to retrieve.

Retrieve only the provided key/value pairs:

query = (Event
         .select(Event.id,
                 Event.data.slice('ip', 'email').alias('source'))
         .order_by(Event.data['ip']))
for event in query:
    print(event.id, event.source)

# 1 {'ip': '1.2.3.4', 'email': 'charles@example.com'}

keys()

Return the keys as a list.

query = Event.select(Event.data.keys().alias('keys'))
for event in query:
    print(event.keys)

# ['ip', 'type', 'email', 'result', 'status']

values()

Return the values as a list.
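
Usage mirrors keys(); value ordering follows the stored hstore:

query = Event.select(Event.data.values().alias('vals'))
for event in query:
    print(event.vals)

# e.g. ['1.2.3.4', 'register', 'charles@example.com', 'ok', 'success']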

items()

Return the key-value pairs as a 2-dimensional list.

query = Event.select(Event.data.items().alias('items'))
for event in query:
    print(event.items)

# [['ip', '1.2.3.4'],
#  ['type', 'register'],
#  ['email', 'charles@example.com'],
#  ['result', 'ok'],
#  ['status', 'success']]

Arrays

class ArrayField(field_class=IntegerField, field_kwargs=None, dimensions=1, convert_values=False)

Stores a Postgresql array of the given field type.

Parameters:
  • field_class – a subclass of Field, e.g. IntegerField.

  • field_kwargs (dict) – arguments to initialize field_class.

  • dimensions (int) – Number of array dimensions.

  • convert_values (bool) – Apply field_class value conversion to retrieved data.

By default ArrayField will use a GIN index. To disable this, initialize the field with index=False.

Example:

class Post(Model):
    tags = ArrayField(CharField)

Post.create(tags=['python', 'peewee', 'postgresql'])
Post.create(tags=['python', 'sqlite'])

# Get an item by index.
Post.select(Post.tags[0].alias('first_tag'))

# Get a slice:
Post.select(Post.tags[:2].alias('first_two'))

Multi-dimensional array example:

class Outline(Model):
    points = ArrayField(IntegerField, dimensions=2)

Outline.create(points=[[1, 1], [1, 5], [5, 5], [5, 1]])

contains(*items)

Filter rows where the array contains all of the given values.

Parameters:

items – One or more items that must be in the given array field.

Post.select().where(Post.tags.contains('postgresql', 'python'))

contains_any(*items)

Filter rows where the array contains any of the given values.

Parameters:

items – One or more items to search for in the given array field.

Post.select().where(Post.tags.contains_any('postgresql', 'python'))

Interval

class IntervalField(**kwargs)

Stores Python datetime.timedelta instances using Postgresql’s native INTERVAL type.

from datetime import timedelta

class Subscription(Model):
    duration = IntervalField()

Subscription.create(duration=timedelta(days=30))

(Subscription
 .select()
 .where(Subscription.duration > timedelta(days=10)))

DateTimeTZ Field

class DateTimeTZField(**kwargs)

Timezone-aware datetime field using Postgresql’s TIMESTAMP WITH TIME ZONE type.

import datetime

class Event(Model):
    timestamp = DateTimeTZField()

now = datetime.datetime.now().astimezone(datetime.timezone.utc)

Event.create(timestamp=now)

event = Event.get()
print(event.timestamp)
# 2026-01-02 03:04:05.012345+00:00

Server-Side Cursors

For large result sets, server-side (named) cursors stream rows from the server rather than loading the entire result into memory. Rows are fetched transparently from the server as you iterate.

Refer to your database driver's documentation for details on server-side cursor behavior.

To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.

Wrap any SELECT query with ServerSide():

from playhouse.postgres_ext import ServerSide

# Must be in a transaction to use server-side cursors.
with db.atomic():

    # Create a normal SELECT query.
    large_query = PageView.select()

    # Then wrap in `ServerSide` and iterate.
    for page_view in ServerSide(large_query):
        # Do something interesting.
        pass

    # At this point server side resources are released.

For more granular control or to close the cursor explicitly:

with db.atomic():
    large_query = PageView.select().order_by(PageView.id.desc())

    # Rows will be fetched 1000 at-a-time, but iteration is transparent.
    query = ServerSideQuery(large_query, array_size=1000)

    # Read 9500 rows then close server-side cursor.
    accum = []
    for i, obj in enumerate(query.iterator()):
        if i == 9500:
            break
        accum.append(obj)

    # Release server-side resource.
    query.close()

Warning

Server-side cursors live only within a transaction. If you are using psycopg2 (not psycopg3), cursors are declared WITH HOLD and must be fully exhausted or explicitly closed to release server resources.

ServerSide(select_query)
Parameters:

select_query – a SelectQuery instance.

Return type: generator

Wrap select_query in a transaction and iterate using iterator() (disables row caching).

CockroachDB

CockroachDB (CRDB) is compatible with Postgresql’s wire protocol and is well-supported by Peewee. Use the dedicated CockroachDatabase class rather than PostgresqlDatabase to get CRDB-specific handling.

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', host='10.8.0.1')

If you are using Cockroach Cloud, you may find it easier to specify the connection parameters using a connection-string:

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

SSL configuration:

db = CockroachDatabase(
    'my_app',
    user='root',
    host='10.8.0.1',
    sslmode='verify-full',
    sslrootcert='/path/to/root.crt')

# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
                       '?sslmode=verify-full&sslrootcert=/path/to/root.crt'
                       '&options=--cluster=my-cluster-xyz')

Key differences from Postgresql

  • No nested transactions. CRDB does not support savepoints, so calling atomic() inside another atomic() block raises an exception. Use transaction() instead, which ignores nested calls and commits only when the outermost block exits.

  • Client-side retries. CRDB may abort transactions due to contention. Use run_transaction() for automatic retries.

Special field-types that may be useful when using CRDB (a model sketch follows this list):

  • UUIDKeyField - a primary-key field implementation that uses CRDB’s UUID type with a default randomly-generated UUID.

  • RowIDField - a primary-key field implementation that uses CRDB’s INT type with a default unique_rowid().

  • JSONField - same as the Postgres BinaryJSONField, as CRDB treats all JSON as JSONB.

  • ArrayField - same as the Postgres extension (but does not support multi-dimensional arrays).
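
A hypothetical model sketch using these field types (imported from playhouse.cockroachdb):

from peewee import Model, CharField
from playhouse.cockroachdb import (ArrayField, CockroachDatabase,
                                   JSONField, UUIDKeyField)

db = CockroachDatabase('my_app')

class Measurement(Model):
    # Server-generated UUID primary key (gen_random_uuid()).
    id = UUIDKeyField()
    # Stored as JSONB; same query API as the Postgres BinaryJSONField.
    attributes = JSONField()
    # CRDB arrays are single-dimension only.
    tags = ArrayField(CharField)

    class Meta:
        database = db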

Transactions:

# transaction() is safe to nest; the outer block manages the commit.
@db.transaction()
def create_user(username):
    return User.create(username=username)

with db.transaction():
    create_user('alice')   # Nested call is folded into outer transaction.
    create_user('bob')
# Transaction is committed here.

Client-side retries:

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app')

def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    insufficient funds, the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.

        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance

        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.balance)
                 .execute())
        return True, src.balance, dest.balance

    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be
    # automatically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)

CRDB API

class CockroachDatabase(database, **kwargs)

Subclass of PostgresqlDatabase for CockroachDB.

run_transaction(callback, max_attempts=None, system_time=None, priority=None)
Parameters:
  • callback – Callable accepting a single db argument. Must not manage the transaction itself. May be called multiple times.

  • max_attempts (int) – Retry limit.

  • system_time (datetime) – Execute AS OF SYSTEM TIME with respect to the given value.

  • priority (str) – 'low', 'normal', or 'high'.

Raises:

ExceededMaxAttempts – When max_attempts is exceeded.

Execute SQL in a transaction with automatic client-side retries.

User-provided callback:

  • Must accept one parameter, the db instance representing the connection the transaction is running under.

  • Must not attempt to commit, rollback or otherwise manage the transaction.

  • May be called more than one time.

  • Should ideally only contain SQL operations.

Additionally, the database must not have any open transactions when this function is called, as CRDB does not support nested transactions; calling run_transaction() inside an open transaction raises a NotImplementedError.
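
A sketch using the system_time and priority options (the User model is hypothetical):

import datetime

def count_users(db_ref):
    # Read-only callback: no transaction management, just queries.
    return User.select().count()

one_hour_ago = datetime.datetime.now() - datetime.timedelta(hours=1)
n_users = db.run_transaction(count_users, system_time=one_hour_ago,
                             priority='low')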

class PooledCockroachDatabase(database, **kwargs)

Connection-pooling variant of CockroachDatabase.

run_transaction(db, callback, max_attempts=None, system_time=None, priority=None)

Run SQL in a transaction with automatic client-side retries. See CockroachDatabase.run_transaction() for details.

This function is equivalent to the identically-named method on the CockroachDatabase class.

CRDB-specific field types:

class UUIDKeyField

UUID primary key auto-populated with CRDB’s gen_random_uuid().

class RowIDField

Integer primary key auto-populated with CRDB’s unique_rowid().