Postgresql
The playhouse.postgres_ext module exposes Postgresql-specific field types
and features that are not available in the standard PostgresqlDatabase.
Getting Started
To get started, import the playhouse.postgres_ext module and use the PostgresqlExtDatabase database class:
from playhouse.postgres_ext import *
db = PostgresqlExtDatabase('peewee_test', user='postgres')
class BaseExtModel(Model):
    class Meta:
        database = db
PostgresqlExtDatabase
- class PostgresqlExtDatabase(database, server_side_cursors=False, register_hstore=False, prefer_psycopg3=False, **kwargs)
  Extends PostgresqlDatabase and is required in order to use the extension features described in this document.
  - Parameters:
    - database (str) – Name of database to connect to.
    - server_side_cursors (bool) – Whether SELECT queries should utilize server-side cursors.
    - register_hstore (bool) – Register the hstore extension.
    - prefer_psycopg3 (bool) – If both psycopg2 and psycopg3 are installed, instruct Peewee to prefer psycopg3.
  When using server_side_cursors, be sure to wrap your queries with ServerSide().
- class PooledPostgresqlExtDatabase(database, **kwargs)
  Connection-pooling variant of PostgresqlExtDatabase.

- class Psycopg3Database(database, **kwargs)
  Same as PostgresqlExtDatabase, but specifies prefer_psycopg3=True.

- class PooledPsycopg3Database(database, **kwargs)
  Connection-pooling variant of Psycopg3Database.
JSON Support
Peewee provides two JSON field types for Postgresql:
- BinaryJSONField – stores JSON in the efficient binary jsonb format. Supports key/item access and containment operations.
- JSONField – stores JSON as text. Supports key/item access.
Most applications will wish to use BinaryJSONField (JSONB):
- Faster queries: direct access to data elements without parsing the entire JSON document each time.
- Index support: jsonb columns can be indexed using GIN.
- Faster updates: modifying a value does not require rewriting the entire document.
The only time JSONField is preferable is when you must store
the exact JSON data verbatim (whitespace, object key ordering).
from peewee import Model, fn
from playhouse.postgres_ext import PostgresqlExtDatabase, BinaryJSONField
db = PostgresqlExtDatabase('my_app')
class Event(Model):
    data = BinaryJSONField()

    class Meta:
        database = db
# Store data:
Event.create(data={
    'type': 'login',
    'user_id': 42,
    'request': {'ip': '1.2.3.4'},
    'success': True})

# Filter using a nested key:
query = (Event
         .select()
         .where(Event.data['request']['ip'] == '1.2.3.4'))

# Select, group and order-by JSON values.
query = (Event
         .select(Event.data['user_id'],
                 fn.COUNT(Event.id))
         .group_by(Event.data['user_id'])
         .order_by(Event.data['user_id'])
         .tuples())

# Retrieve JSON objects.
query = (Event
         .select(Event.data['request'].as_json().alias('request'))
         .where(Event.data['user_id'] == 42))
for event in query:
    print(event.request['ip'])
Tip
Refer to the Postgresql JSON documentation for in-depth discussion and examples of using JSON and JSONB.
BinaryJSONField and JSONField
- class BinaryJSONField(dumps=None, *args, **kwargs)
  - Parameters:
    dumps – custom implementation of json.dumps.
  Extends JSONField for the jsonb type.
  By default BinaryJSONField will use a GIN index. To disable this, initialize the field with index=False.

  - as_json()
    Deserialize and return the JSON value at the given path.
- concat(data)
  Concatenate the field value with data. Note this is a shallow operation and does not deep-merge nested objects.
  Example:

  # Add object - if the "result" key existed before, it is overwritten.
  (Event
   .update(data=Event.data.concat({'result': {'success': True}}))
   .execute())

  Nested data can also use concat():

  # Select the result subkey and merge with additional data:
  # {'ip': '1.2.3.4'} --> {'ip': '1.2.3.4', 'status': 'ok'}
  Event.select(Event.data['result'].concat({'status': 'ok'}))
