1c5e00d8e4
- embedder.py: lazy model load rationale, RGB conversion, shared vector space
- main.py: why vec appears twice, ::vector cast, 1-distance score formula
- main_oracle.py: why array.array("f") is required instead of plain list
- main_oracle_indb.py: no embedder import — embedding done inside Oracle SQL
- index_images_oracle.py: same array.array requirement on indexing path
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
56 lines
1.7 KiB
Python
56 lines
1.7 KiB
Python
import os
|
|
from fastapi import FastAPI, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from dotenv import load_dotenv
|
|
from db import get_connection
|
|
from embedder import embed_text
|
|
|
|
# Load variables from a .env file (python-dotenv) before any of them are read.
load_dotenv()

# Directory the /photos route serves images from; None when the variable is unset.
PHOTOS_DIR = os.environ.get("PHOTOS_DIR")

# The static frontend lives outside this package, addressed relative to this module.
_MODULE_DIR = os.path.dirname(__file__)
FRONTEND_DIR = os.path.join(_MODULE_DIR, "../../pgvector-demo/frontend")

app = FastAPI()

# Fully permissive CORS: every origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Expose the frontend under /ui, resolved to an absolute path so the mount
# does not depend on the process working directory.
_frontend_root = os.path.abspath(FRONTEND_DIR)
app.mount("/ui", StaticFiles(directory=_frontend_root, html=True), name="ui")
|
|
|
|
@app.get("/search")
def search(q: str = Query(...), limit: int = Query(12)):
    """Return the images whose embeddings are closest to the text query.

    Args:
        q: Free-text query; it is embedded with ``embed_text`` before the lookup.
        limit: Maximum number of rows to return (defaults to 12).

    Returns:
        A list of ``{"filename": ..., "score": ...}`` dicts in the order the
        database returns them (nearest first), with the score rounded to four
        decimal places.
    """
    # Embed before opening a connection so no connection is held while the
    # model runs.
    vec = embed_text(q)

    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                SELECT filename, 1 - (embedding <=> %s::vector) AS score
                FROM images
                ORDER BY embedding <=> %s::vector
                LIMIT %s
                """,
                # vec appears twice: once for the score column, once for
                # ORDER BY (which uses the HNSW index).
                # ::vector cast is required — psycopg2 passes the list as text
                # without it.
                # 1 - distance converts cosine distance (0=identical) to
                # similarity (1=identical).
                (vec, vec, limit),
            )
            rows = cur.fetchall()
        finally:
            # Always release the cursor, even when the query raises.
            cur.close()
    finally:
        # Always release the connection so a failed request cannot leak it.
        conn.close()

    return [{"filename": r[0], "score": round(r[1], 4)} for r in rows]
|
|
|
|
@app.get("/stats")
def stats():
    """Return the number of rows in the ``images`` table.

    Returns:
        A dict of the form ``{"count": <row count>}``.
    """
    conn = get_connection()
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT COUNT(*) FROM images")
            count = cur.fetchone()[0]
        finally:
            # Always release the cursor, even when the query raises.
            cur.close()
    finally:
        # Always release the connection so a failed request cannot leak it.
        conn.close()

    return {"count": count}
|
|
|
|
@app.get("/photos/{filename}")
def get_photo(filename: str):
    """Serve a single photo from ``PHOTOS_DIR`` as ``image/jpeg``.

    Args:
        filename: Bare file name of the photo, taken from the URL path.

    Returns:
        A ``FileResponse`` for the requested file.

    Raises:
        HTTPException: 500 when ``PHOTOS_DIR`` is not configured; 404 when the
            name contains a directory component or no such file exists.
    """
    # Imported locally so this handler does not depend on the module's
    # top-level import list.
    from fastapi import HTTPException

    if not PHOTOS_DIR:
        # os.path.join(None, ...) would otherwise fail with a TypeError.
        raise HTTPException(status_code=500, detail="PHOTOS_DIR is not configured")

    # The name comes straight from the client: refuse anything that carries a
    # directory or drive component so the request cannot leave PHOTOS_DIR.
    if os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="Photo not found")

    path = os.path.join(PHOTOS_DIR, filename)
    # Covers missing files as well as "." / "..", which resolve to directories.
    if not os.path.isfile(path):
        raise HTTPException(status_code=404, detail="Photo not found")

    return FileResponse(path, media_type="image/jpeg")
|