
Python Integration

This guide shows how to connect to every WeftKit engine from Python using standard database drivers, and how to integrate with Django, FastAPI, and SQLAlchemy. WeftKit Standalone speaks the same wire protocols as well-known databases, so the standard Python clients for PostgreSQL, MongoDB, Redis, Neo4j, and DynamoDB work with WeftKit unchanged, and the vector engine is reached through ordinary generated gRPC stubs.

All examples assume weftkit-standalone is running locally. Adjust the host and port to match your deployment.


Prerequisites

Install the packages you need for the engines your application uses:

bash
# WeftKitRel — PostgreSQL protocol
pip install psycopg2-binary            # synchronous
pip install asyncpg                    # async
pip install "sqlalchemy[asyncio]" asyncpg # SQLAlchemy 2.0 (async); quotes keep zsh from globbing the brackets
pip install sqlalchemy psycopg2-binary  # SQLAlchemy 2.0 (sync)

# WeftKitDoc — MongoDB Wire protocol
pip install pymongo                    # synchronous
pip install motor                      # async (built on pymongo)

# WeftKitMem — Redis RESP3
pip install redis                      # includes both sync and asyncio clients

# WeftKitGraph — Bolt (Neo4j)
pip install neo4j

# WeftKitVec — gRPC
pip install grpcio grpcio-tools

# WeftKitKV — DynamoDB REST
pip install boto3

# Environment variable loading
pip install python-dotenv

# Django / FastAPI
pip install django
pip install fastapi "uvicorn[standard]"

Environment Variables

Store connection details in a .env file and load them with python-dotenv:

bash
# .env

# WeftKitRel
WEFTKIT_REL_DSN=postgresql://myuser:mypassword@localhost:5432/mydb

# WeftKitDoc
WEFTKIT_DOC_URI=mongodb://myuser:mypassword@localhost:27017/mydb

# WeftKitMem
WEFTKIT_MEM_HOST=localhost
WEFTKIT_MEM_PORT=6379
WEFTKIT_MEM_PASSWORD=mypassword

# WeftKitGraph
WEFTKIT_GRAPH_URI=bolt://localhost:7687
WEFTKIT_GRAPH_USER=neo4j
WEFTKIT_GRAPH_PASSWORD=mypassword

# WeftKitVec
WEFTKIT_VEC_TARGET=localhost:50051

# WeftKitKV
WEFTKIT_KV_ENDPOINT=http://localhost:8000
WEFTKIT_KV_REGION=us-east-1
WEFTKIT_KV_ACCESS_KEY=any-value
WEFTKIT_KV_SECRET_KEY=any-value

Load them at the top of your application:

python
from dotenv import load_dotenv
import os

load_dotenv()   # reads .env in the current directory

DSN = os.environ["WEFTKIT_REL_DSN"]
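
With the direct `os.environ[...]` lookup above, a missing variable surfaces as a bare `KeyError` at whichever lookup happens to run first. As a minimal sketch, a hypothetical helper named `require_env` (not part of python-dotenv or WeftKit) could check every name up front and report all missing ones in a single error:

```python
import os
from typing import Mapping


def require_env(names: list[str], env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return the requested variables, or raise one error naming all that are missing."""
    # Treat unset and empty-string values the same way: both count as missing.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in names}
```

Called right after `load_dotenv()`, for example as `require_env(["WEFTKIT_REL_DSN", "WEFTKIT_DOC_URI"])`, it fails at startup rather than on the first request that touches an unconfigured engine.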

1. WeftKitRel (PostgreSQL) via psycopg2

psycopg2 is the most widely used PostgreSQL driver for Python.

Basic Connection

python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])

Cursor, Execute, and Fetch

Use %s placeholders for parameterized queries — psycopg2 handles escaping:

python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])
cur  = conn.cursor()

# Parameterized INSERT
cur.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
    ("Alice", "alice@example.com"),
)
user_id = cur.fetchone()[0]
conn.commit()

# Fetch all rows
cur.execute("SELECT id, name, email FROM users ORDER BY name")
rows = cur.fetchall()
for row in rows:
    print(row)  # (1, 'Alice', 'alice@example.com')

# Fetch one row
cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
user = cur.fetchone()

cur.close()
conn.close()

Transaction Management (commit / rollback)

python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])
conn.autocommit = False   # default; explicit here for clarity

cur = conn.cursor()
try:
    cur.execute(
        "UPDATE accounts SET balance = balance - %s WHERE id = %s",
        (500, 1),
    )
    cur.execute(
        "UPDATE accounts SET balance = balance + %s WHERE id = %s",
        (500, 2),
    )
    conn.commit()
    print("Transfer complete")
except Exception as e:
    conn.rollback()
    print(f"Transfer failed, rolled back: {e}")
finally:
    cur.close()
    conn.close()

Context Manager (recommended)

python
import psycopg2
import os

with psycopg2.connect(os.environ["WEFTKIT_REL_DSN"]) as conn:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
            (42, 99.99),
        )
        order_id = cur.fetchone()[0]
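    # Leaving the `with conn` block commits the transaction (or rolls it back on an exception).
    # It does not close the connection; call conn.close() when you are finished with it.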

Connection Pool with psycopg2.pool

For production use, pool connections rather than opening a new one per request:

python
from psycopg2 import pool
import os

connection_pool = pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=20,
    dsn=os.environ["WEFTKIT_REL_DSN"],
)

def get_users():
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT id, name FROM users")
            return cur.fetchall()
    finally:
        connection_pool.putconn(conn)
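
The `getconn()` / `putconn()` pairing above is enough for reads, but a write also needs a commit or rollback before the connection goes back to the pool. One way to package that, sketched here with a hypothetical helper named `pooled_connection`, is a context manager that works with any pool object exposing `getconn()` and `putconn()`:

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(conn_pool):
    """Borrow a connection, commit on success, roll back on error, always return it."""
    conn = conn_pool.getconn()
    try:
        yield conn
        conn.commit()          # reached only if the body raised nothing
    except Exception:
        conn.rollback()        # undo partial work, then re-raise for the caller
        raise
    finally:
        conn_pool.putconn(conn)
```

With the pool defined above, usage would look like `with pooled_connection(connection_pool) as conn:` followed by the usual `conn.cursor()` work.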

2. WeftKitRel via asyncpg (Async)

asyncpg provides a high-performance async interface. Use $1, $2, ... placeholders.

Connection Pool

python
import asyncpg
import asyncio
import os

async def create_pool() -> asyncpg.Pool:
    return await asyncpg.create_pool(
        dsn=os.environ["WEFTKIT_REL_DSN"],
        min_size=2,
        max_size=20,
    )

# Typically called once during application startup
pool: asyncpg.Pool | None = None

async def startup():
    global pool
    pool = await create_pool()

Async Queries

python
async def get_users():
    rows = await pool.fetch("SELECT id, name, email FROM users ORDER BY name")
    return [dict(row) for row in rows]

async def create_user(name: str, email: str) -> int:
    row = await pool.fetchrow(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        name, email,
    )
    return row["id"]

async def get_user_by_id(user_id: int):
    row = await pool.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id,
    )
    return dict(row) if row else None

Async Transaction

python
async def transfer_balance(from_id: int, to_id: int, amount: float):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount, from_id,
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount, to_id,
            )

3. WeftKitRel via SQLAlchemy 2.0

SQLAlchemy's ORM and Core work with WeftKit just as they do with PostgreSQL.

Synchronous Engine and Session

python
from sqlalchemy import create_engine, text, Column, Integer, String, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Session
import os

engine = create_engine(
    os.environ["WEFTKIT_REL_DSN"].replace("postgresql://", "postgresql+psycopg2://"),
    pool_size=10,
    max_overflow=20,
    echo=False,
)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id         = Column(Integer, primary_key=True)
    name       = Column(String(255), nullable=False)
    email      = Column(String(255), unique=True, nullable=False)
    created_at = Column(DateTime, server_default=func.now())

# Create tables
Base.metadata.create_all(engine)
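
The `str.replace` call passed to `create_engine` above only matches a DSN that begins with exactly `postgresql://`; a DSN written as `postgres://`, or one that already names a driver, passes through unchanged. A slightly more defensive variant, sketched with a hypothetical helper named `with_driver`, rewrites only the URL scheme:

```python
from urllib.parse import urlsplit, urlunsplit


def with_driver(dsn: str, driver: str) -> str:
    """Return dsn with its scheme set to postgresql+<driver>, leaving the rest untouched."""
    parts = urlsplit(dsn)
    base = parts.scheme.split("+", 1)[0]     # drop any driver suffix already present
    if base not in ("postgresql", "postgres"):
        raise ValueError(f"Not a PostgreSQL DSN: {parts.scheme!r}")
    return urlunsplit(parts._replace(scheme=f"postgresql+{driver}"))
```

The same helper covers both engines in this section: `with_driver(dsn, "psycopg2")` for the synchronous engine and `with_driver(dsn, "asyncpg")` for the async one.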

Session Usage

python
from sqlalchemy import select

# Add a new record
with Session(engine) as session:
    user = User(name="Alice", email="alice@example.com")
    session.add(user)
    session.commit()
    session.refresh(user)
    print(user.id)

# Query records (2.0-style select)
with Session(engine) as session:
    stmt = select(User).where(User.name.like("A%")).order_by(User.name)
    users = session.scalars(stmt).all()
    for u in users:
        print(u.id, u.name, u.email)

# Update
with Session(engine) as session:
    user = session.get(User, 1)
    if user:
        user.email = "alice.new@example.com"
        session.commit()

# Delete
with Session(engine) as session:
    user = session.get(User, 1)
    if user:
        session.delete(user)
        session.commit()

Raw SQL with Core

python
with engine.connect() as conn:
    result = conn.execute(
        text("SELECT id, name FROM users WHERE created_at > :since"),
        {"since": "2025-01-01"},
    )
    for row in result:
        print(row.id, row.name)

Async Engine with asyncpg

python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import os

async_engine = create_async_engine(
    os.environ["WEFTKIT_REL_DSN"].replace("postgresql://", "postgresql+asyncpg://"),
    pool_size=10,
    max_overflow=20,
)

AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def get_users_async():
    async with AsyncSessionLocal() as session:
        result = await session.execute(text("SELECT id, name FROM users"))
        return result.fetchall()

4. WeftKitDoc (MongoDB) via pymongo

WeftKit Standalone speaks the MongoDB Wire Protocol. pymongo connects without any modification.

MongoClient Connection

python
from pymongo import MongoClient
import os

client = MongoClient(os.environ["WEFTKIT_DOC_URI"])
db         = client["mydb"]
products   = db["products"]

Insert One / Insert Many

python
# Single document
result = products.insert_one({
    "name":     "Wireless Headphones",
    "category": "electronics",
    "price":    149.99,
    "tags":     ["audio", "wireless"],
})
print("Inserted:", result.inserted_id)

# Multiple documents
result = products.insert_many([
    {"name": "USB-C Hub", "category": "electronics", "price": 39.99, "tags": ["usb"]},
    {"name": "Desk Lamp",  "category": "office",      "price": 24.99, "tags": ["lighting"]},
])
print(f"Inserted {len(result.inserted_ids)} documents")

Find with Filter Operators

python
# Less-than filter
affordable = list(products.find({"price": {"$lt": 50}}).sort("price", 1))

# In a set of values
multi_cat = list(products.find({"category": {"$in": ["electronics", "office"]}}))

# Combined conditions
specific = list(products.find({
    "$and": [
        {"price": {"$gt": 20, "$lt": 100}},
        {"tags": {"$in": ["wireless", "usb"]}},
    ]
}))

UpdateOne with Operators

python
from pymongo import ReturnDocument

# Set fields
products.update_one(
    {"name": "Wireless Headphones"},
    {"$set": {"price": 129.99}},
)

# Increment a field
products.update_one(
    {"name": "USB-C Hub"},
    {"$inc": {"view_count": 1}},
)

# Upsert
products.update_one(
    {"sku": "KB-001"},
    {"$set": {"name": "Mechanical Keyboard", "price": 89.99}},
    upsert=True,
)

# Return the updated document
updated = products.find_one_and_update(
    {"name": "Desk Lamp"},
    {"$set": {"price": 19.99}},
    return_document=ReturnDocument.AFTER,
)

Aggregation Pipeline

python
pipeline = [
    {"$match": {"price": {"$gt": 0}}},
    {
        "$group": {
            "_id":       "$category",
            "count":     {"$sum": 1},
            "avg_price": {"$avg": "$price"},
            "min_price": {"$min": "$price"},
            "max_price": {"$max": "$price"},
        }
    },
    {"$sort": {"avg_price": -1}},
]

summary = list(products.aggregate(pipeline))
for cat in summary:
    print(cat["_id"], cat["count"], round(cat["avg_price"], 2))
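
For intuition about what the three stages compute, here is a plain-Python equivalent over a list of dictionaries. It is only an illustration of the pipeline's semantics under simple assumptions (numeric prices, one category per document), not a description of how WeftKit executes aggregations; the function name `summarize_by_category` is hypothetical.

```python
from collections import defaultdict


def summarize_by_category(docs: list[dict]) -> list[dict]:
    """Mirror the $match / $group / $sort stages in plain Python."""
    prices = defaultdict(list)
    for doc in docs:
        # $match: keep only documents whose price is greater than 0
        if doc.get("price", 0) > 0:
            prices[doc.get("category")].append(doc["price"])
    # $group: one output row per category with the same accumulators
    summary = [
        {
            "_id": category,
            "count": len(values),
            "avg_price": sum(values) / len(values),
            "min_price": min(values),
            "max_price": max(values),
        }
        for category, values in prices.items()
    ]
    # $sort: highest average price first
    return sorted(summary, key=lambda row: row["avg_price"], reverse=True)
```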

Index Creation

python
from pymongo import ASCENDING, DESCENDING, TEXT

products.create_index([("category", ASCENDING)])
products.create_index([("category", ASCENDING), ("price", DESCENDING)])
products.create_index([("sku", ASCENDING)], unique=True)
products.create_index([("name", TEXT), ("tags", TEXT)])

5. WeftKitDoc via motor (Async MongoDB)

motor wraps pymongo with an asyncio-friendly interface.

Client Setup

python
import motor.motor_asyncio
import os

client = motor.motor_asyncio.AsyncIOMotorClient(os.environ["WEFTKIT_DOC_URI"])
db       = client["mydb"]
products = db["products"]

Async Find / Insert / Update

python
import asyncio

async def get_electronics():
    cursor = products.find({"category": "electronics"}).sort("price", 1)
    return await cursor.to_list(length=100)

async def add_product(name: str, price: float, category: str) -> str:
    result = await products.insert_one({
        "name": name, "price": price, "category": category
    })
    return str(result.inserted_id)

async def update_price(name: str, new_price: float):
    result = await products.update_one(
        {"name": name},
        {"$set": {"price": new_price}},
    )
    return result.modified_count

async def aggregate_by_category():
    pipeline = [
        {"$group": {"_id": "$category", "count": {"$sum": 1}, "avg": {"$avg": "$price"}}},
        {"$sort": {"avg": -1}},
    ]
    cursor = products.aggregate(pipeline)
    return await cursor.to_list(length=None)

6. WeftKitMem (Redis) via redis-py

redis-py includes both a synchronous client and an asyncio-native client. WeftKit Standalone speaks RESP3.

Synchronous Client

python
import redis
import os

r = redis.Redis(
    host=os.environ.get("WEFTKIT_MEM_HOST", "localhost"),
    port=int(os.environ.get("WEFTKIT_MEM_PORT", 6379)),
    password=os.environ.get("WEFTKIT_MEM_PASSWORD"),
    decode_responses=True,
)

Basic Commands

python
import json

# SET with TTL
r.set("session:abc123", json.dumps({"user_id": 42}), ex=600)

# GET
raw = r.get("session:abc123")
session = json.loads(raw) if raw else None

# Hash
r.hset("user:1001", mapping={"name": "Alice", "role": "admin", "logins": "0"})
profile = r.hgetall("user:1001")
r.hincrby("user:1001", "logins", 1)

# List
r.lpush("notifications:1001", json.dumps({"msg": "New comment"}))
items = [json.loads(i) for i in r.lrange("notifications:1001", 0, 19)]

# Sorted set
r.zadd("leaderboard", {"alice": 9800, "bob": 8500, "carol": 7200})
top = r.zrange("leaderboard", 0, 2, desc=True, withscores=True)  # redis-py names this flag `desc`
# [('alice', 9800.0), ('bob', 8500.0), ('carol', 7200.0)]

# Delete
r.delete("session:abc123")
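
The SET-with-TTL and GET calls above are the two halves of a cache-aside lookup: read the key, and on a miss compute the value and store it with an expiry. A minimal sketch of that pattern follows, assuming JSON-serializable values and any client object with `get(key)` and `set(key, value, ex=...)` methods; the helper name `get_or_load` is hypothetical.

```python
import json
from typing import Any, Callable


def get_or_load(client, key: str, loader: Callable[[], Any], ttl: int = 300) -> Any:
    """Return the cached JSON value for key, or call loader(), cache the result, and return it."""
    raw = client.get(key)
    if raw is not None:
        return json.loads(raw)                    # cache hit
    value = loader()                              # cache miss: compute the value
    client.set(key, json.dumps(value), ex=ttl)    # store it with an expiry in seconds
    return value
```

With the client above this could be called as `get_or_load(r, "session:abc123", load_session, ttl=600)`, where `load_session` stands for whatever function fetches the authoritative value.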

Pipeline (Batching Commands)

Send multiple commands in a single round-trip:

python
with r.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.incr("counter")
    results = pipe.execute()
    # [True, True, <new_count>]

Pub/Sub

python
import threading

def listener():
    sub = r.pubsub()
    sub.subscribe("events:orders")
    for message in sub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            print("Received:", event)

thread = threading.Thread(target=listener, daemon=True)
thread.start()

# Publish from anywhere (subscribers only receive messages published after they subscribe)
r.publish("events:orders", json.dumps({"order_id": 42, "status": "shipped"}))

7. WeftKitMem via redis.asyncio

The async client ships inside the redis package as of version 4.2.

python
import redis.asyncio as aioredis
import json
import os

async def make_redis() -> aioredis.Redis:
    return aioredis.Redis(
        host=os.environ.get("WEFTKIT_MEM_HOST", "localhost"),
        port=int(os.environ.get("WEFTKIT_MEM_PORT", 6379)),
        password=os.environ.get("WEFTKIT_MEM_PASSWORD"),
        decode_responses=True,
    )

async def cache_set(client: aioredis.Redis, key: str, value: dict, ttl: int = 300):
    await client.set(key, json.dumps(value), ex=ttl)

async def cache_get(client: aioredis.Redis, key: str) -> dict | None:
    raw = await client.get(key)
    return json.loads(raw) if raw else None

async def increment_counter(client: aioredis.Redis, key: str) -> int:
    return await client.incr(key)

async def async_pipeline_example(client: aioredis.Redis):
    async with client.pipeline() as pipe:
        pipe.set("a", "1")
        pipe.set("b", "2")
        pipe.incr("counter")
        results = await pipe.execute()
    return results

8. WeftKitGraph (Neo4j) via the neo4j Python Driver

WeftKit Standalone speaks the Bolt protocol. The official neo4j Python driver connects directly.

Driver Initialization

python
from neo4j import GraphDatabase
import os

driver = GraphDatabase.driver(
    os.environ.get("WEFTKIT_GRAPH_URI", "bolt://localhost:7687"),
    auth=(
        os.environ.get("WEFTKIT_GRAPH_USER",     "neo4j"),
        os.environ.get("WEFTKIT_GRAPH_PASSWORD", "password"),
    ),
    max_connection_pool_size=50,
)

driver.verify_connectivity()
print("Graph connected")

Session Execute Read

Wrap read queries in execute_read for automatic retry on transient failures:

python
def get_person_by_name(name: str) -> list[dict]:
    def _query(tx):
        result = tx.run(
            "MATCH (p:Person { name: $name }) RETURN p",
            name=name,
        )
        # Copy each node's properties into a plain dict inside the transaction
        return [dict(record["p"]) for record in result]

    with driver.session(database="neo4j") as session:
        return session.execute_read(_query)

Session Execute Write

python
def create_person(person_id: str, name: str, age: int) -> None:
    def _query(tx):
        tx.run(
            "MERGE (p:Person { id: $id }) SET p.name = $name, p.age = $age",
            id=person_id, name=name, age=age,
        )

    with driver.session() as session:
        session.execute_write(_query)

def create_friendship(id_a: str, id_b: str) -> None:
    def _query(tx):
        tx.run(
            """
            MATCH (a:Person { id: $id_a }), (b:Person { id: $id_b })
            MERGE (a)-[:FRIEND]->(b)
            MERGE (b)-[:FRIEND]->(a)
            """,
            id_a=id_a, id_b=id_b,
        )

    with driver.session() as session:
        session.execute_write(_query)

Cypher Query with Parameters

python
def get_friends(person_id: str) -> list[str]:
    def _query(tx):
        result = tx.run(
            """
            MATCH (p:Person { id: $id })-[:FRIEND]->(f:Person)
            RETURN f.name AS name
            ORDER BY name
            """,
            id=person_id,
        )
        return [record["name"] for record in result]

    with driver.session() as session:
        return session.execute_read(_query)

def shortest_path(from_id: str, to_id: str) -> dict | None:
    def _query(tx):
        result = tx.run(
            """
            MATCH path = shortestPath(
              (a:Person { id: $from_id })-[*..6]-(b:Person { id: $to_id })
            )
            RETURN [node IN nodes(path) | node.name] AS names,
                   length(path) AS hops
            """,
            from_id=from_id, to_id=to_id,
        )
        record = result.single()
        if record:
            return {"names": record["names"], "hops": record["hops"]}
        return None

    with driver.session() as session:
        return session.execute_read(_query)

Graceful Shutdown

python
import atexit

atexit.register(driver.close)

9. WeftKitVec via grpcio

WeftKit exposes its vector engine over gRPC. Generate Python stubs from the .proto file distributed with WeftKit Standalone.

Generate Stubs

bash
python -m grpc_tools.protoc \
  -I protos/ \
  --python_out=. \
  --grpc_python_out=. \
  protos/weftkit_vec.proto

Proto File (provided with WeftKit Standalone)

protobuf
// weftkit_vec.proto
syntax = "proto3";
package weftkit.vec;

service VectorService {
  rpc Upsert (UpsertRequest)  returns (UpsertResponse);
  rpc Search (SearchRequest)  returns (SearchResponse);
  rpc Delete (DeleteRequest)  returns (DeleteResponse);
}

message Vector {
  string id                   = 1;
  repeated float embedding    = 2;
  map<string, string> metadata = 3;
}

message UpsertRequest  { repeated Vector vectors = 1; string namespace = 2; }
message UpsertResponse { int32 upserted_count = 1; }

message SearchRequest  {
  repeated float query_vector = 1;
  int32 top_k      = 2;
  string namespace = 3;
  string filter    = 4;
}
message SearchResult  { string id = 1; float score = 2; map<string, string> metadata = 3; }
message SearchResponse { repeated SearchResult results = 1; }

message DeleteRequest  { repeated string ids = 1; string namespace = 2; }
message DeleteResponse { int32 deleted_count = 1; }

Channel and Stub

python
import grpc
import os
import weftkit_vec_pb2 as vec_pb2
import weftkit_vec_pb2_grpc as vec_grpc

channel = grpc.insecure_channel(
    os.environ.get("WEFTKIT_VEC_TARGET", "localhost:50051")
)
stub = vec_grpc.VectorServiceStub(channel)

Upsert Vectors

python
def upsert_vectors(
    vectors: list[dict],
    namespace: str = "default",
) -> int:
    request = vec_pb2.UpsertRequest(
        namespace=namespace,
        vectors=[
            vec_pb2.Vector(
                id=v["id"],
                embedding=v["embedding"],
                metadata=v.get("metadata", {}),
            )
            for v in vectors
        ],
    )
    response = stub.Upsert(request)
    return response.upserted_count

# Example (`...` stands for the remaining dimensions; pass complete lists of floats in real code)
upserted = upsert_vectors([
    {"id": "prod-001", "embedding": [0.12, 0.84, 0.37, ...], "metadata": {"category": "electronics"}},
    {"id": "prod-002", "embedding": [0.55, 0.21, 0.90, ...], "metadata": {"category": "office"}},
], namespace="products")
print(f"Upserted {upserted} vectors")
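
Sending a large collection in one `Upsert` call produces a single large gRPC message, and gRPC applies a message size limit (4 MB by default in the library; whether WeftKit Standalone changes it is not stated here). A common workaround is to split the input into fixed-size batches. The sketch below uses a hypothetical helper named `batched`, and the batch size is an arbitrary choice to tune for your embedding dimension:

```python
from typing import Iterator


def batched(items: list[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of items, each holding at most size entries."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

Combined with the function above, a bulk load could be written as `sum(upsert_vectors(chunk, namespace="products") for chunk in batched(all_vectors, 500))`, where `all_vectors` is your own list.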

Similarity Search

python
def search_vectors(
    query_embedding: list[float],
    top_k: int = 10,
    namespace: str = "default",
    filter_expr: str = "",
) -> list[dict]:
    request = vec_pb2.SearchRequest(
        query_vector=query_embedding,
        top_k=top_k,
        namespace=namespace,
        filter=filter_expr,
    )
    response = stub.Search(request)
    return [
        {"id": r.id, "score": r.score, "metadata": dict(r.metadata)}
        for r in response.results
    ]

results = search_vectors(
    query_embedding=[0.10, 0.80, 0.40, ...],  # `...` elides the remaining dimensions
    top_k=5,
    namespace="products",
    filter_expr='{"category": "electronics"}',
)
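
The `filter` field is a plain string in the proto, and the example above writes a JSON object into it by hand. Assuming that JSON-object form is what the server expects (the filter grammar is not specified in this guide), building the string with `json.dumps` avoids quoting mistakes. The helper name `make_filter` is hypothetical:

```python
import json


def make_filter(**conditions: str) -> str:
    """Serialize equality conditions to a JSON string, or return "" when there are none."""
    # The empty string matches the default of filter_expr in search_vectors.
    return json.dumps(conditions) if conditions else ""
```

For example, `search_vectors(query, filter_expr=make_filter(category="electronics"))` sends the same string as the handwritten literal.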

Using Metadata for Auth

Pass a bearer token as gRPC metadata:

python
def search_with_auth(query_embedding: list[float], token: str) -> list[dict]:
    metadata = [("authorization", f"Bearer {token}")]
    request  = vec_pb2.SearchRequest(query_vector=query_embedding, top_k=10, namespace="default")
    response = stub.Search(request, metadata=metadata)
    return [{"id": r.id, "score": r.score} for r in response.results]

10. WeftKitKV via boto3

Point the AWS SDK at WeftKit Standalone's DynamoDB-compatible REST endpoint. Use any non-empty string for the region, access key, and secret key.

Client Setup

python
import boto3
import os

ddb = boto3.client(
    "dynamodb",
    endpoint_url=os.environ.get("WEFTKIT_KV_ENDPOINT", "http://localhost:8000"),
    region_name=os.environ.get("WEFTKIT_KV_REGION", "us-east-1"),
    aws_access_key_id=os.environ.get("WEFTKIT_KV_ACCESS_KEY", "weftkit"),
    aws_secret_access_key=os.environ.get("WEFTKIT_KV_SECRET_KEY", "weftkit"),
)

PutItem

python
import time

ddb.put_item(
    TableName="Sessions",
    Item={
        "sessionId": {"S": "sess_abc123"},
        "userId":    {"N": "42"},
        "expiresAt": {"N": str(int(time.time()) + 3600)},  # one hour from now, in epoch seconds
        "data":      {"S": '{"theme":"dark","locale":"en-US"}'},
    },
)

GetItem

python
response = ddb.get_item(
    TableName="Sessions",
    Key={"sessionId": {"S": "sess_abc123"}},
)
item = response.get("Item")
if item:
    user_id = int(item["userId"]["N"])
    print("Session user:", user_id)
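
The low-level client returns every attribute wrapped in a type tag (`{"S": ...}`, `{"N": ...}`), which is why the code above indexes `item["userId"]["N"]` and converts by hand. The sketch below unwraps the most common tags into Python values. It is a simplified illustration with hypothetical function names; boto3 ships a complete implementation as `boto3.dynamodb.types.TypeDeserializer`, which also covers binary and set types.

```python
from decimal import Decimal
from typing import Any


def unwrap(attr: dict) -> Any:
    """Convert one DynamoDB attribute value such as {"N": "42"} into a Python value."""
    type_tag, value = next(iter(attr.items()))
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)          # numbers travel as strings on the wire
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unwrap(v) for v in value]
    if type_tag == "M":
        return {k: unwrap(v) for k, v in value.items()}
    raise ValueError(f"Unsupported attribute type: {type_tag}")


def unwrap_item(item: dict) -> dict:
    """Apply unwrap to every attribute of an item returned by get_item or query."""
    return {name: unwrap(attr) for name, attr in item.items()}
```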

DeleteItem

python
ddb.delete_item(
    TableName="Sessions",
    Key={"sessionId": {"S": "sess_abc123"}},
)

Query

python
response = ddb.query(
    TableName="Orders",
    KeyConditionExpression="userId = :uid AND createdAt > :since",
    ExpressionAttributeValues={
        ":uid":   {"N": "1001"},
        ":since": {"S": "2025-01-01T00:00:00Z"},
    },
    ScanIndexForward=False,
    Limit=20,
)

orders = response.get("Items", [])
for order in orders:
    print(order["orderId"]["S"], order["total"]["N"])

Using the boto3 Resource API (higher-level)

python
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url=os.environ.get("WEFTKIT_KV_ENDPOINT", "http://localhost:8000"),
    region_name="us-east-1",
    aws_access_key_id="weftkit",
    aws_secret_access_key="weftkit",
)

table = dynamodb.Table("Sessions")

# Put
table.put_item(Item={"sessionId": "sess_xyz", "userId": 99, "ttl": 3600})

# Get: values come back as native Python types, with numbers as Decimal
response = table.get_item(Key={"sessionId": "sess_xyz"})
item = response.get("Item")
# {"sessionId": "sess_xyz", "userId": Decimal("99"), "ttl": Decimal("3600")}
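
Because the resource API returns numbers as `Decimal`, passing such an item straight to `json.dumps` raises `TypeError`. One possible fix is a `default` hook, sketched here with a hypothetical function name; converting fractional values to `float` can lose precision, so treat that branch as an assumption to revisit for monetary data.

```python
import json
from decimal import Decimal


def decimal_default(value):
    """json.dumps hook: render whole Decimals as int and fractional ones as float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
```

It is passed as `json.dumps(item, default=decimal_default)`.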

11. Django Integration

settings.py — DATABASES Configuration

Point Django's PostgreSQL backend at WeftKit Standalone:

python
# settings.py
import os
from dotenv import load_dotenv

load_dotenv()

DATABASES = {
    "default": {
        "ENGINE":   "django.db.backends.postgresql",
        "NAME":     os.environ.get("WEFTKIT_REL_DB",       "mydb"),
        "USER":     os.environ.get("WEFTKIT_REL_USER",     "myuser"),
        "PASSWORD": os.environ.get("WEFTKIT_REL_PASSWORD", "mypassword"),
        "HOST":     os.environ.get("WEFTKIT_REL_HOST",     "localhost"),
        "PORT":     os.environ.get("WEFTKIT_REL_PORT",     "5432"),
        "CONN_MAX_AGE": 60,   # keep connections alive for 60 seconds
    }
}
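
The settings above read `WEFTKIT_REL_DB`, `WEFTKIT_REL_USER`, and related variables, while the `.env` file earlier in this guide defines only `WEFTKIT_REL_DSN`. Either add the separate variables to `.env`, or derive the Django entry from the DSN. The following is a minimal sketch of the second option, using a hypothetical helper named `django_db_from_dsn`:

```python
from urllib.parse import unquote, urlsplit


def django_db_from_dsn(dsn: str) -> dict:
    """Split a postgresql:// DSN into the keys Django's DATABASES entry expects."""
    parts = urlsplit(dsn)
    return {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": parts.path.lstrip("/"),
        "USER": unquote(parts.username or ""),        # undo percent-encoding in credentials
        "PASSWORD": unquote(parts.password or ""),
        "HOST": parts.hostname or "localhost",
        "PORT": str(parts.port or 5432),
    }
```

In `settings.py` this would be used as `DATABASES = {"default": django_db_from_dsn(os.environ["WEFTKIT_REL_DSN"])}`, with options such as `CONN_MAX_AGE` added to the returned dictionary as needed.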

Requirement: psycopg2-binary must be installed (pip install psycopg2-binary).

Example Model

python
# myapp/models.py
from django.db import models

class Product(models.Model):
    name      = models.CharField(max_length=255)
    category  = models.CharField(max_length=100, db_index=True)
    price     = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ["name"]

    def __str__(self) -> str:
        return self.name

Django Migrations

Django's migration system generates and applies CREATE TABLE, ALTER TABLE, and index statements against WeftKitRel just as it does against PostgreSQL:

bash
python manage.py makemigrations
python manage.py migrate

Example View

python
# myapp/views.py
from django.http import JsonResponse
from django.views import View
from django.db.models import Avg
from .models import Product

class ProductListView(View):
    def get(self, request):
        category = request.GET.get("category")
        qs = Product.objects.all()
        if category:
            qs = qs.filter(category=category)
        data = list(qs.values("id", "name", "category", "price"))
        return JsonResponse(data, safe=False)

class ProductStatsView(View):
    def get(self, request):
        stats = (
            Product.objects
            .values("category")
            .annotate(avg_price=Avg("price"))
            .order_by("-avg_price")
        )
        return JsonResponse(list(stats), safe=False)

12. FastAPI Integration

Lifespan Pattern (startup and shutdown)

Use FastAPI's lifespan context manager to create the connection pool on startup and close it on shutdown:

python
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from typing import AsyncGenerator
import asyncpg
import os

pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        dsn=os.environ["WEFTKIT_REL_DSN"],
        min_size=2,
        max_size=20,
    )
    yield
    if pool:
        await pool.close()

app = FastAPI(lifespan=lifespan)

Dependency Injection with get_db()

python
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    """Yield a single connection from the pool for the duration of a request."""
    assert pool is not None, "Pool not initialized"
    async with pool.acquire() as conn:
        yield conn

Example Endpoints

python
from pydantic import BaseModel

class UserCreate(BaseModel):
    name:  str
    email: str

class UserOut(BaseModel):
    id:    int
    name:  str
    email: str

@app.get("/users", response_model=list[UserOut])
async def list_users(
    limit: int = 100,
    db: asyncpg.Connection = Depends(get_db),
):
    rows = await db.fetch(
        "SELECT id, name, email FROM users ORDER BY name LIMIT $1",
        limit,
    )
    return [dict(row) for row in rows]

@app.get("/users/{user_id}", response_model=UserOut)
async def get_user(
    user_id: int,
    db: asyncpg.Connection = Depends(get_db),
):
    row = await db.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id,
    )
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)

@app.post("/users", response_model=UserOut, status_code=201)
async def create_user(
    body: UserCreate,
    db: asyncpg.Connection = Depends(get_db),
):
    try:
        row = await db.fetchrow(
            "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email",
            body.name, body.email,
        )
    except asyncpg.UniqueViolationError:
        raise HTTPException(status_code=409, detail="Email already registered")
    return dict(row)

Running the Application

bash
uvicorn main:app --host 0.0.0.0 --port 8080 --reload

Combining Multiple Engines in FastAPI

You can expose connections for multiple WeftKit engines as separate dependencies:

python
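# deps.py
from typing import AsyncGenerator
import asyncpg
import motor.motor_asyncio
import redis.asyncio as aioredis

# Shared clients, created once at startup (for example in the lifespan handler)
# and assigned to these names before the first request is served.
rel_pool: asyncpg.Pool | None = None
doc_client: motor.motor_asyncio.AsyncIOMotorClient | None = None
mem_client: aioredis.Redis | None = None

async def get_rel_db() -> AsyncGenerator[asyncpg.Connection, None]:
    async with rel_pool.acquire() as conn:
        yield conn

async def get_doc_db() -> AsyncGenerator[motor.motor_asyncio.AsyncIOMotorDatabase, None]:
    yield doc_client["mydb"]

async def get_mem_db() -> AsyncGenerator[aioredis.Redis, None]:
    yield mem_client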

Use multiple dependencies in a single endpoint:

python
import json

@app.get("/dashboard/{user_id}")
async def dashboard(
    user_id: int,
    rel: asyncpg.Connection = Depends(get_rel_db),
    mem: aioredis.Redis      = Depends(get_mem_db),
):
    # Serve from the cache when possible
    cached = await mem.get(f"dashboard:{user_id}")
    if cached:
        return json.loads(cached)

    user = await rel.fetchrow("SELECT id, name FROM users WHERE id = $1", user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    result = dict(user)
    await mem.set(f"dashboard:{user_id}", json.dumps(result), ex=60)
    return result

Connection String Quick Reference

| Engine | Format |
|---|---|
| WeftKitRel | postgresql://user:pass@host:5432/dbname |
| WeftKitDoc | mongodb://user:pass@host:27017/dbname |
| WeftKitMem | redis://:pass@host:6379/0 |
| WeftKitGraph | bolt://host:7687 |
| WeftKitVec | host:50051 (gRPC target) |
| WeftKitKV | http://host:8000 (endpoint override) |

All ports are configurable in weftkit.toml. See the Security guide for TLS and JWT configuration.
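
The WeftKitMem examples in this guide configure the client from separate host, port, and password variables, whereas the table lists a single `redis://` URL. A small sketch of building that URL from the separate values follows; the helper name `redis_url` is hypothetical, and the password is percent-encoded so characters such as `@` or `/` do not break URL parsing.

```python
from typing import Optional
from urllib.parse import quote


def redis_url(host: str, port: int = 6379, password: Optional[str] = None, db: int = 0) -> str:
    """Build a redis:// URL in the form shown in the table, percent-encoding the password."""
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"
```

redis-py accepts such a URL through `redis.Redis.from_url(url, decode_responses=True)`.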