- contains(other)
  Test whether this field's value contains other (as a subset). other may be a partial dict, list, or scalar value. Useful for matching against a partial JSON object or checking if an item is in an array.

  Event.create(data={
      'type': 'rename',
      'name': 'new name',
      'metadata': {'old_name': 'the old name'},
      'tags': ['t1', 't2', 't3']})

  # These queries match the above row:

  # Search by partial object:
  Event.select().where(Event.data.contains({'type': 'rename'}))

  # Partial object and partial array:
  Event.select().where(Event.data.contains({
      'type': 'rename',
      'tags': ['t2', 't1'],
  }))

  # Partial array, irrespective of ordering:
  Event.select().where(Event.data['tags'].contains(['t2', 't1']))

  # Search array by individual item:
  Event.select().where(Event.data['tags'].contains('t1'))

  To test whether a key simply exists, use has_key():

  Event.select().where(Event.data.has_key('name'))

  # Or search a sub-key.
  Event.select().where(Event.data['metadata'].has_key('old_name'))
- contains_any(*keys)
  Test whether any of keys is present in the JSON value.

  Event.create(data={
      'type': 'rename',
      'name': 'new name',
      'metadata': {'old_name': 'the old name'},
      'tags': ['t1', 't2', 't3']})

  # These queries match the above row:
  Event.select().where(Event.data.contains_any('name', 'other'))

  # Search a nested object:
  Event.select().where(
      Event.data['metadata'].contains_any('old_name', 'old_status'))

  # Search nested object for items in an array:
  Event.select().where(Event.data['tags'].contains_any('t3', 'tx'))
- contains_all(*keys)
  Test whether all of keys are present in the JSON value.

  Event.create(data={
      'type': 'rename',
      'name': 'new name',
      'metadata': {'old_name': 'the old name'},
      'tags': ['t1', 't2', 't3']})

  # These queries match the above row:
  Event.select().where(Event.data.contains_all('name', 'tags'))

  # Search nested object for items in an array:
  Event.select().where(Event.data['tags'].contains_all('t3', 't2'))
- contained_by(other)
  Test whether this field's value is a subset of other.

  Event.create(data={
      'type': 'login',
      'result': {'success': True}})
  Event.create(data={
      'type': 'rename',
      'name': 'new name',
      'metadata': {'old_name': 'the old name'},
      'tags': ['t1', 't2', 't3']})

  # Matches the login row.
  (Event
   .select()
   .where(Event.data.contained_by({
       'type': 'login',
       'result': {'success': True, 'message': 'OK'}})))

  # Match events that have a result w/success=True and/or error=False:
  Event.select().where(Event.data['result'].contained_by({
      'success': True,
      'error': False}))

  # Check that tags are a subset of the popular tags (matches rename row).
  popular_tags = ['t3', 't2', 't1', 'tx', 'ty']
  Event.select().where(Event.data['tags'].contained_by(popular_tags))
- has_key(key)
  Test whether key exists.

  Event.select().where(Event.data.has_key('result'))
  Event.select().where(Event.data['result'].has_key('success'))
- remove(*keys)
  Remove one or more keys from the JSON object.

  # Atomically remove key:
  Event.update(data=Event.data.remove('result')).execute()

  # Equivalent to above:
  Event.update(data=Event.data['result'].remove()).execute()

  # Remove deeply-nested item:
  Event.update(data=Event.data['metadata']['prior'].remove()).execute()
- length()
  Return the length of the JSON array at the given path.

  Event.select().where(Event.data['tags'].length() > 3)
- extract(*path)
  Extract the JSON data at the given path.

  Event.select().where(Event.data.extract('tags', 0) == 'first_tag')
  Event.select().where(Event.data.extract('result', 'success') == True)

  # Equivalent to above.
  Event.select().where(Event.data['result'].extract('success') == True)
