Python Integration
This guide shows how to connect to every WeftKit engine from Python using standard database drivers, and how to integrate with Django, FastAPI, and SQLAlchemy. WeftKit Standalone speaks the same wire protocols as well-known databases, so any driver that works with PostgreSQL, MongoDB, Redis, Neo4j, DynamoDB, or gRPC will work with WeftKit.
All examples assume weftkit-standalone is running locally. Adjust the host and port to match your deployment.
Prerequisites
Install the packages you need for the engines your application uses:
```bash
# WeftKitRel: PostgreSQL protocol
pip install psycopg2-binary              # synchronous
pip install asyncpg                      # async
pip install sqlalchemy[asyncio] asyncpg  # SQLAlchemy 2.0 (async)
pip install sqlalchemy psycopg2-binary   # SQLAlchemy 2.0 (sync)

# WeftKitDoc: MongoDB Wire protocol
pip install pymongo  # synchronous
pip install motor    # async (built on pymongo)

# WeftKitMem: Redis RESP3
pip install redis    # includes both sync and asyncio clients

# WeftKitGraph: Bolt (Neo4j)
pip install neo4j

# WeftKitVec: gRPC
pip install grpcio grpcio-tools

# WeftKitKV: DynamoDB REST
pip install boto3

# Environment variable loading
pip install python-dotenv

# Django / FastAPI
pip install django
pip install fastapi uvicorn[standard]
```
Environment Variables
Store connection details in a .env file and load them with python-dotenv:
```bash
# .env

# WeftKitRel
WEFTKIT_REL_DSN=postgresql://myuser:mypassword@localhost:5432/mydb

# WeftKitDoc
WEFTKIT_DOC_URI=mongodb://myuser:mypassword@localhost:27017/mydb

# WeftKitMem
WEFTKIT_MEM_HOST=localhost
WEFTKIT_MEM_PORT=6379
WEFTKIT_MEM_PASSWORD=mypassword

# WeftKitGraph
WEFTKIT_GRAPH_URI=bolt://localhost:7687
WEFTKIT_GRAPH_USER=neo4j
WEFTKIT_GRAPH_PASSWORD=mypassword

# WeftKitVec
WEFTKIT_VEC_TARGET=localhost:50051

# WeftKitKV
WEFTKIT_KV_ENDPOINT=http://localhost:8000
WEFTKIT_KV_REGION=us-east-1
WEFTKIT_KV_ACCESS_KEY=any-value
WEFTKIT_KV_SECRET_KEY=any-value
```
Load them at the top of your application:
```python
from dotenv import load_dotenv
import os

load_dotenv()  # locates and loads the .env file
DSN = os.environ["WEFTKIT_REL_DSN"]
```
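Because `os.environ[...]` raises a bare `KeyError` on the first missing variable, it can help to check all required variables at startup and report them together. The following is a minimal sketch using only the standard library; `REQUIRED_VARS`, `missing_env_vars`, and `require_env` are hypothetical names, and which variables your application actually requires is an assumption.

```python
import os

# Hypothetical list: the names come from the .env example above, but which
# ones are mandatory depends on the engines your application uses.
REQUIRED_VARS = ("WEFTKIT_REL_DSN", "WEFTKIT_DOC_URI")


def missing_env_vars(required=REQUIRED_VARS, environ=None):
    """Return the names in `required` that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


def require_env(required=REQUIRED_VARS, environ=None):
    """Raise one error that lists every missing variable."""
    missing = missing_env_vars(required, environ)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `require_env()` right after `load_dotenv()` makes a misconfigured deployment fail immediately rather than on the first query.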
1. WeftKitRel (PostgreSQL) via psycopg2
psycopg2 is the most widely used PostgreSQL driver for Python.
Basic Connection
```python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])
```
Cursor, Execute, and Fetch
Use %s placeholders for parameterized queries — psycopg2 handles escaping:
```python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])
cur = conn.cursor()

# Parameterized INSERT
cur.execute(
    "INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
    ("Alice", "alice@example.com"),
)
user_id = cur.fetchone()[0]
conn.commit()

# Fetch all rows
cur.execute("SELECT id, name, email FROM users ORDER BY name")
rows = cur.fetchall()
for row in rows:
    print(row)  # (1, 'Alice', 'alice@example.com')

# Fetch one row
cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
user = cur.fetchone()

cur.close()
conn.close()
```
Transaction Management (commit / rollback)
```python
import psycopg2
import os

conn = psycopg2.connect(os.environ["WEFTKIT_REL_DSN"])
conn.autocommit = False  # default; explicit here for clarity
cur = conn.cursor()

try:
    cur.execute(
        "UPDATE accounts SET balance = balance - %s WHERE id = %s",
        (500, 1),
    )
    cur.execute(
        "UPDATE accounts SET balance = balance + %s WHERE id = %s",
        (500, 2),
    )
    conn.commit()
    print("Transfer complete")
except Exception as e:
    conn.rollback()
    print(f"Transfer failed, rolled back: {e}")
finally:
    cur.close()
    conn.close()
```
Context Manager (recommended)
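```python
import psycopg2
import os

with psycopg2.connect(os.environ["WEFTKIT_REL_DSN"]) as conn:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
            (42, 99.99),
        )
        order_id = cur.fetchone()[0]
# conn.commit() is called automatically on exiting the `with conn` block
# (or conn.rollback() if an exception was raised). In psycopg2 the block
# does not close the connection, so close it explicitly when done.
conn.close()
```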
Connection Pool with psycopg2.pool
For production use, pool connections rather than opening a new one per request:
```python
from psycopg2 import pool
import os

connection_pool = pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=20,
    dsn=os.environ["WEFTKIT_REL_DSN"],
)

def get_users():
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT id, name FROM users")
            return cur.fetchall()
    finally:
        connection_pool.putconn(conn)
```
2. WeftKitRel via asyncpg (Async)
asyncpg provides a high-performance async interface. Use $1, $2, ... placeholders.
Connection Pool
```python
import asyncpg
import asyncio
import os

async def create_pool() -> asyncpg.Pool:
    return await asyncpg.create_pool(
        dsn=os.environ["WEFTKIT_REL_DSN"],
        min_size=2,
        max_size=20,
    )

# Typically called once during application startup
pool: asyncpg.Pool | None = None

async def startup():
    global pool
    pool = await create_pool()
```
Async Queries
```python
async def get_users():
    rows = await pool.fetch("SELECT id, name, email FROM users ORDER BY name")
    return [dict(row) for row in rows]

async def create_user(name: str, email: str) -> int:
    row = await pool.fetchrow(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        name,
        email,
    )
    return row["id"]

async def get_user_by_id(user_id: int):
    row = await pool.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id,
    )
    return dict(row) if row else None
```
Async Transaction
```python
async def transfer_balance(from_id: int, to_id: int, amount: float):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                amount,
                from_id,
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                amount,
                to_id,
            )
```
3. WeftKitRel via SQLAlchemy 2.0
SQLAlchemy's ORM and Core work with WeftKit just as they do with PostgreSQL.
Synchronous Engine and Session
```python
from sqlalchemy import create_engine, text, Column, Integer, String, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Session
import os

engine = create_engine(
    os.environ["WEFTKIT_REL_DSN"].replace("postgresql://", "postgresql+psycopg2://"),
    pool_size=10,
    max_overflow=20,
    echo=False,
)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    created_at = Column(DateTime, server_default=func.now())

# Create tables
Base.metadata.create_all(engine)
```
Session Usage
```python
# Add a new record
with Session(engine) as session:
    user = User(name="Alice", email="alice@example.com")
    session.add(user)
    session.commit()
    session.refresh(user)
    print(user.id)

# Query records
with Session(engine) as session:
    users = session.query(User).filter(User.name.like("A%")).order_by(User.name).all()
    for u in users:
        print(u.id, u.name, u.email)

# Update
with Session(engine) as session:
    user = session.get(User, 1)
    if user:
        user.email = "alice.new@example.com"
        session.commit()

# Delete
with Session(engine) as session:
    user = session.get(User, 1)
    if user:
        session.delete(user)
        session.commit()
```
Raw SQL with Core
```python
with engine.connect() as conn:
    result = conn.execute(
        text("SELECT id, name FROM users WHERE created_at > :since"),
        {"since": "2025-01-01"},
    )
    for row in result:
        print(row.id, row.name)
```
Async Engine with asyncpg
```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import os

async_engine = create_async_engine(
    os.environ["WEFTKIT_REL_DSN"].replace("postgresql://", "postgresql+asyncpg://"),
    pool_size=10,
    max_overflow=20,
)

AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def get_users_async():
    async with AsyncSessionLocal() as session:
        result = await session.execute(text("SELECT id, name FROM users"))
        return result.fetchall()
```
4. WeftKitDoc (MongoDB) via pymongo
WeftKit Standalone speaks the MongoDB Wire Protocol. pymongo connects without any modification.
MongoClient Connection
```python
from pymongo import MongoClient
import os

client = MongoClient(os.environ["WEFTKIT_DOC_URI"])
db = client["mydb"]
products = db["products"]
```
Insert One / Insert Many
```python
# Single document
result = products.insert_one({
    "name": "Wireless Headphones",
    "category": "electronics",
    "price": 149.99,
    "tags": ["audio", "wireless"],
})
print("Inserted:", result.inserted_id)

# Multiple documents
result = products.insert_many([
    {"name": "USB-C Hub", "category": "electronics", "price": 39.99, "tags": ["usb"]},
    {"name": "Desk Lamp", "category": "office", "price": 24.99, "tags": ["lighting"]},
])
print(f"Inserted {len(result.inserted_ids)} documents")
```
Find with Filter Operators
```python
# Less-than filter
affordable = list(products.find({"price": {"$lt": 50}}).sort("price", 1))

# In a set of values
multi_cat = list(products.find({"category": {"$in": ["electronics", "office"]}}))

# Combined conditions
specific = list(products.find({
    "$and": [
        {"price": {"$gt": 20, "$lt": 100}},
        {"tags": {"$in": ["wireless", "usb"]}},
    ]
}))
```
UpdateOne with Operators
```python
from pymongo import ReturnDocument

# Set fields
products.update_one(
    {"name": "Wireless Headphones"},
    {"$set": {"price": 129.99}},
)

# Increment a field
products.update_one(
    {"name": "USB-C Hub"},
    {"$inc": {"view_count": 1}},
)

# Upsert
products.update_one(
    {"sku": "KB-001"},
    {"$set": {"name": "Mechanical Keyboard", "price": 89.99}},
    upsert=True,
)

# Return the updated document
updated = products.find_one_and_update(
    {"name": "Desk Lamp"},
    {"$set": {"price": 19.99}},
    return_document=ReturnDocument.AFTER,
)
```
Aggregation Pipeline
```python
pipeline = [
    {"$match": {"price": {"$gt": 0}}},
    {
        "$group": {
            "_id": "$category",
            "count": {"$sum": 1},
            "avg_price": {"$avg": "$price"},
            "min_price": {"$min": "$price"},
            "max_price": {"$max": "$price"},
        }
    },
    {"$sort": {"avg_price": -1}},
]

summary = list(products.aggregate(pipeline))
for cat in summary:
    print(cat["_id"], cat["count"], round(cat["avg_price"], 2))
```
Index Creation
```python
from pymongo import ASCENDING, DESCENDING, TEXT

products.create_index([("category", ASCENDING)])
products.create_index([("category", ASCENDING), ("price", DESCENDING)])
products.create_index([("sku", ASCENDING)], unique=True)
products.create_index([("name", TEXT), ("tags", TEXT)])
```
5. WeftKitDoc via motor (Async MongoDB)
motor wraps pymongo with an asyncio-friendly interface.
Client Setup
```python
import motor.motor_asyncio
import os

client = motor.motor_asyncio.AsyncIOMotorClient(os.environ["WEFTKIT_DOC_URI"])
db = client["mydb"]
products = db["products"]
```
Async Find / Insert / Update
```python
import asyncio

async def get_electronics():
    cursor = products.find({"category": "electronics"}).sort("price", 1)
    return await cursor.to_list(length=100)

async def add_product(name: str, price: float, category: str) -> str:
    result = await products.insert_one({
        "name": name,
        "price": price,
        "category": category,
    })
    return str(result.inserted_id)

async def update_price(name: str, new_price: float):
    result = await products.update_one(
        {"name": name},
        {"$set": {"price": new_price}},
    )
    return result.modified_count

async def aggregate_by_category():
    pipeline = [
        {"$group": {"_id": "$category", "count": {"$sum": 1}, "avg": {"$avg": "$price"}}},
        {"$sort": {"avg": -1}},
    ]
    cursor = products.aggregate(pipeline)
    return await cursor.to_list(length=None)
```
6. WeftKitMem (Redis) via redis-py
redis-py includes both a synchronous client and an asyncio-native client. WeftKit Standalone speaks RESP3.
Synchronous Client
```python
import redis
import os

r = redis.Redis(
    host=os.environ.get("WEFTKIT_MEM_HOST", "localhost"),
    port=int(os.environ.get("WEFTKIT_MEM_PORT", 6379)),
    password=os.environ.get("WEFTKIT_MEM_PASSWORD"),
    decode_responses=True,
)
```
Basic Commands
```python
import json

# SET with TTL
r.set("session:abc123", json.dumps({"user_id": 42}), ex=600)

# GET
raw = r.get("session:abc123")
session = json.loads(raw) if raw else None

# Hash
r.hset("user:1001", mapping={"name": "Alice", "role": "admin", "logins": "0"})
profile = r.hgetall("user:1001")
r.hincrby("user:1001", "logins", 1)

# List
r.lpush("notifications:1001", json.dumps({"msg": "New comment"}))
items = [json.loads(i) for i in r.lrange("notifications:1001", 0, 19)]

# Sorted set (redis-py spells the reverse-order flag `desc`, not `rev`)
r.zadd("leaderboard", {"alice": 9800, "bob": 8500, "carol": 7200})
top = r.zrange("leaderboard", 0, 2, desc=True, withscores=True)
# [('alice', 9800.0), ('bob', 8500.0), ('carol', 7200.0)]

# Delete
r.delete("session:abc123")
```
Pipeline (Batching Commands)
Send multiple commands in a single round-trip:
```python
with r.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.incr("counter")
    results = pipe.execute()
# [True, True, <new_count>]
```
Pub/Sub
```python
import threading

def listener():
    sub = r.pubsub()
    sub.subscribe("events:orders")
    for message in sub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            print("Received:", event)

thread = threading.Thread(target=listener, daemon=True)
thread.start()

# Publish from anywhere
r.publish("events:orders", json.dumps({"order_id": 42, "status": "shipped"}))
```
7. WeftKitMem via redis.asyncio
The async client ships inside the redis package as of version 4.2.
```python
import redis.asyncio as aioredis
import json
import os

async def make_redis() -> aioredis.Redis:
    return aioredis.Redis(
        host=os.environ.get("WEFTKIT_MEM_HOST", "localhost"),
        port=int(os.environ.get("WEFTKIT_MEM_PORT", 6379)),
        password=os.environ.get("WEFTKIT_MEM_PASSWORD"),
        decode_responses=True,
    )

async def cache_set(client: aioredis.Redis, key: str, value: dict, ttl: int = 300):
    await client.set(key, json.dumps(value), ex=ttl)

async def cache_get(client: aioredis.Redis, key: str) -> dict | None:
    raw = await client.get(key)
    return json.loads(raw) if raw else None

async def increment_counter(client: aioredis.Redis, key: str) -> int:
    return await client.incr(key)

async def async_pipeline_example(client: aioredis.Redis):
    async with client.pipeline() as pipe:
        pipe.set("a", "1")
        pipe.set("b", "2")
        pipe.incr("counter")
        results = await pipe.execute()
    return results
```
8. WeftKitGraph (Neo4j) via the neo4j Python Driver
WeftKit Standalone speaks the Bolt protocol. The official neo4j Python driver connects directly.
Driver Initialization
```python
from neo4j import GraphDatabase
import os

driver = GraphDatabase.driver(
    os.environ.get("WEFTKIT_GRAPH_URI", "bolt://localhost:7687"),
    auth=(
        os.environ.get("WEFTKIT_GRAPH_USER", "neo4j"),
        os.environ.get("WEFTKIT_GRAPH_PASSWORD", "password"),
    ),
    max_connection_pool_size=50,
)

driver.verify_connectivity()
print("Graph connected")
```
Session Execute Read
Wrap read queries in execute_read for automatic retry on transient failures:
```python
def get_person_by_name(name: str) -> list[dict]:
    def _query(tx):
        result = tx.run(
            "MATCH (p:Person { name: $name }) RETURN p",
            name=name,
        )
        # Convert each returned node to a plain dict of its properties
        return [dict(record["p"]) for record in result]

    with driver.session(database="neo4j") as session:
        return session.execute_read(_query)
```
Session Execute Write
```python
def create_person(person_id: str, name: str, age: int) -> None:
    def _query(tx):
        tx.run(
            "MERGE (p:Person { id: $id }) SET p.name = $name, p.age = $age",
            id=person_id,
            name=name,
            age=age,
        )

    with driver.session() as session:
        session.execute_write(_query)

def create_friendship(id_a: str, id_b: str) -> None:
    def _query(tx):
        tx.run(
            """
            MATCH (a:Person { id: $id_a }), (b:Person { id: $id_b })
            MERGE (a)-[:FRIEND]->(b)
            MERGE (b)-[:FRIEND]->(a)
            """,
            id_a=id_a,
            id_b=id_b,
        )

    with driver.session() as session:
        session.execute_write(_query)
```
Cypher Query with Parameters
```python
def get_friends(person_id: str) -> list[str]:
    def _query(tx):
        result = tx.run(
            """
            MATCH (p:Person { id: $id })-[:FRIEND]->(f:Person)
            RETURN f.name AS name
            ORDER BY name
            """,
            id=person_id,
        )
        return [record["name"] for record in result]

    with driver.session() as session:
        return session.execute_read(_query)

def shortest_path(from_id: str, to_id: str) -> dict | None:
    def _query(tx):
        result = tx.run(
            """
            MATCH path = shortestPath(
                (a:Person { id: $from_id })-[*..6]-(b:Person { id: $to_id })
            )
            RETURN [node IN nodes(path) | node.name] AS names,
                   length(path) AS hops
            """,
            from_id=from_id,
            to_id=to_id,
        )
        record = result.single()
        if record:
            return {"names": record["names"], "hops": record["hops"]}
        return None

    with driver.session() as session:
        return session.execute_read(_query)
```
Graceful Shutdown
```python
import atexit

atexit.register(driver.close)
```
9. WeftKitVec via grpcio
WeftKit exposes its vector engine over gRPC. Generate Python stubs from the .proto file distributed with WeftKit Standalone.
Generate Stubs
```bash
python -m grpc_tools.protoc \
    -I protos/ \
    --python_out=. \
    --grpc_python_out=. \
    protos/weftkit_vec.proto
```
Proto File (provided with WeftKit Standalone)
```protobuf
// weftkit_vec.proto
syntax = "proto3";

package weftkit.vec;

service VectorService {
  rpc Upsert (UpsertRequest) returns (UpsertResponse);
  rpc Search (SearchRequest) returns (SearchResponse);
  rpc Delete (DeleteRequest) returns (DeleteResponse);
}

message Vector {
  string id = 1;
  repeated float embedding = 2;
  map<string, string> metadata = 3;
}

message UpsertRequest {
  repeated Vector vectors = 1;
  string namespace = 2;
}

message UpsertResponse {
  int32 upserted_count = 1;
}

message SearchRequest {
  repeated float query_vector = 1;
  int32 top_k = 2;
  string namespace = 3;
  string filter = 4;
}

message SearchResult {
  string id = 1;
  float score = 2;
  map<string, string> metadata = 3;
}

message SearchResponse {
  repeated SearchResult results = 1;
}

message DeleteRequest {
  repeated string ids = 1;
  string namespace = 2;
}

message DeleteResponse {
  int32 deleted_count = 1;
}
```
Channel and Stub
```python
import grpc
import os

import weftkit_vec_pb2 as vec_pb2
import weftkit_vec_pb2_grpc as vec_grpc

channel = grpc.insecure_channel(
    os.environ.get("WEFTKIT_VEC_TARGET", "localhost:50051")
)
stub = vec_grpc.VectorServiceStub(channel)
```
Upsert Vectors
```python
def upsert_vectors(
    vectors: list[dict],
    namespace: str = "default",
) -> int:
    request = vec_pb2.UpsertRequest(
        namespace=namespace,
        vectors=[
            vec_pb2.Vector(
                id=v["id"],
                embedding=v["embedding"],
                metadata=v.get("metadata", {}),
            )
            for v in vectors
        ],
    )
    response = stub.Upsert(request)
    return response.upserted_count

# Example
upserted = upsert_vectors([
    {"id": "prod-001", "embedding": [0.12, 0.84, 0.37, ...], "metadata": {"category": "electronics"}},
    {"id": "prod-002", "embedding": [0.55, 0.21, 0.90, ...], "metadata": {"category": "office"}},
], namespace="products")
print(f"Upserted {upserted} vectors")
```
Similarity Search
```python
def search_vectors(
    query_embedding: list[float],
    top_k: int = 10,
    namespace: str = "default",
    filter_expr: str = "",
) -> list[dict]:
    request = vec_pb2.SearchRequest(
        query_vector=query_embedding,
        top_k=top_k,
        namespace=namespace,
        filter=filter_expr,
    )
    response = stub.Search(request)
    return [
        {"id": r.id, "score": r.score, "metadata": dict(r.metadata)}
        for r in response.results
    ]

results = search_vectors(
    query_embedding=[0.10, 0.80, 0.40, ...],
    top_k=5,
    namespace="products",
    filter_expr='{"category": "electronics"}',
)
```
Using Metadata for Auth
Pass a bearer token as gRPC metadata:
```python
def search_with_auth(query_embedding: list[float], token: str) -> list[dict]:
    metadata = [("authorization", f"Bearer {token}")]
    request = vec_pb2.SearchRequest(query_vector=query_embedding, top_k=10, namespace="default")
    response = stub.Search(request, metadata=metadata)
    return [{"id": r.id, "score": r.score} for r in response.results]
```
10. WeftKitKV via boto3
Point the AWS SDK at WeftKit Standalone's DynamoDB-compatible REST endpoint. Use any non-empty string for the region, access key, and secret key.
Client Setup
```python
import boto3
import os

ddb = boto3.client(
    "dynamodb",
    endpoint_url=os.environ.get("WEFTKIT_KV_ENDPOINT", "http://localhost:8000"),
    region_name=os.environ.get("WEFTKIT_KV_REGION", "us-east-1"),
    aws_access_key_id=os.environ.get("WEFTKIT_KV_ACCESS_KEY", "weftkit"),
    aws_secret_access_key=os.environ.get("WEFTKIT_KV_SECRET_KEY", "weftkit"),
)
```
PutItem
```python
import time

ddb.put_item(
    TableName="Sessions",
    Item={
        "sessionId": {"S": "sess_abc123"},
        "userId": {"N": "42"},
        "expiresAt": {"N": str(int(time.time()) + 3600)},  # one hour from now
        "data": {"S": '{"theme":"dark","locale":"en-US"}'},
    },
)
```
GetItem
```python
response = ddb.get_item(
    TableName="Sessions",
    Key={"sessionId": {"S": "sess_abc123"}},
)
item = response.get("Item")
if item:
    user_id = int(item["userId"]["N"])
    print("Session user:", user_id)
```
DeleteItem
```python
ddb.delete_item(
    TableName="Sessions",
    Key={"sessionId": {"S": "sess_abc123"}},
)
```
Query
```python
response = ddb.query(
    TableName="Orders",
    KeyConditionExpression="userId = :uid AND createdAt > :since",
    ExpressionAttributeValues={
        ":uid": {"N": "1001"},
        ":since": {"S": "2025-01-01T00:00:00Z"},
    },
    ScanIndexForward=False,
    Limit=20,
)
orders = response.get("Items", [])
for order in orders:
    print(order["orderId"]["S"], order["total"]["N"])
```
Using the boto3 Resource API (higher-level)
```python
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url=os.environ.get("WEFTKIT_KV_ENDPOINT", "http://localhost:8000"),
    region_name="us-east-1",
    aws_access_key_id="weftkit",
    aws_secret_access_key="weftkit",
)
table = dynamodb.Table("Sessions")

# Put
table.put_item(Item={"sessionId": "sess_xyz", "userId": 99, "ttl": 3600})

# Get: returns Python native types automatically
response = table.get_item(Key={"sessionId": "sess_xyz"})
item = response.get("Item")
# {"sessionId": "sess_xyz", "userId": Decimal("99"), "ttl": Decimal("3600")}
```
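Because the Resource API returns numbers as `Decimal`, passing such an item straight to `json.dumps` raises a `TypeError`. One possible workaround, sketched here with the standard library only, is a `default` hook; `decimal_default` is a hypothetical helper name, and converting non-integral values to `float` is an assumption that may lose precision for some data.

```python
import json
from decimal import Decimal


def decimal_default(value):
    """json.dumps `default` hook: whole Decimals become int, others float."""
    if isinstance(value, Decimal):
        if value == value.to_integral_value():
            return int(value)
        return float(value)
    raise TypeError(f"Not JSON serializable: {type(value).__name__}")


# Same shape as the item returned by table.get_item() above
item = {"sessionId": "sess_xyz", "userId": Decimal("99"), "ttl": Decimal("3600")}
print(json.dumps(item, default=decimal_default))
```

If exact decimal values matter (prices, for example), serializing them as strings instead of floats is the safer choice.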
11. Django Integration
settings.py — DATABASES Configuration
Point Django's PostgreSQL backend at WeftKit Standalone:
```python
# settings.py
import os
from dotenv import load_dotenv

load_dotenv()

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": os.environ.get("WEFTKIT_REL_DB", "mydb"),
        "USER": os.environ.get("WEFTKIT_REL_USER", "myuser"),
        "PASSWORD": os.environ.get("WEFTKIT_REL_PASSWORD", "mypassword"),
        "HOST": os.environ.get("WEFTKIT_REL_HOST", "localhost"),
        "PORT": os.environ.get("WEFTKIT_REL_PORT", "5432"),
        "CONN_MAX_AGE": 60,  # keep connections alive for 60 seconds
    }
}
```
Requirement: psycopg2-binary must be installed (pip install psycopg2-binary).
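The .env example earlier in this guide defines only WEFTKIT_REL_DSN, while the settings above read separate WEFTKIT_REL_* variables. One way to avoid maintaining both is to derive the Django settings from the DSN. The sketch below assumes the DSN is a standard postgresql:// URL; `django_db_from_dsn` is a hypothetical helper, not part of Django or WeftKit.

```python
from urllib.parse import unquote, urlsplit


def django_db_from_dsn(dsn: str) -> dict:
    """Split a postgresql:// DSN into the keys Django's DATABASES entry expects."""
    parts = urlsplit(dsn)
    return {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": parts.path.lstrip("/"),
        # Credentials may be percent-encoded in a URL, so decode them
        "USER": unquote(parts.username or ""),
        "PASSWORD": unquote(parts.password or ""),
        "HOST": parts.hostname or "localhost",
        "PORT": str(parts.port or 5432),
    }
```

In settings.py this could be used as `DATABASES = {"default": django_db_from_dsn(os.environ["WEFTKIT_REL_DSN"])}`, adding `CONN_MAX_AGE` to the resulting dict if needed.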
Example Model
```python
# myapp/models.py
from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=255)
    category = models.CharField(max_length=100, db_index=True)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ["name"]

    def __str__(self) -> str:
        return self.name
```
Django Migrations
Django's migration system generates and applies CREATE TABLE, ALTER TABLE, and index statements against WeftKit Rel just as it does against PostgreSQL:
```bash
python manage.py makemigrations
python manage.py migrate
```
Example View
```python
# myapp/views.py
from django.http import JsonResponse
from django.views import View
from django.db.models import Avg
from .models import Product

class ProductListView(View):
    def get(self, request):
        category = request.GET.get("category")
        qs = Product.objects.all()
        if category:
            qs = qs.filter(category=category)
        data = list(qs.values("id", "name", "category", "price"))
        return JsonResponse(data, safe=False)

class ProductStatsView(View):
    def get(self, request):
        stats = (
            Product.objects
            .values("category")
            .annotate(avg_price=Avg("price"))
            .order_by("-avg_price")
        )
        return JsonResponse(list(stats), safe=False)
```
12. FastAPI Integration
Lifespan Pattern (startup and shutdown)
Use FastAPI's lifespan context manager to create the connection pool on startup and close it on shutdown:
```python
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from typing import AsyncGenerator
import asyncpg
import os

pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        dsn=os.environ["WEFTKIT_REL_DSN"],
        min_size=2,
        max_size=20,
    )
    yield
    if pool:
        await pool.close()

app = FastAPI(lifespan=lifespan)
```
Dependency Injection with get_db()
```python
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    """Yield a single connection from the pool for the duration of a request."""
    assert pool is not None, "Pool not initialized"
    async with pool.acquire() as conn:
        yield conn
```
Example Endpoints
```python
from pydantic import BaseModel

class UserCreate(BaseModel):
    name: str
    email: str

class UserOut(BaseModel):
    id: int
    name: str
    email: str

@app.get("/users", response_model=list[UserOut])
async def list_users(
    limit: int = 100,
    db: asyncpg.Connection = Depends(get_db),
):
    rows = await db.fetch(
        "SELECT id, name, email FROM users ORDER BY name LIMIT $1",
        limit,
    )
    return [dict(row) for row in rows]

@app.get("/users/{user_id}", response_model=UserOut)
async def get_user(
    user_id: int,
    db: asyncpg.Connection = Depends(get_db),
):
    row = await db.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id,
    )
    if not row:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)

@app.post("/users", response_model=UserOut, status_code=201)
async def create_user(
    body: UserCreate,
    db: asyncpg.Connection = Depends(get_db),
):
    try:
        row = await db.fetchrow(
            "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email",
            body.name,
            body.email,
        )
    except asyncpg.UniqueViolationError:
        raise HTTPException(status_code=409, detail="Email already registered")
    return dict(row)
```
Running the Application
```bash
uvicorn main:app --host 0.0.0.0 --port 8080 --reload
```
Combining Multiple Engines in FastAPI
You can expose connections for multiple WeftKit engines as separate dependencies:
```python
# deps.py
from typing import AsyncGenerator
import asyncpg
import motor.motor_asyncio
import redis.asyncio as aioredis

# rel_pool, doc_client, and mem_client are not defined in this snippet.
# They are assumed to be created once at startup (for example in the
# lifespan handler) and made available to this module.

async def get_rel_db() -> AsyncGenerator[asyncpg.Connection, None]:
    async with rel_pool.acquire() as conn:
        yield conn

async def get_doc_db() -> AsyncGenerator[motor.motor_asyncio.AsyncIOMotorDatabase, None]:
    yield doc_client["mydb"]

async def get_mem_db() -> AsyncGenerator[aioredis.Redis, None]:
    yield mem_client
```
Use multiple dependencies in a single endpoint:
```python
import json

@app.get("/dashboard/{user_id}")
async def dashboard(
    user_id: int,
    rel: asyncpg.Connection = Depends(get_rel_db),
    mem: aioredis.Redis = Depends(get_mem_db),
):
    # Serve from the cache when possible
    cached = await mem.get(f"dashboard:{user_id}")
    if cached:
        return json.loads(cached)

    # Cache miss: read from WeftKitRel, then cache the result for 60 seconds
    user = await rel.fetchrow("SELECT id, name FROM users WHERE id = $1", user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    result = dict(user)
    await mem.set(f"dashboard:{user_id}", json.dumps(result), ex=60)
    return result
```
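The dashboard endpoint follows a cache-aside pattern: read the cache, fall back to the database on a miss, then store the result with a TTL. If several endpoints need the same logic, it could be factored into a small helper. The sketch below is one possible shape, not part of WeftKit or redis-py; `cached_json` and `FakeCache` are hypothetical names, and the fake exists only so the example runs without a server. It assumes the cache object exposes async `get(key)` and `set(key, value, ex=...)` like `redis.asyncio.Redis`.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def cached_json(cache, key: str, loader: Callable[[], Awaitable[Any]], ttl: int = 60):
    """Return the cached JSON value for `key`, loading and storing it on a miss."""
    raw = await cache.get(key)
    if raw is not None:
        return json.loads(raw)
    value = await loader()
    if value is not None:  # do not cache "not found"
        await cache.set(key, json.dumps(value), ex=ttl)
    return value


class FakeCache:
    """In-memory stand-in for the async Redis client (the TTL is ignored)."""

    def __init__(self):
        self.data = {}

    async def get(self, key):
        return self.data.get(key)

    async def set(self, key, value, ex=None):
        self.data[key] = value


async def demo():
    cache, calls = FakeCache(), []

    async def load_user():
        calls.append(1)  # count how often the "database" is hit
        return {"id": 1, "name": "Alice"}

    first = await cached_json(cache, "dashboard:1", load_user)
    second = await cached_json(cache, "dashboard:1", load_user)
    return first, second, len(calls)
```

In the endpoint, the loader would wrap the `rel.fetchrow(...)` call and return `dict(user)` or `None`, leaving the 404 handling to the caller.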
Connection String Quick Reference
| Engine | Format |
|---|---|
| WeftKitRel | postgresql://user:pass@host:5432/dbname |
| WeftKitDoc | mongodb://user:pass@host:27017/dbname |
| WeftKitMem | redis://:pass@host:6379/0 |
| WeftKitGraph | bolt://host:7687 |
| WeftKitVec | host:50051 (gRPC target) |
| WeftKitKV | http://host:8000 (endpoint override) |
All ports are configurable in weftkit.toml. See the Security guide for TLS and JWT configuration.
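When adjusting hosts and ports, it can be useful to confirm that each endpoint is reachable before debugging driver errors. The sketch below uses only the standard library; `host_and_port` and `is_reachable` are hypothetical helper names, and the default ports are taken from the table above, so they are an assumption if your weftkit.toml overrides them.

```python
import socket
from urllib.parse import urlsplit

# Defaults from the quick-reference table; change them if weftkit.toml differs.
DEFAULT_PORTS = {
    "postgresql": 5432,
    "mongodb": 27017,
    "redis": 6379,
    "bolt": 7687,
    "http": 8000,
}


def host_and_port(target: str) -> tuple[str, int]:
    """Extract (host, port) from a URL or a bare gRPC target such as host:50051."""
    if "://" not in target:
        # Bare targets must include an explicit port
        host, _, port = target.rpartition(":")
        return host, int(port)
    parts = urlsplit(target)
    return parts.hostname, parts.port or DEFAULT_PORTS[parts.scheme]


def is_reachable(target: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the target can be opened."""
    host, port = host_and_port(target)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `is_reachable(os.environ["WEFTKIT_REL_DSN"])` only checks that the port accepts TCP connections; it does not verify credentials or the wire protocol.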