from __future__ import annotations from datetime import date, datetime from typing import Iterable from urllib.parse import quote, urlsplit from fastapi import Depends, FastAPI, Form, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy import func, or_, select from sqlalchemy.orm import Session, joinedload, selectinload from starlette.middleware.sessions import SessionMiddleware from app.config import settings from app.content import SITE_CONTENT, get_listing_card_meta, get_listing_content from app.database import SessionLocal, get_db from app.models import ( Availability, Booking, Listing, Message, MessageThread, Review, TaskDefinition, User, WishlistItem, ) from app.seed import daterange, ensure_database_seeded, parse_date, reset_database from app.tasks import evaluate_task, serialize_task app = FastAPI( title=settings.app_name, description="A resettable vacation-rental marketplace with booking, messaging, and hosting flows.", ) app.add_middleware( SessionMiddleware, secret_key=settings.session_secret, same_site=settings.session_same_site, https_only=settings.session_https_only, ) app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") @app.on_event("startup") def startup() -> None: ensure_database_seeded() def get_current_user(request: Request, db: Session) -> User | None: user_id = request.session.get("user_id") if user_id is None: return None return db.get(User, user_id) def build_context(request: Request, db: Session, **kwargs): current_user = get_current_user(request, db) return { "request": request, "current_user": current_user, "site_content": SITE_CONTENT, "notice": request.query_params.get("notice"), **kwargs, } def redirect_with_notice(path: str, notice: str) -> RedirectResponse: separator = "&" if "?" in path else "?" return RedirectResponse(url=f"{path}{separator}notice={quote(notice)}", status_code=303) def _safe_notice_target(request: Request) -> str: referer = request.headers.get("referer", "") if referer: parsed = urlsplit(referer) path = parsed.path or "/" query = f"?{parsed.query}" if parsed.query else "" if path.startswith("/"): return f"{path}{query}" if request.url.path and request.url.path.startswith("/"): return request.url.path return "/" def _parse_optional_int(value: str) -> int | None: stripped = value.strip() if not stripped: return None return int(stripped) def _parse_optional_float(value: str) -> float | None: stripped = value.strip() if not stripped: return None return float(stripped) @app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): if request.url.path.startswith("/api"): return JSONResponse(status_code=422, content={"detail": exc.errors()}) return redirect_with_notice(_safe_notice_target(request), "Please check the values you entered and try again.") def require_login(request: Request, db: Session, next_path: str) -> User | RedirectResponse: user = get_current_user(request, db) if user is None: return RedirectResponse(url=f"/login?next={quote(next_path)}", status_code=303) return user def require_host(request: Request, db: Session, next_path: str) -> User | RedirectResponse: user = require_login(request, db, next_path) if isinstance(user, RedirectResponse): return user if not user.is_host: return redirect_with_notice("/", "Switch to a host account to manage listings.") return user def normalize_redirect_target(target: str | None) -> str: if not target or not target.startswith("/"): return "/" return target def merge_ranges(ranges: list[dict[str, str]]) -> list[dict[str, str]]: if not ranges: return [] ordered = sorted( [(parse_date(item["start"]), parse_date(item["end"])) for item in ranges], key=lambda item: item[0], ) merged: list[tuple[date, date]] = [ordered[0]] for start_date, end_date in ordered[1:]: previous_start, previous_end = merged[-1] if start_date <= previous_end: merged[-1] = (previous_start, max(previous_end, end_date)) else: merged.append((start_date, end_date)) return [{"start": start.isoformat(), "end": end.isoformat()} for start, end in merged] def is_date_blocked(current_date: date, blocked_ranges: list[dict[str, str]]) -> bool: for blocked_range in blocked_ranges: start_date = parse_date(blocked_range["start"]) end_date = parse_date(blocked_range["end"]) if start_date <= current_date < end_date: return True return False def recompute_availability(db: Session, listing: Listing, start_date: date, end_date: date) -> None: entries = db.scalars( select(Availability).where( Availability.listing_id == listing.id, Availability.date >= start_date, Availability.date < end_date, ) ).all() entry_by_date = {entry.date: entry for entry in entries} bookings = db.scalars( select(Booking).where( Booking.listing_id == listing.id, Booking.status == "confirmed", Booking.check_out > start_date, Booking.check_in < end_date, ) ).all() booked_dates = set() for booking in bookings: booked_dates.update(daterange(booking.check_in, booking.check_out)) for current_date in daterange(start_date, end_date): entry = entry_by_date.get(current_date) if entry is None: entry = Availability(listing_id=listing.id, date=current_date) db.add(entry) entry.is_available = ( current_date not in booked_dates and not is_date_blocked(current_date, listing.blocked_ranges) ) def get_listing(db: Session, slug: str) -> Listing | None: return db.scalar( select(Listing) .where(Listing.slug == slug) .options( joinedload(Listing.host), selectinload(Listing.images), selectinload(Listing.reviews).joinedload(Review.user), ) ) def serialize_state(db: Session) -> dict: listings = db.scalars(select(Listing).options(joinedload(Listing.host)).order_by(Listing.id)).all() bookings = db.scalars( select(Booking).options(joinedload(Booking.listing), joinedload(Booking.guest)).order_by(Booking.id) ).all() wishlists = db.scalars( select(WishlistItem).options(joinedload(WishlistItem.user), joinedload(WishlistItem.listing)) ).all() threads = db.scalars( select(MessageThread) .options(joinedload(MessageThread.listing), joinedload(MessageThread.host), joinedload(MessageThread.guest)) .order_by(MessageThread.id) ).all() return { "summary": { "users": db.scalar(select(func.count(User.id))) or 0, "listings": len(listings), "bookings": len(bookings), "wishlist_items": len(wishlists), "threads": len(threads), }, "bookings": [ { "confirmation_code": booking.confirmation_code, "status": booking.status, "guest": booking.guest.email, "listing": booking.listing.slug, "check_in": booking.check_in.isoformat(), "check_out": booking.check_out.isoformat(), } for booking in bookings ], "wishlists": [ { "user": item.user.email, "listing": item.listing.slug, } for item in wishlists ], "threads": [ { "id": thread.id, "listing": thread.listing.slug, "guest": thread.guest.email, "host": thread.host.email, "subject": thread.subject, } for thread in threads ], } @app.get("/", response_class=HTMLResponse) def home( request: Request, q: str = "", city: str = "", guests: str = "", max_price: str = "", min_rating: str = "", amenity: str = "", sort: str = "recommended", db: Session = Depends(get_db), ): try: guests_filter = _parse_optional_int(guests) max_price_filter = _parse_optional_int(max_price) min_rating_filter = _parse_optional_float(min_rating) except ValueError: return redirect_with_notice("/", "Please use numbers for guests, price, and rating filters.") listings = db.scalars( select(Listing).options(joinedload(Listing.host), selectinload(Listing.images)).order_by(Listing.id) ).all() def matches(listing: Listing) -> bool: haystack = " ".join( [listing.title, listing.city, listing.country, listing.neighborhood, listing.description] ).lower() return ( (not q or q.lower() in haystack) and (not city or listing.city == city) and (guests_filter is None or listing.max_guests >= guests_filter) and (max_price_filter is None or listing.price_per_night <= max_price_filter) and (min_rating_filter is None or listing.rating >= min_rating_filter) and (not amenity or amenity in listing.amenities) ) filtered = [listing for listing in listings if matches(listing)] if sort == "price_low": filtered.sort(key=lambda item: item.price_per_night) elif sort == "price_high": filtered.sort(key=lambda item: item.price_per_night, reverse=True) elif sort == "rating": filtered.sort(key=lambda item: (item.rating, item.review_count), reverse=True) else: filtered.sort(key=lambda item: (item.rating * item.review_count, item.review_count), reverse=True) all_amenities = sorted({amenity_name for listing in listings for amenity_name in listing.amenities}) cities = sorted({listing.city for listing in listings}) editorial_listings = sorted( listings, key=lambda item: (item.rating, item.review_count), reverse=True, )[:3] return templates.TemplateResponse( request=request, name="home.html", context=build_context( request, db, home_cards=[ { "listing": listing, "card_meta": get_listing_card_meta(listing), } for listing in filtered ], results_count=len(filtered), marketplace_listing_count=len(listings), cities=cities, amenities=all_amenities, city_count=len(cities), editorial_cards=[ { "listing": listing, "content": get_listing_content(listing), } for listing in editorial_listings ], ), ) @app.get("/guide", response_class=HTMLResponse) def guide(request: Request, db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() return templates.TemplateResponse( request=request, name="guide.html", context=build_context(request, db, users=users), ) @app.post("/reset") def browser_reset(): reset_database() return redirect_with_notice("/guide", "Environment reset to the seeded state.") @app.get("/login", response_class=HTMLResponse) def login_page(request: Request, next: str = "/", db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() return templates.TemplateResponse( request=request, name="login.html", context=build_context(request, db, next_path=normalize_redirect_target(next), users=users), ) @app.post("/login") def login( request: Request, email: str = Form(...), password: str = Form(...), next: str = Form("/"), db: Session = Depends(get_db), ): user = db.scalar(select(User).where(User.email == email)) if user is None or user.password != password: users = db.scalars(select(User).order_by(User.id)).all() return templates.TemplateResponse( request=request, name="login.html", context=build_context( request, db, error="That email and password combination does not match the seeded demo data.", next_path=normalize_redirect_target(next), users=users, ), status_code=400, ) request.session["user_id"] = user.id return redirect_with_notice(normalize_redirect_target(next), f"Signed in as {user.name}.") @app.post("/logout") def logout(request: Request): request.session.clear() return redirect_with_notice("/", "Signed out.") @app.get("/switch/{user_id}") def quick_switch(request: Request, user_id: int, next: str = "/", db: Session = Depends(get_db)): user = db.get(User, user_id) if user is None: return redirect_with_notice("/", "That account does not exist.") request.session["user_id"] = user.id return redirect_with_notice(normalize_redirect_target(next), f"Switched to {user.name}.") @app.get("/listings/{slug}", response_class=HTMLResponse, name="listing_detail") def listing_detail(request: Request, slug: str, db: Session = Depends(get_db)): listing = db.scalar( select(Listing) .where(Listing.slug == slug) .options( joinedload(Listing.host), selectinload(Listing.images), selectinload(Listing.reviews).joinedload(Review.user), ) ) if listing is None: return templates.TemplateResponse( request=request, name="home.html", context=build_context( request, db, home_cards=[], results_count=0, marketplace_listing_count=0, cities=[], amenities=[], city_count=0, editorial_cards=[], ), status_code=404, ) current_user = get_current_user(request, db) listing_content = get_listing_content(listing) wishlist_item = None if current_user is not None: wishlist_item = db.scalar( select(WishlistItem).where( WishlistItem.user_id == current_user.id, WishlistItem.listing_id == listing.id, ) ) next_unavailable = db.scalars( select(Availability) .where( Availability.listing_id == listing.id, Availability.is_available.is_(False), Availability.date >= date.today(), ) .order_by(Availability.date) .limit(8) ).all() first_availability = db.scalar( select(Availability) .where(Availability.listing_id == listing.id, Availability.is_available.is_(True)) .order_by(Availability.date) ) nearby_listings = db.scalars( select(Listing) .where(Listing.id != listing.id) .options(joinedload(Listing.host), selectinload(Listing.images)) .order_by(Listing.rating.desc(), Listing.review_count.desc()) .limit(3) ).all() return templates.TemplateResponse( request=request, name="listing.html", context=build_context( request, db, listing=listing, listing_content=listing_content, gallery_images=listing_content["gallery_images"] or [ {"url": image.url, "alt_text": image.alt_text} for image in listing.images ], next_unavailable=next_unavailable, has_wishlisted=wishlist_item is not None, first_available_date=first_availability.date.isoformat() if first_availability else "", nearby_cards=[ { "listing": other, "card_meta": get_listing_card_meta(other), } for other in nearby_listings ], ), ) @app.post("/listings/{listing_id}/book") def book_listing( request: Request, listing_id: int, check_in: str = Form(...), check_out: str = Form(...), guests: int = Form(...), db: Session = Depends(get_db), ): listing = db.get(Listing, listing_id) if listing is None: return redirect_with_notice("/", "Listing not found.") current_user = require_login(request, db, f"/listings/{listing.slug}") if isinstance(current_user, RedirectResponse): return current_user if current_user.id == listing.host_id: return redirect_with_notice(f"/listings/{listing.slug}", "Hosts cannot book their own listing.") check_in_date = parse_date(check_in) check_out_date = parse_date(check_out) if check_out_date <= check_in_date: return redirect_with_notice(f"/listings/{listing.slug}", "Choose a check-out date after check-in.") if guests > listing.max_guests: return redirect_with_notice(f"/listings/{listing.slug}", "Guest count exceeds the listing capacity.") availability_entries = db.scalars( select(Availability).where( Availability.listing_id == listing.id, Availability.date >= check_in_date, Availability.date < check_out_date, ) ).all() nights = (check_out_date - check_in_date).days if len(availability_entries) != nights or any(not entry.is_available for entry in availability_entries): return redirect_with_notice(f"/listings/{listing.slug}", "Those dates are not fully available.") next_booking_number = (db.scalar(select(func.max(Booking.id))) or 0) + 2001 total_price = nights * listing.price_per_night + listing.cleaning_fee + listing.service_fee booking = Booking( confirmation_code=f"BKG-{next_booking_number}", listing_id=listing.id, guest_id=current_user.id, check_in=check_in_date, check_out=check_out_date, guests=guests, total_price=round(total_price, 2), status="confirmed", created_at=datetime.utcnow(), ) db.add(booking) for availability_entry in availability_entries: availability_entry.is_available = False db.commit() return redirect_with_notice("/trips", f"Booked {listing.title}.") @app.post("/listings/{listing_id}/wishlist") def toggle_wishlist(request: Request, listing_id: int, db: Session = Depends(get_db)): listing = db.get(Listing, listing_id) if listing is None: return redirect_with_notice("/", "Listing not found.") current_user = require_login(request, db, f"/listings/{listing.slug}") if isinstance(current_user, RedirectResponse): return current_user existing = db.scalar( select(WishlistItem).where(WishlistItem.user_id == current_user.id, WishlistItem.listing_id == listing_id) ) if existing is None: db.add(WishlistItem(user_id=current_user.id, listing_id=listing_id)) notice = f"Saved {listing.title}." else: db.delete(existing) notice = f"Removed {listing.title} from the wishlist." db.commit() return redirect_with_notice(f"/listings/{listing.slug}", notice) @app.post("/listings/{listing_id}/message") def send_listing_message( request: Request, listing_id: int, subject: str = Form("Question about the stay"), body: str = Form(...), db: Session = Depends(get_db), ): listing = db.get(Listing, listing_id) if listing is None: return redirect_with_notice("/", "Listing not found.") current_user = require_login(request, db, f"/listings/{listing.slug}") if isinstance(current_user, RedirectResponse): return current_user thread = db.scalar( select(MessageThread).where( MessageThread.listing_id == listing_id, MessageThread.guest_id == current_user.id, MessageThread.host_id == listing.host_id, ) ) if thread is None: thread = MessageThread( listing_id=listing_id, guest_id=current_user.id, host_id=listing.host_id, subject=subject.strip() or "Question about the stay", last_message_at=datetime.utcnow(), ) db.add(thread) db.flush() thread.last_message_at = datetime.utcnow() db.add(Message(thread_id=thread.id, sender_id=current_user.id, body=body.strip(), created_at=datetime.utcnow())) db.commit() return redirect_with_notice(f"/inbox?thread_id={thread.id}", "Message sent.") @app.get("/trips", response_class=HTMLResponse) def trips(request: Request, db: Session = Depends(get_db)): current_user = require_login(request, db, "/trips") if isinstance(current_user, RedirectResponse): return current_user bookings = db.scalars( select(Booking) .where(Booking.guest_id == current_user.id) .options(joinedload(Booking.listing).joinedload(Listing.host), joinedload(Booking.listing).selectinload(Listing.images)) .order_by(Booking.check_in) ).all() return templates.TemplateResponse( request=request, name="trips.html", context=build_context(request, db, bookings=bookings), ) @app.post("/bookings/{booking_id}/cancel") def cancel_booking(request: Request, booking_id: int, db: Session = Depends(get_db)): current_user = require_login(request, db, "/trips") if isinstance(current_user, RedirectResponse): return current_user booking = db.scalar( select(Booking).where(Booking.id == booking_id, Booking.guest_id == current_user.id).options(joinedload(Booking.listing)) ) if booking is None: return redirect_with_notice("/trips", "Booking not found.") if booking.status == "canceled": return redirect_with_notice("/trips", "That booking is already canceled.") booking.status = "canceled" recompute_availability(db, booking.listing, booking.check_in, booking.check_out) db.commit() return redirect_with_notice("/trips", f"Canceled {booking.confirmation_code}.") @app.get("/wishlists", response_class=HTMLResponse) def wishlists(request: Request, db: Session = Depends(get_db)): current_user = require_login(request, db, "/wishlists") if isinstance(current_user, RedirectResponse): return current_user items = db.scalars( select(WishlistItem) .where(WishlistItem.user_id == current_user.id) .options(joinedload(WishlistItem.listing).joinedload(Listing.host), joinedload(WishlistItem.listing).selectinload(Listing.images)) ).all() return templates.TemplateResponse( request=request, name="wishlists.html", context=build_context( request, db, wishlist_cards=[ { "item": item, "card_meta": get_listing_card_meta(item.listing), } for item in items ], ), ) @app.get("/inbox", response_class=HTMLResponse) def inbox(request: Request, thread_id: int | None = None, db: Session = Depends(get_db)): current_user = require_login(request, db, "/inbox") if isinstance(current_user, RedirectResponse): return current_user threads = db.scalars( select(MessageThread) .where(or_(MessageThread.guest_id == current_user.id, MessageThread.host_id == current_user.id)) .options( joinedload(MessageThread.listing), joinedload(MessageThread.guest), joinedload(MessageThread.host), selectinload(MessageThread.messages).joinedload(Message.sender), ) .order_by(MessageThread.last_message_at.desc()) ).all() selected_thread = next((thread for thread in threads if thread.id == thread_id), None) if selected_thread is None and threads: selected_thread = threads[0] return templates.TemplateResponse( request=request, name="inbox.html", context=build_context(request, db, threads=threads, selected_thread=selected_thread), ) @app.post("/threads/{thread_id}/reply") def reply_to_thread(request: Request, thread_id: int, body: str = Form(...), db: Session = Depends(get_db)): current_user = require_login(request, db, f"/inbox?thread_id={thread_id}") if isinstance(current_user, RedirectResponse): return current_user thread = db.scalar( select(MessageThread).where( MessageThread.id == thread_id, or_(MessageThread.guest_id == current_user.id, MessageThread.host_id == current_user.id), ) ) if thread is None: return redirect_with_notice("/inbox", "Conversation not found.") thread.last_message_at = datetime.utcnow() db.add(Message(thread_id=thread.id, sender_id=current_user.id, body=body.strip(), created_at=datetime.utcnow())) db.commit() return redirect_with_notice(f"/inbox?thread_id={thread.id}", "Reply sent.") @app.get("/host", response_class=HTMLResponse) def host_dashboard(request: Request, db: Session = Depends(get_db)): current_user = require_host(request, db, "/host") if isinstance(current_user, RedirectResponse): return current_user today = date.today() listings = db.scalars( select(Listing) .where(Listing.host_id == current_user.id) .options(selectinload(Listing.images)) .order_by(Listing.id) ).all() reservations = db.scalars( select(Booking) .join(Listing, Booking.listing_id == Listing.id) .where(Listing.host_id == current_user.id) .options(joinedload(Booking.guest), joinedload(Booking.listing)) .order_by(Booking.check_in) ).all() reservations_by_listing: dict[int, list[Booking]] = {} for booking in reservations: reservations_by_listing.setdefault(booking.listing_id, []).append(booking) confirmed_reservation_count = 0 blocked_window_count = 0 upcoming_reservations = [ booking for booking in reservations if booking.status == "confirmed" and booking.check_out >= today ] host_cards = [] for listing in listings: listing_reservations = reservations_by_listing.get(listing.id, []) confirmed_count = sum(1 for booking in listing_reservations if booking.status == "confirmed") confirmed_reservation_count += confirmed_count blocked_windows = sorted(listing.blocked_ranges, key=lambda blocked_window: blocked_window["start"]) blocked_window_count += len(blocked_windows) upcoming_listing_reservations = [ booking for booking in listing_reservations if booking.status == "confirmed" and booking.check_out >= today ] host_cards.append( { "listing": listing, "blocked_windows": blocked_windows, "confirmed_count": confirmed_count, "upcoming_count": len(upcoming_listing_reservations), "next_arrival": min( (booking.check_in for booking in upcoming_listing_reservations), default=None, ), } ) return templates.TemplateResponse( request=request, name="host_dashboard.html", context=build_context( request, db, listings=listings, reservations=reservations, host_cards=host_cards, today=today, blocked_window_count=blocked_window_count, confirmed_reservation_count=confirmed_reservation_count, upcoming_reservations=upcoming_reservations, ), ) @app.post("/host/listings/{listing_id}/block") def block_listing_dates( request: Request, listing_id: int, start_date: str = Form(...), end_date: str = Form(...), db: Session = Depends(get_db), ): current_user = require_host(request, db, "/host") if isinstance(current_user, RedirectResponse): return current_user listing = db.scalar(select(Listing).where(Listing.id == listing_id, Listing.host_id == current_user.id)) if listing is None: return redirect_with_notice("/host", "Listing not found.") block_start = parse_date(start_date) block_end = parse_date(end_date) if block_end <= block_start: return redirect_with_notice("/host", "Choose an end date after the start date.") listing.blocked_ranges = merge_ranges( [*listing.blocked_ranges, {"start": block_start.isoformat(), "end": block_end.isoformat()}] ) recompute_availability(db, listing, block_start, block_end) db.commit() return redirect_with_notice("/host", f"Blocked dates on {listing.title}.") @app.get("/api/tasks") def list_tasks(db: Session = Depends(get_db)): tasks = db.scalars( select(TaskDefinition).options(joinedload(TaskDefinition.persona)).order_by(TaskDefinition.id) ).all() return {"tasks": [serialize_task(task) for task in tasks]} @app.get("/api/tasks/{task_id}") def task_detail(task_id: int, db: Session = Depends(get_db)): task = db.scalar( select(TaskDefinition).where(TaskDefinition.id == task_id).options(joinedload(TaskDefinition.persona)) ) if task is None: return JSONResponse({"error": "Task not found."}, status_code=404) return serialize_task(task) @app.get("/api/tasks/{task_id}/evaluate") @app.post("/api/tasks/{task_id}/evaluate") def task_evaluate(task_id: int, db: Session = Depends(get_db)): task = db.scalar( select(TaskDefinition).where(TaskDefinition.id == task_id).options(joinedload(TaskDefinition.persona)) ) if task is None: return JSONResponse({"error": "Task not found."}, status_code=404) return evaluate_task(db, task) @app.get("/api/state") def state(db: Session = Depends(get_db)): return serialize_state(db) @app.post("/api/reset") def api_reset(token: str = ""): if settings.reset_token and token != settings.reset_token: return JSONResponse({"error": "Invalid reset token."}, status_code=403) reset_database() with SessionLocal() as fresh_db: return {"status": "ok", "message": "Environment reset.", "state": serialize_state(fresh_db)} @app.get("/healthz") def healthz(): return {"status": "ok"}