- class JSONField(dumps=None, *args, **kwargs)
  - Parameters:
    dumps – custom implementation of json.dumps.
  Field that stores and retrieves JSON data. Supports __getitem__ key access for filtering and sub-object retrieval.
  Consider using BinaryJSONField instead, as it offers better performance and more powerful querying options.

  - as_json()
    Deserialize and return the JSON value at the given path.
- concat(data)
  Concatenate the field value with data. Note this is a shallow operation and does not deep-merge nested objects.
  See BinaryJSONField.concat() for example usage.

- length()
  Return the length of the JSON array at the given path.
  See BinaryJSONField.length() for example usage.

- extract(*path)
  Extract the JSON data at the given path.
  See BinaryJSONField.extract() for example usage.
HStore
Postgresql’s hstore
extension stores arbitrary key/value pairs in a single column. Enable it by
passing register_hstore=True when initializing the database:
db = PostgresqlExtDatabase('my_app', register_hstore=True)

class Event(Model):
    data = HStoreField()

    class Meta:
        database = db
HStoreField supports the following operations:

- Store and retrieve arbitrary dictionaries
- Filter by key(s) or partial dictionary
- Update/add one or more keys to an existing dictionary
- Delete one or more keys from an existing dictionary
- Select keys, values, or zip keys and values
- Retrieve a slice of keys/values
- Test for the existence of a key
- Test that a key has a non-NULL value
Example:
# Create a record with arbitrary attributes:
Event.create(data={
    'type': 'register',
    'ip': '1.2.3.4',
    'email': 'charles@example.com',
    'result': 'success',
    'referrer': 'google.com'})
Event.create(data={
    'type': 'login',
    'ip': '1.2.3.4',
    'email': 'charles@example.com',
    'result': 'success'})

# Lookup a value in the data:
Event.select().where(Event.data['type'] == 'login')

# Filter by a key/value pair:
Event.select().where(Event.data.contains({'result': 'success'}))

# Filter by key existence:
Event.select().where(Event.data.exists('referrer'))

# Atomic update - adds new keys, updates existing ones:
new_data = Event.data.update({
    'result': 'ok',
    'status': 'success'})
(Event
 .update(data=new_data)
 .where(Event.data['result'] == 'success')
 .execute())

# Atomic key deletion:
(Event
 .update(data=Event.data.delete('referrer'))
 .where(Event.data['referrer'] == 'google.com')
 .execute())

# Retrieve keys or values as a list:
for event in Event.select(Event.id, Event.data.keys().alias('k')):
    print(event.id, event.k)

# Prints:
# 1 ['ip', 'type', 'email', 'result', 'status']

# Retrieve a subset of data:
query = (Event
         .select(Event.id,
                 Event.data.slice('ip', 'email').alias('source'))
         .order_by(Event.data['ip']))
for event in query:
    print(event.id, event.source)

# Prints:
# 1 {'ip': '1.2.3.4', 'email': 'charles@example.com'}
HStoreField API

- class HStoreField
  By default HStoreField will use a GiST index. To disable this, initialize the field with index=False.

  - __getitem__(key)
    - Parameters:
      key (str) – get value at the given key.
    Example:

    Event.select().where(Event.data['type'] == 'login')
- contains(value)
  - Parameters:
    value (dict, list, tuple, or string key) – value to search for.
  Test whether the HStore data contains the given dict (match keys and values), list/tuple (match keys), or str key.
  Example:

  # Contains key/value pairs:
  Event.select().where(Event.data.contains({'result': 'success'}))

  # Contains a list of keys:
  Event.select().where(Event.data.contains(['result', 'status']))

  # Contains a single key:
  Event.select().where(Event.data.contains('result'))
- contains_any(*keys)
  Test whether the HStore data contains any of the given keys.
- exists(key)
  Test whether key exists in data.

- defined(key)
  Test whether key has a non-NULL value in data.
- update(__data=None, **data)
  - Parameters:
    - __data (dict) – Specify update as a dict.
    - data – Specify update as keyword arguments.
  Perform an in-place, atomic update.

  # Atomic update - adds new keys, updates existing ones:
  new_data = Event.data.update({
      'result': 'ok',
      'status': 'success'})
  (Event
   .update(data=new_data)
   .where(Event.data['result'] == 'success')
   .execute())
- delete(*keys)
  - Parameters:
    keys – one or more keys to delete from data.

  # Atomic key deletion:
  (Event
   .update(data=Event.data.delete('referrer'))
   .where(Event.data['referrer'] == 'google.com')
   .execute())
- slice(*keys)
  - Parameters:
    keys (str) – keys to retrieve.
  Retrieve only the provided key/value pairs:

  query = (Event
           .select(Event.id,
                   Event.data.slice('ip', 'email').alias('source'))
           .order_by(Event.data['ip']))
  for event in query:
      print(event.id, event.source)

  # 1 {'ip': '1.2.3.4', 'email': 'charles@example.com'}
- keys()
  Return the keys as a list.

  query = Event.select(Event.data.keys().alias('keys'))
  for event in query:
      print(event.keys)

  # ['ip', 'type', 'email', 'result', 'status']
- values()
  Return the values as a list.
- items()
  Return the key/value pairs as a 2-dimensional list.

  query = Event.select(Event.data.items().alias('items'))
  for event in query:
      print(event.items)

  # [['ip', '1.2.3.4'],
  #  ['type', 'register'],
  #  ['email', 'charles@example.com'],
  #  ['result', 'ok'],
  #  ['status', 'success']]
Arrays
- class ArrayField(field_class=IntegerField, field_kwargs=None, dimensions=1, convert_values=False)
  Stores a Postgresql array of the given field type.
  - Parameters:
    - field_class – a subclass of Field, e.g. IntegerField.
    - field_kwargs (dict) – arguments to initialize field_class.
    - dimensions (int) – Number of array dimensions.
    - convert_values (bool) – Apply field_class value conversion to retrieved data.
  By default ArrayField will use a GIN index. To disable this, initialize the field with index=False.
  Example:

  class Post(Model):
      tags = ArrayField(CharField)

  Post.create(tags=['python', 'peewee', 'postgresql'])
  Post.create(tags=['python', 'sqlite'])

  # Get an item by index.
  Post.select(Post.tags[0].alias('first_tag'))

  # Get a slice:
  Post.select(Post.tags[:2].alias('first_two'))
  Multi-dimensional array example:

  class Outline(Model):
      points = ArrayField(IntegerField, dimensions=2)

  Outline.create(points=[[1, 1], [1, 5], [5, 5], [5, 1]])
- contains(*items)
  Filter rows where the array contains all of the given values.
  - Parameters:
    items – One or more items that must all be present in the array field.

  Post.select().where(Post.tags.contains('postgresql', 'python'))
- contains_any(*items)
  Filter rows where the array contains any of the given values.
  - Parameters:
    items – One or more items to search for in the array field.

  Post.select().where(Post.tags.contains_any('postgresql', 'python'))
Interval

- class IntervalField(**kwargs)
  Stores Python datetime.timedelta instances using Postgresql's native INTERVAL type.

  from datetime import timedelta

  class Subscription(Model):
      duration = IntervalField()

  Subscription.create(duration=timedelta(days=30))

  (Subscription
   .select()
   .where(Subscription.duration > timedelta(days=10)))
DateTimeTZ Field

- class DateTimeTZField(**kwargs)
  Timezone-aware datetime field using Postgresql's TIMESTAMP WITH TIME ZONE type.

  import datetime

  class Event(Model):
      timestamp = DateTimeTZField()

  now = datetime.datetime.now().astimezone(datetime.timezone.utc)
  Event.create(timestamp=now)

  event = Event.get()
  print(event.timestamp)
  # 2026-01-02 03:04:05.012345+00:00
Full-Text Search
Postgresql full-text search uses the tsvector and tsquery types.
Peewee offers two approaches: the simple Match() function (no schema
changes required) and the TSVectorField for dedicated search columns
(better performance).
Simple approach - no schema changes required:
from playhouse.postgres_ext import Match
def search_posts(term):
    return Post.select().where(
        (Post.status == 'published') &
        Match(Post.body, term))
The Match() function will automatically convert the left-hand operand
to a tsvector, and the right-hand operand to a tsquery. For better
performance, create a GIN index:
CREATE INDEX posts_fts ON post USING gin(to_tsvector('english', body));
Dedicated column - better performance:
class Post(Model):
    body = TextField()
    search_content = TSVectorField()  # Automatically gets a GIN index.

# Store a post and populate the search vector:
Post.create(
    body=body_text,
    search_content=fn.to_tsvector(body_text))

# Search:
Post.select().where(Post.search_content.match('python postgresql'))

# Search using expressions:
terms = 'python & (sqlite | postgres)'
Post.select().where(Post.search_content.match(terms))
For more information, see the Postgres full-text search docs.
- Match(field, query)
  Generate a full-text search expression that automatically converts field to tsvector and query to tsquery.
- class TSVectorField
  Field type for storing pre-computed tsvector data. Automatically created with a GIN index (use index=False to disable). Data must be explicitly converted to tsvector on write using fn.to_tsvector().
  Example:

  class Post(Model):
      body = TextField()
      search_content = TSVectorField()

  Post.create(
      body=body_text,
      search_content=fn.to_tsvector(body_text))

  (Post
   .select()
   .where(Post.search_content.match('python & (sqlite | postgres)')))
- match(query, language=None, plain=False)
  - Parameters:
    - query (str) – Full-text search query.
    - language (str) – Optional language name.
    - plain (bool) – Use the plain (simple) query parser instead of the default one, which supports the &, |, and ! operators.
Server-Side Cursors
For large result sets, server-side (named) cursors stream rows from the server rather than loading the entire result into memory. Rows are fetched transparently from the server as you iterate.
Refer to your database driver's documentation (psycopg2 or psycopg3) for low-level details.
To use server-side (or named) cursors, you must be using PostgresqlExtDatabase.
Wrap any SELECT query with ServerSide():
from playhouse.postgres_ext import ServerSide
# Must be in a transaction to use server-side cursors.
with db.atomic():
    # Create a normal SELECT query.
    large_query = PageView.select()

    # Then wrap in `ServerSide` and iterate.
    for page_view in ServerSide(large_query):
        # Do something interesting.
        pass

# At this point server-side resources are released.
For more granular control or to close the cursor explicitly:
with db.atomic():
    large_query = PageView.select().order_by(PageView.id.desc())

    # Rows will be fetched 1000 at-a-time, but iteration is transparent.
    query = ServerSide(large_query, array_size=1000)

    # Read 9500 rows, then close the server-side cursor.
    accum = []
    for i, obj in enumerate(query):
        if i == 9500:
            break
        accum.append(obj)

    # Release server-side resources.
    query.close()
Warning
Server-side cursors live only within a transaction. If you are using psycopg2
(not psycopg3), cursors are declared WITH HOLD and must be fully
exhausted or explicitly closed to release server resources.
- ServerSide(select_query, array_size=None)
  - Parameters:
    - select_query – a SelectQuery instance.
    - array_size (int) – number of rows to fetch per server round-trip.
  - Return type: generator
  Wrap select_query in a transaction and iterate over it using iterator() (disables row caching).
CockroachDB
CockroachDB (CRDB) is compatible with
Postgresql’s wire protocol and is well-supported by Peewee. Use the dedicated
CockroachDatabase class rather than PostgresqlDatabase
to get CRDB-specific handling.
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app', user='root', host='10.8.0.1')
If you are using Cockroach Cloud, you may find it easier to specify the connection parameters using a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')
SSL configuration:
db = CockroachDatabase(
    'my_app',
    user='root',
    host='10.8.0.1',
    sslmode='verify-full',
    sslrootcert='/path/to/root.crt')

# Or, alternatively, specified as part of a connection-string:
db = CockroachDatabase('postgresql://root:secret@host:26257/dbname'
                       '?sslmode=verify-full&sslrootcert=/path/to/root.crt'
                       '&options=--cluster=my-cluster-xyz')
Key differences from Postgresql

- No nested transactions. CRDB does not support savepoints, so calling atomic() inside another atomic() block raises an exception. Use transaction() instead, which ignores nested calls and commits only when the outermost block exits.
- Client-side retries. CRDB may abort transactions due to contention. Use run_transaction() for automatic retries.
Special field-types that may be useful when using CRDB:

- UUIDKeyField – a primary-key field implementation that uses CRDB's UUID type with a default randomly-generated UUID.
- RowIDField – a primary-key field implementation that uses CRDB's INT type with a default unique_rowid().
- JSONField – same as the Postgres BinaryJSONField, as CRDB treats all JSON as JSONB.
- ArrayField – same as the Postgres extension (but does not support multi-dimensional arrays).
Transactions:
# transaction() is safe to nest; the outer block manages the commit.
@db.transaction()
def create_user(username):
    return User.create(username=username)

with db.transaction():
    create_user('alice')  # Nested call is folded into outer transaction.
    create_user('bob')

# Transaction is committed here.
Client-side retries:
from playhouse.cockroachdb import CockroachDatabase
db = CockroachDatabase('my_app')
def transfer_funds(from_id, to_id, amt):
    """
    Returns a 3-tuple of (success?, from balance, to balance). If there are
    not sufficient funds, then the original balances are returned.
    """
    def thunk(db_ref):
        src, dest = (Account
                     .select()
                     .where(Account.id.in_([from_id, to_id])))
        if src.id != from_id:
            src, dest = dest, src  # Swap order.

        # Cannot perform transfer, insufficient funds!
        if src.balance < amt:
            return False, src.balance, dest.balance

        # Update each account, returning the new balance.
        src, = (Account
                .update(balance=Account.balance - amt)
                .where(Account.id == from_id)
                .returning(Account.balance)
                .execute())
        dest, = (Account
                 .update(balance=Account.balance + amt)
                 .where(Account.id == to_id)
                 .returning(Account.balance)
                 .execute())
        return True, src.balance, dest.balance

    # Perform the queries that comprise a logical transaction. In the
    # event the transaction fails due to contention, it will be
    # automatically retried (up to 10 times).
    return db.run_transaction(thunk, max_attempts=10)
CRDB API
- class CockroachDatabase(database, **kwargs)
  Subclass of PostgresqlDatabase for CockroachDB.

  - run_transaction(callback, max_attempts=None, system_time=None, priority=None)
    - Parameters:
      - callback – Callable accepting a single db argument. Must not manage the transaction itself. May be called multiple times.
      - max_attempts (int) – Retry limit.
      - system_time (datetime) – Execute the transaction AS OF SYSTEM TIME with respect to the given value.
      - priority (str) – 'low', 'normal', or 'high'.
    - Raises:
      ExceededMaxAttempts – When max_attempts is exceeded.
    Execute SQL in a transaction with automatic client-side retries.
User-provided callback:

- Must accept one parameter, the db instance representing the connection the transaction is running under.
- Must not attempt to commit, rollback, or otherwise manage the transaction.
- May be called more than once.
- Should ideally only contain SQL operations.

Additionally, the database must not have any open transactions when this function is called, as CRDB does not support nested transactions; attempting to nest will raise a NotImplementedError.
- class PooledCockroachDatabase(database, **kwargs)
  Connection-pooling variant of CockroachDatabase.

- run_transaction(db, callback, max_attempts=None, system_time=None, priority=None)
  Run SQL in a transaction with automatic client-side retries. See CockroachDatabase.run_transaction() for details.
  This function is equivalent to the identically-named method on the CockroachDatabase class.
CRDB-specific field types:
- class UUIDKeyField
  UUID primary key auto-populated with CRDB's gen_random_uuid().

- class RowIDField
  Integer primary key auto-populated with CRDB's unique_rowid().