import { Pool } from "pg"; import type { BillingAddress, OrgBilling, SupportTicket, SupportTicketComment, SupportTicketCommentAuthorKind, SupportTicketCategory, SupportTicketStatus, TenantRequest, TenantRequestStatus, } from "@/types"; import { listTenants, getTenant } from "./k8s"; // --------------------------------------------------------------------------- // Connection pool (singleton) // --------------------------------------------------------------------------- let pool: Pool | null = null; function getPool(): Pool { if (!pool) { const connectionString = process.env.DATABASE_URL ?? "postgresql://portal:portal@portal-db-rw.portal.svc:5432/portal"; pool = new Pool({ connectionString, max: 5 }); } return pool; } // --------------------------------------------------------------------------- // Schema migration (auto-run on first query) // --------------------------------------------------------------------------- // Notes on the Slice 3 changes // ---------------------------- // 1. Removed `UNIQUE` from `zitadel_org_id` in the CREATE TABLE for fresh // installs, AND emit a defensive `DROP CONSTRAINT IF EXISTS` for // existing installs whose schema was created pre-Slice-3. The // constraint was Postgres-autonamed; the name is deterministic. // 2. Added `instance_name TEXT` — the customer's human label per // instance (e.g. "Production", "Dev"). NULL is fine and means "use // the company name for display". // 3. Added a unique index on `tenant_name WHERE NOT NULL`. Multiple // rows in the table can have NULL tenant_name (pending/rejected // requests), but every approved row points to a distinct K8s CR. // 4. Added `(zitadel_org_id, status)` index for the list-by-org queries // introduced this slice. const MIGRATION_SQL = ` CREATE TABLE IF NOT EXISTS tenant_requests ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), zitadel_org_id TEXT NOT NULL, zitadel_user_id TEXT NOT NULL, company_name TEXT NOT NULL, instance_name TEXT, contact_name TEXT NOT NULL, contact_email TEXT NOT NULL, agent_name TEXT NOT NULL DEFAULT 'Assistant', soul_md TEXT, agents_md TEXT, packages TEXT[] DEFAULT '{}', billing_address JSONB DEFAULT '{}', billing_notes TEXT, status TEXT NOT NULL DEFAULT 'pending', admin_notes TEXT, tenant_name TEXT, encrypted_secrets BYTEA, is_personal BOOLEAN NOT NULL DEFAULT FALSE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_tenant_requests_status ON tenant_requests(status); CREATE INDEX IF NOT EXISTS idx_tenant_requests_org_id ON tenant_requests(zitadel_org_id); CREATE INDEX IF NOT EXISTS idx_tenant_requests_org_status ON tenant_requests(zitadel_org_id, status); -- Note: the unique constraint on tenant_name is NOT created here. -- Pre-Bug-37 we had a non-partial UNIQUE on tenant_name, which is -- incompatible with resume requests (same tenant_name, different -- request_type). The new partial unique indexes are created -- further down in the migration block, after the request_type -- column has been added and backfilled. This bootstrap section -- only creates indexes that are safe regardless of request_type -- semantics. -- Idempotent column adds for existing databases ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS encrypted_secrets BYTEA; ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS agents_md TEXT; ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS instance_name TEXT; ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS is_personal BOOLEAN NOT NULL DEFAULT FALSE; -- Bug 13: customer-side dismissal of rejected requests. NULL means "still -- visible on the dashboard"; non-null means "customer clicked Dismiss". -- Pending/approved/active rows keep this NULL by definition — the field -- is only meaningful for rejected and cancelled rows. ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS dismissed_at TIMESTAMPTZ; -- Phase 9b: link a provision request to the paid setup-fee invoice -- it was charged against at order time. Null on requests created -- before Phase 9b, on resume requests, and during the brief -- 'pending_payment' window before the Stripe webhook fires. The -- admin reject flow refunds this invoice via the existing -- refundInvoice helper. ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS setup_invoice_id UUID REFERENCES invoices(id); CREATE INDEX IF NOT EXISTS idx_tenant_requests_setup_invoice ON tenant_requests(setup_invoice_id) WHERE setup_invoice_id IS NOT NULL; -- Phase 9b: optional initial channel-user ids per channel package -- collected during onboarding. JSONB so the shape can vary by -- channel (today it's a string[] per channel id, matching -- PiecedTenantSpec.channelUsers). Default '{}' so reads on legacy -- rows return an empty object rather than null. ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS channel_users JSONB NOT NULL DEFAULT '{}'::jsonb; -- Feature 6: free-form customer note attached to the request. -- Currently surfaced only by resume requests (where the customer -- explains why they want reactivation), but the column is generic -- so future flows could reuse it. Distinct from billing_notes -- (provision-only, accounting-related) and admin_notes (admin's -- reason on reject/approve). Optional — nullable. ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS customer_notes TEXT; -- Bug 37a: resume requests use the same table as provision requests so -- the customer dashboard and admin queue share rendering. Discriminator -- is request_type. Default 'provision' on backfill keeps existing rows -- working without explicit migration. -- -- Resume rows have: -- request_type = 'resume' -- tenant_name = the existing tenant being requested for reactivation -- zitadel_org_id = the org owning that tenant -- zitadel_user_id = the requesting customer -- status = pending → approved/rejected (or cancelled by customer) -- most provision-only fields (packages, billing_address, etc.) are NULL ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS request_type TEXT NOT NULL DEFAULT 'provision'; -- Constrain to the known set so a future code change can't accidentally -- write a third type without first widening this constraint. DO $$ BEGIN ALTER TABLE tenant_requests ADD CONSTRAINT tenant_requests_request_type_check CHECK (request_type IN ('provision', 'resume')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; -- Tenant_name uniqueness was originally meant for "one tenant CR per -- approved provision request". Resume requests reuse a tenant_name, -- so the uniqueness must now be scoped to provision rows only. DROP INDEX IF EXISTS uniq_tenant_requests_tenant_name; CREATE UNIQUE INDEX IF NOT EXISTS uniq_tenant_requests_tenant_name_provision ON tenant_requests(tenant_name) WHERE tenant_name IS NOT NULL AND request_type = 'provision'; -- Only one pending resume request per tenant at a time. Otherwise a -- customer could spam-create resume requests (the admin queue would -- bloat) or two admins might race on approving duplicates. CREATE UNIQUE INDEX IF NOT EXISTS uniq_tenant_requests_pending_resume ON tenant_requests(tenant_name) WHERE tenant_name IS NOT NULL AND request_type = 'resume' AND status = 'pending'; -- Slice 3: drop the legacy 1-org-1-request constraint if it exists ALTER TABLE tenant_requests DROP CONSTRAINT IF EXISTS tenant_requests_zitadel_org_id_key; -- Workspace templates: admin-editable default content for workspace files CREATE TABLE IF NOT EXISTS workspace_templates ( file_key TEXT PRIMARY KEY, content TEXT NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- --------------------------------------------------------------------------- -- Slice 6: per-tenant user assignments -- --------------------------------------------------------------------------- -- -- Each row grants ONE user visibility into ONE tenant within their own -- ZITADEL org. Used to narrow the customer 'user' role from "everything -- in the org" to "only the tenants I've been assigned to". Owners and -- platform users bypass this table entirely. -- -- Composite PK is (tenant_name, zitadel_user_id) — a user is either -- assigned to a tenant or not, no degree. -- -- The zitadel_org_id column is denormalised onto every row so cascade -- cleanups when a user leaves an org can be expressed as a single -- DELETE WHERE zitadel_org_id=$1 AND zitadel_user_id=$2 — without -- joining tenant_requests. The assigned_by column tracks which user -- (the owner usually) granted the assignment, for audit. -- -- Cascade on tenant deletion is enforced in application code (the -- admin delete handler calls removeAllAssignmentsForTenant) rather -- than via FK — there's no FK target, since K8s CRs aren't a Postgres -- table. CREATE TABLE IF NOT EXISTS tenant_user_assignments ( tenant_name TEXT NOT NULL, zitadel_org_id TEXT NOT NULL, zitadel_user_id TEXT NOT NULL, assigned_at TIMESTAMPTZ NOT NULL DEFAULT now(), assigned_by TEXT NOT NULL, PRIMARY KEY (tenant_name, zitadel_user_id) ); CREATE INDEX IF NOT EXISTS idx_tua_user ON tenant_user_assignments(zitadel_user_id); CREATE INDEX IF NOT EXISTS idx_tua_org ON tenant_user_assignments(zitadel_org_id); -- Bug 35: org-scoped billing. One row per ZITADEL org; captured by -- the first tenant request inline, editable afterwards via -- /settings/billing. Subsequent tenant requests in the same org read -- this and skip the billing step entirely. -- -- vat_number is nullable: required at write time for company orgs -- (enforced by the API, not the schema, because "company-or-personal" -- isn't expressible as a column constraint). Notes is free-form -- accounting context — VAT exemption reasons, special invoicing -- arrangements, etc. -- -- We do NOT migrate data from tenant_requests.billing_address into -- this table automatically. Existing customers re-enter on next -- tenant or via settings — the data set is small (single-digit -- customers in pilot) and re-entering is the simplest path. CREATE TABLE IF NOT EXISTS org_billing ( zitadel_org_id TEXT PRIMARY KEY, company_name TEXT NOT NULL, -- Phase 6 fix: optional contact-person line shown on the -- invoice PDF below the company name (e.g. "z.Hd. Herr Müller"). -- Not normally needed since invoices are delivered by email -- link, but useful when customers forward the PDF internally -- for AP routing in larger organizations. contact_name TEXT, street_address TEXT NOT NULL, postal_code TEXT NOT NULL, city TEXT NOT NULL, country TEXT NOT NULL, vat_number TEXT, billing_email TEXT NOT NULL, notes TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Phase 6 fix: ensure the column exists on databases that were -- created before contact_name was added to the base schema above. -- IF NOT EXISTS makes this safe to run repeatedly via ensureSchema. ALTER TABLE org_billing ADD COLUMN IF NOT EXISTS contact_name TEXT; -- Feature 5: lightweight customer support / feedback tickets. -- Scoped strictly per-user (zitadel_user_id), not per-org — -- coworkers in the same org cannot see each other's tickets. The -- index on (zitadel_user_id, status) is what most customer-side -- queries hit; the index on (status, updated_at DESC) is for the -- admin queue. -- -- contact_email / contact_name are frozen at creation time so the -- ticket retains a working "reply-to" identity even if the user -- later changes their email or display name in ZITADEL. CREATE TABLE IF NOT EXISTS support_tickets ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), zitadel_org_id TEXT NOT NULL, zitadel_user_id TEXT NOT NULL, title TEXT NOT NULL, description TEXT NOT NULL, category TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'open', contact_email TEXT NOT NULL, contact_name TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- CHECK constraints added separately so re-running the migration -- against an existing table (without these constraints) works. -- IF NOT EXISTS isn't supported on ADD CONSTRAINT, hence the -- DO $$ wrapper. DO $$ BEGIN ALTER TABLE support_tickets ADD CONSTRAINT support_tickets_category_check CHECK (category IN ('bug','feature_request','question','billing','other')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN ALTER TABLE support_tickets ADD CONSTRAINT support_tickets_status_check CHECK (status IN ('open','in_progress','waiting_for_customer','resolved','reopened')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_support_tickets_user ON support_tickets(zitadel_user_id, updated_at DESC); CREATE INDEX IF NOT EXISTS idx_support_tickets_status ON support_tickets(status, updated_at DESC); -- Threaded comments. ON DELETE CASCADE so deleting a ticket -- cleans up its history; we don't currently expose ticket -- deletion in the UI but the cascade keeps options open. CREATE TABLE IF NOT EXISTS support_ticket_comments ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), ticket_id UUID NOT NULL REFERENCES support_tickets(id) ON DELETE CASCADE, author_user_id TEXT NOT NULL, author_name TEXT NOT NULL, author_kind TEXT NOT NULL, body TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); DO $$ BEGIN ALTER TABLE support_ticket_comments ADD CONSTRAINT support_ticket_comments_author_kind_check CHECK (author_kind IN ('customer','admin')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_support_ticket_comments_ticket ON support_ticket_comments(ticket_id, created_at); -- ========================================================================= -- Billing — Phase 1: pricing, lifecycle, and events -- ========================================================================= -- -- This block introduces the schema for the consolidated billing -- subsystem. Phase 1 lands the schema + writes the lifecycle and -- skill-event rows that downstream phases consume; invoice -- generation, PDF rendering, Stripe wiring and reminders are added -- in later phases. The invoice/line/reminder tables are created -- here so all billing schema lives in a single migration block, -- but no code writes to them until Phase 2. -- -- Money columns: NUMERIC(10,2) for CHF amounts (max ~99 million.99 -- which is fine for the foreseeable future) and NUMERIC(10,5) for -- per-unit prices (Threema messages and skill-days can have -- sub-rappen unit prices — 0.00012 CHF/message is the kind of -- granularity we want to keep precise across multiplication). -- -- All timestamps are TIMESTAMPTZ. Billing-day computations use UTC -- by convention (matches all other stored timestamps; avoids DST -- ambiguity around month boundaries). -- Single-row platform pricing config. The id=1 CHECK and explicit -- DEFAULT 1 make accidental multi-row inserts impossible, and -- callers can always SELECT * FROM platform_pricing WHERE id = 1. -- (Could use a settings KV table; this shape is clearer for -- typed columns that the admin UI binds to directly.) CREATE TABLE IF NOT EXISTS platform_pricing ( id INT PRIMARY KEY DEFAULT 1 CHECK (id = 1), tenant_monthly_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0, tenant_setup_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0, -- Single price per Threema message; the relay reports in+out -- separately but the spec ("billed per Message") treats both -- directions identically. If asymmetric pricing is needed -- later, split into in/out columns — additive change. threema_message_chf NUMERIC(10,5) NOT NULL DEFAULT 0, -- Default VAT rate for CH/LI customers, as percent (e.g. 8.10). -- Foreign customers' effective rate is computed at invoice time -- from billing address + VAT number (reverse-charge logic). vat_rate_chli NUMERIC(5,2) NOT NULL DEFAULT 8.10, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Ensure the single row exists. ON CONFLICT DO NOTHING is idempotent -- on every migration run. INSERT INTO platform_pricing (id) VALUES (1) ON CONFLICT DO NOTHING; -- Per-package optional daily price. Any package id can have a row; -- the admin UI in Phase 2 will only expose skill-category packages -- because that's what the spec called for, but nothing here -- prevents pricing other categories later. -- -- "skill" naming follows the spec ("custom pricing also for skills"). CREATE TABLE IF NOT EXISTS skill_pricing ( skill_id TEXT PRIMARY KEY, daily_price_chf NUMERIC(10,5) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Phase 2 addition: per-skill one-time setup fee. Charged the -- first time a given (tenant, skill) appears on an invoice line. -- Default 0 so pricing rows created before this column exists -- stay free until the admin sets a fee. ALTER TABLE skill_pricing ADD COLUMN IF NOT EXISTS setup_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0; -- One row per tenant. created_at anchors first-month proration; -- deleted_at (nullable, stamped on delete) anchors last-month -- proration. The PiecedTenant CR is the source of truth for -- existence, but once the CR is deleted we lose its -- creationTimestamp — so we mirror those two bookends here. CREATE TABLE IF NOT EXISTS tenant_billing_lifecycle ( tenant_name TEXT PRIMARY KEY, zitadel_org_id TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL, deleted_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_tenant_billing_lifecycle_org ON tenant_billing_lifecycle(zitadel_org_id); -- Skill enable/disable events. One row per state change; same-day -- toggles still produce multiple rows, and the billing computation -- collapses to distinct UTC days at compute time. This append-only -- log preserves history for audit and lets us re-bill historical -- months reproducibly. -- -- skill_id is the package id from PACKAGE_CATALOG. We store -- events for ALL package toggles, not just skill-category — the -- channel/core toggles are cheap to record and may become billable -- in the future without a schema change. CREATE TABLE IF NOT EXISTS tenant_skill_events ( id BIGSERIAL PRIMARY KEY, tenant_name TEXT NOT NULL, zitadel_org_id TEXT NOT NULL, skill_id TEXT NOT NULL, event_kind TEXT NOT NULL CHECK (event_kind IN ('enabled', 'disabled')), occurred_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_tenant_skill_events_tenant_skill ON tenant_skill_events(tenant_name, skill_id, occurred_at); CREATE INDEX IF NOT EXISTS idx_tenant_skill_events_org_time ON tenant_skill_events(zitadel_org_id, occurred_at); -- Suspend/resume transitions. Same shape as skill events. Reading -- these in order reconstructs the suspended-state segments for -- monthly fee proration: a tenant in 'suspended' state pays no -- monthly fee for the days it was suspended. -- -- The portal commands the transition (PATCH spec.suspend); the -- operator observes and stamps PiecedTenantStatus.suspendedAt -- after reconcile. We record the event at command time — billing -- is monthly so the few-second reconcile lag is irrelevant. CREATE TABLE IF NOT EXISTS tenant_suspension_events ( id BIGSERIAL PRIMARY KEY, tenant_name TEXT NOT NULL, zitadel_org_id TEXT NOT NULL, event_kind TEXT NOT NULL CHECK (event_kind IN ('suspended','resumed')), occurred_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_tenant_suspension_events_tenant ON tenant_suspension_events(tenant_name, occurred_at); -- Per-org billing configuration. Distinct from org_billing -- (address/VAT/email): that table is customer-editable, this one -- is admin-controlled and holds the payment posture. -- -- Defaults: a new org has pay_by_invoice = false (must use a -- credit card per the onboarding gate in Phase 4) and auto -- billing/reminders enabled. Admin can flip pay_by_invoice on -- per customer, after which approval no longer requires a card. CREATE TABLE IF NOT EXISTS org_billing_config ( zitadel_org_id TEXT PRIMARY KEY, pay_by_invoice BOOLEAN NOT NULL DEFAULT FALSE, stripe_customer_id TEXT, auto_invoice_enabled BOOLEAN NOT NULL DEFAULT TRUE, auto_reminders_enabled BOOLEAN NOT NULL DEFAULT TRUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Phase 9: saved-card columns. The PaymentMethod id ('pm_xxx') -- is the handle for off-session charges; brand/last4/exp are -- display fields. No PAN, CVV, or anything PCI-scope — Stripe -- holds those. The columns are nullable because a fresh org has -- no saved card; setting up auto-pay populates them via the -- checkout.session.completed webhook in setup mode. ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS stripe_default_payment_method_id TEXT; ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS stripe_pm_brand TEXT; ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS stripe_pm_last4 TEXT; ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS stripe_pm_exp_month INTEGER; ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS stripe_pm_exp_year INTEGER; -- Phase 9: off-session auto-charge gate. Default TRUE — new orgs -- pay by card automatically when an invoice is issued (assuming -- they've also set up a saved card). Admin can flip OFF to pause -- charging without removing the saved card. ALTER TABLE org_billing_config ADD COLUMN IF NOT EXISTS auto_charge_enabled BOOLEAN NOT NULL DEFAULT TRUE; -- Stripe payment methods. Populated by the Phase 4 webhook handler. -- Created in Phase 1 so all billing schema is together; rows are -- empty until Phase 4 ships. CREATE TABLE IF NOT EXISTS org_payment_methods ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), zitadel_org_id TEXT NOT NULL, stripe_payment_method_id TEXT NOT NULL UNIQUE, brand TEXT, last4 TEXT, exp_month INT, exp_year INT, is_default BOOLEAN NOT NULL DEFAULT FALSE, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_org_payment_methods_org ON org_payment_methods(zitadel_org_id); -- At most one default payment method per org. Partial unique index -- so non-default rows don't conflict with each other. CREATE UNIQUE INDEX IF NOT EXISTS uniq_org_payment_methods_default ON org_payment_methods(zitadel_org_id) WHERE is_default = TRUE; -- Gapless per-year invoice number counter (Art. 957a OR -- compliance). A Postgres SEQUENCE would be faster but allows -- gaps on rollback; this counter table is SELECT FOR UPDATE-able -- and produces gapless numbers when the invoice insert is in the -- same transaction. Populated lazily — the first invoice of each -- year inserts its row. CREATE TABLE IF NOT EXISTS invoice_number_counters ( year INT PRIMARY KEY, last_number INT NOT NULL DEFAULT 0 ); -- Issued invoices. Immutable once status leaves 'draft'. -- billing_snapshot captures the address/VAT/email at issue time -- so subsequent edits to org_billing don't mutate historical -- invoices. CREATE TABLE IF NOT EXISTS invoices ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), invoice_number TEXT NOT NULL UNIQUE, zitadel_org_id TEXT NOT NULL, -- Billing period as DATEs (not timestamps): a calendar month. -- period_end is the last day of the month, inclusive. period_start DATE NOT NULL, period_end DATE NOT NULL, issued_at TIMESTAMPTZ NOT NULL DEFAULT now(), due_at DATE NOT NULL, subtotal_chf NUMERIC(10,2) NOT NULL, vat_rate NUMERIC(5,2) NOT NULL, vat_amount_chf NUMERIC(10,2) NOT NULL, total_chf NUMERIC(10,2) NOT NULL, status TEXT NOT NULL DEFAULT 'open' CHECK ( status IN ('draft','open','paid','overdue','void','uncollectible') ), billing_snapshot JSONB NOT NULL, payment_method TEXT NOT NULL CHECK (payment_method IN ('invoice','card')), stripe_payment_intent_id TEXT, pdf_data BYTEA, pdf_filename TEXT, admin_notes TEXT, paid_at TIMESTAMPTZ, paid_by TEXT, paid_method_detail TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Phase 2 addition: PDF locale, frozen at issue time so re-rendering -- an old invoice produces an identical document. Defaults to 'de' -- since most pilot customers are Swiss B2B; the generator UI lets -- admin override at issue time. ALTER TABLE invoices ADD COLUMN IF NOT EXISTS locale TEXT NOT NULL DEFAULT 'de'; -- Phase 7 schema: void + refund tracking on the existing invoices -- table, plus the credit-note and refund-event sub-entities. These -- ALTERs are idempotent (IF NOT EXISTS) so re-running ensureSchema -- on an already-migrated DB is a no-op. ALTER TABLE invoices ADD COLUMN IF NOT EXISTS void_reason TEXT; ALTER TABLE invoices ADD COLUMN IF NOT EXISTS voided_at TIMESTAMPTZ; ALTER TABLE invoices ADD COLUMN IF NOT EXISTS voided_by TEXT; ALTER TABLE invoices ADD COLUMN IF NOT EXISTS refunded_total_chf NUMERIC(10,2) NOT NULL DEFAULT 0; -- Extend the status CHECK to allow partially/fully_refunded. We -- drop-then-add because there's no "ALTER CONSTRAINT ... ADD -- VALUE" in stock Postgres. Drop is conditional on the constraint -- name; the constraint is auto-named after the table+column. DO $constraints$ BEGIN ALTER TABLE invoices DROP CONSTRAINT IF EXISTS invoices_status_check; ALTER TABLE invoices ADD CONSTRAINT invoices_status_check CHECK ( status IN ('draft','open','paid','overdue','void','uncollectible', 'partially_refunded','fully_refunded') ); END $constraints$; CREATE INDEX IF NOT EXISTS idx_invoices_org ON invoices(zitadel_org_id, issued_at DESC); CREATE INDEX IF NOT EXISTS idx_invoices_status ON invoices(status, due_at); -- Phase 8: distinguish auto (cron) from custom (admin-entered) -- invoices. All pre-Phase-8 rows backfill to 'auto' via the -- column DEFAULT. Custom invoices skip the per-org/per-month -- uniqueness guard (admin may issue multiple custom invoices -- against the same org in the same month). ALTER TABLE invoices ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'auto' CHECK (source IN ('auto','custom')); -- period_start / period_end become nullable so custom invoices -- (no fixed billing period) can leave them empty. Auto-cron -- invoices keep their values; only the NOT NULL constraint is -- relaxed. Idempotent: DROP NOT NULL on an already-nullable column -- is a no-op in Postgres. ALTER TABLE invoices ALTER COLUMN period_start DROP NOT NULL; ALTER TABLE invoices ALTER COLUMN period_end DROP NOT NULL; -- Replace the old unconditional uniqueness with a partial index -- limited to auto invoices. Both invariants the cron relies on -- (no double-issuance per period) survive; custom invoices are -- unaffected. Idempotent: DROP IF EXISTS handles the migration -- case, and CREATE IF NOT EXISTS handles re-runs. DROP INDEX IF EXISTS uniq_invoices_org_period; CREATE UNIQUE INDEX IF NOT EXISTS uniq_invoices_org_period_auto ON invoices(zitadel_org_id, period_start) WHERE source = 'auto'; -- Phase 7: credit notes. One row per void or refund event. The -- credit_note_number follows CN-YYYY-NNNNN allocated from the -- per-year counter below. CREATE TABLE IF NOT EXISTS credit_note_counters ( year INTEGER PRIMARY KEY, last_number INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS credit_notes ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), credit_note_number TEXT NOT NULL UNIQUE, invoice_id UUID NOT NULL REFERENCES invoices(id), zitadel_org_id TEXT NOT NULL, kind TEXT NOT NULL CHECK (kind IN ('void','refund')), amount_chf NUMERIC(10,2) NOT NULL, vat_amount_chf NUMERIC(10,2) NOT NULL DEFAULT 0, reason TEXT, issued_at TIMESTAMPTZ NOT NULL DEFAULT now(), issued_by TEXT NOT NULL, locale TEXT NOT NULL DEFAULT 'de', pdf_data BYTEA, pdf_filename TEXT, -- Frozen snapshot of the customer's billing address at the time -- the credit note is issued. Mirrors invoices.billing_snapshot -- so the PDF is self-contained and reproducible. billing_snapshot JSONB NOT NULL ); CREATE INDEX IF NOT EXISTS idx_credit_notes_invoice ON credit_notes(invoice_id); CREATE INDEX IF NOT EXISTS idx_credit_notes_org ON credit_notes(zitadel_org_id, issued_at DESC); -- Phase 7: per-refund-event log. Each row is one Stripe Refund -- object (stripe_refund_id non-null) OR one manual admin action -- for invoice-paid customers (stripe_refund_id null). The -- credit_note_id links the refund to the credit note PDF it -- generated. CREATE TABLE IF NOT EXISTS invoice_refunds ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), invoice_id UUID NOT NULL REFERENCES invoices(id), stripe_refund_id TEXT UNIQUE, amount_chf NUMERIC(10,2) NOT NULL, reason TEXT, status TEXT NOT NULL DEFAULT 'succeeded' CHECK (status IN ('pending','succeeded','failed','canceled')), refunded_at TIMESTAMPTZ NOT NULL DEFAULT now(), refunded_by TEXT NOT NULL, credit_note_id UUID REFERENCES credit_notes(id) ); CREATE INDEX IF NOT EXISTS idx_invoice_refunds_invoice ON invoice_refunds(invoice_id); -- Phase 7 fix: the credit_notes.invoice_id and -- invoice_refunds.invoice_id FKs were originally created without -- ON DELETE CASCADE, which made admin "delete invoice" fail with -- a FK violation when the invoice had any voids/refunds attached. -- The production policy is to never delete an invoice that's been -- refunded (the credit notes are part of the customer's records), -- but the schema should allow the admin tool to clean up for test -- data. We drop and re-add the FKs with CASCADE so a delete tears -- down everything related. The DROP/ADD pair is idempotent — on a -- DB that already has the CASCADE variant it's a no-op (we drop -- by name and re-add identically). DO $cnfk$ BEGIN ALTER TABLE credit_notes DROP CONSTRAINT IF EXISTS credit_notes_invoice_id_fkey; ALTER TABLE credit_notes ADD CONSTRAINT credit_notes_invoice_id_fkey FOREIGN KEY (invoice_id) REFERENCES invoices(id) ON DELETE CASCADE; END $cnfk$; DO $irfk$ BEGIN ALTER TABLE invoice_refunds DROP CONSTRAINT IF EXISTS invoice_refunds_invoice_id_fkey; ALTER TABLE invoice_refunds ADD CONSTRAINT invoice_refunds_invoice_id_fkey FOREIGN KEY (invoice_id) REFERENCES invoices(id) ON DELETE CASCADE; END $irfk$; -- Invoice line items. The kind column lets the PDF renderer -- group lines (all monthly fees together, all AI usage together, -- etc.) and the admin UI filter by category. CREATE TABLE IF NOT EXISTS invoice_lines ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), invoice_id UUID NOT NULL REFERENCES invoices(id) ON DELETE CASCADE, -- NULL for org-wide items; tenant name for per-tenant breakdowns. tenant_name TEXT, kind TEXT NOT NULL CHECK (kind IN ( 'tenant_monthly','tenant_setup','ai_usage','threema_messages','skill_usage','skill_setup','adjustment' )), description TEXT NOT NULL, quantity NUMERIC(12,4) NOT NULL DEFAULT 1, unit_label TEXT, unit_price_chf NUMERIC(10,5) NOT NULL, amount_chf NUMERIC(10,2) NOT NULL, -- Per-kind audit metadata (e.g. {proration_days, days_in_month} -- for tenant_monthly; {in_count, out_count} for threema_messages). metadata JSONB, display_order INT NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_invoice_lines_invoice ON invoice_lines(invoice_id, display_order); -- Reminders fired against open/overdue invoices. Level 3 = final. -- One PDF per reminder, stored alongside. CREATE TABLE IF NOT EXISTS invoice_reminders ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), invoice_id UUID NOT NULL REFERENCES invoices(id) ON DELETE CASCADE, level INT NOT NULL CHECK (level IN (1, 2, 3)), sent_at TIMESTAMPTZ NOT NULL DEFAULT now(), sent_by TEXT NOT NULL, pdf_data BYTEA, pdf_filename TEXT, email_sent_to TEXT, UNIQUE (invoice_id, level) ); CREATE INDEX IF NOT EXISTS idx_invoice_reminders_invoice ON invoice_reminders(invoice_id, level); -- Phase 2.5: queue for skills flagged with requiresManualSetup in -- the package catalog. A user-side enable on a flagged skill -- creates a pending row here instead of mutating tenant.spec.packages; -- the operator never sees the skill until admin approves and adds -- it to the spec. Disable is always direct — there's no gate on -- turning a skill off, even one that previously required setup. CREATE TABLE IF NOT EXISTS skill_activation_requests ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_name TEXT NOT NULL, zitadel_org_id TEXT NOT NULL, zitadel_user_id TEXT NOT NULL, skill_id TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('pending', 'approved', 'rejected', 'withdrawn')) DEFAULT 'pending', requested_at TIMESTAMPTZ NOT NULL DEFAULT now(), reviewed_at TIMESTAMPTZ, reviewed_by TEXT, rejection_reason TEXT, admin_notes TEXT ); -- Only one in-flight request per (tenant, skill). Rejected and -- approved rows don't block new requests; user can retry after a -- rejection by toggling the skill again. CREATE UNIQUE INDEX IF NOT EXISTS uniq_skill_act_one_pending ON skill_activation_requests (tenant_name, skill_id) WHERE status = 'pending'; -- Admin queue lookup — partial index keeps it tiny. CREATE INDEX IF NOT EXISTS idx_skill_act_pending_status ON skill_activation_requests (requested_at DESC) WHERE status = 'pending'; -- Per-tenant lookup for the customer UI's pending+rejected display. CREATE INDEX IF NOT EXISTS idx_skill_act_tenant ON skill_activation_requests (tenant_name, requested_at DESC); -- Phase 3 fix: the original invoice_lines.kind CHECK constraint -- was created without 'skill_setup' (which Phase 2-fix6 added as -- a new line kind for per-skill setup fees). CREATE TABLE IF NOT -- EXISTS doesn't update constraints on existing tables, so we -- explicitly drop and re-add with the full kind set on every -- boot. Idempotent — DROP IF EXISTS swallows the not-yet-exists -- case (fresh installs); ADD always re-creates. Constraint name -- follows Postgres's default __check. ALTER TABLE invoice_lines DROP CONSTRAINT IF EXISTS invoice_lines_kind_check; ALTER TABLE invoice_lines ADD CONSTRAINT invoice_lines_kind_check CHECK (kind IN ( 'tenant_monthly','tenant_setup','ai_usage','threema_messages', 'skill_usage','skill_setup','adjustment', -- Phase 8: lines on admin-created custom invoices. PDF -- groups these under a "Services" section header. 'custom_line' )); -- Phase 8: drafts for the admin "New invoice" flow. The payload -- is a JSONB blob with the in-progress form (lines, dates, -- locale, etc.). On Issue the payload is read, a real Invoice -- row is allocated via createInvoice with source='custom', and -- this draft row is deleted. Drafts are admin-only — they have -- no invoice number, no PDF, and aren't reachable from any -- customer-facing route. -- -- Why a JSONB blob instead of mirroring the invoices schema: -- drafts and issued invoices share only a fraction of fields -- (no number, no totals computed yet, possibly incomplete -- billing snapshot), and any parallel-table design would force -- a costly conversion step. The blob keeps the schema minimal -- and the read/write paths trivial. CREATE TABLE IF NOT EXISTS invoice_drafts ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), zitadel_org_id TEXT NOT NULL, created_by TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), payload JSONB NOT NULL ); CREATE INDEX IF NOT EXISTS idx_invoice_drafts_org ON invoice_drafts(zitadel_org_id, updated_at DESC); CREATE INDEX IF NOT EXISTS idx_invoice_drafts_recent ON invoice_drafts(updated_at DESC); -- Phase 4: Stripe webhook idempotency. Stripe guarantees at-least-once -- delivery and retries failures with exponential backoff for up to 72h, -- so the same event.id can arrive multiple times. We insert each -- event.id with the PK constraint enforcing uniqueness; INSERT either -- succeeds (first delivery → process the event) or fails with 23505 -- (duplicate → ack with 200 and skip). The payload column is invaluable -- when diagnosing a webhook that processed wrong; keep it small and -- prune old rows out-of-band if storage becomes a concern (Phase 7). CREATE TABLE IF NOT EXISTS stripe_events ( event_id TEXT PRIMARY KEY, event_type TEXT NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT now(), processed_at TIMESTAMPTZ, payload JSONB ); CREATE INDEX IF NOT EXISTS idx_stripe_events_type_received ON stripe_events (event_type, received_at DESC); -- Phase 5: Cron run history. One row per invocation of either the -- monthly issuance sweep or the daily reminder sweep, regardless of -- whether it ran from K8s CronJob or an admin's manual trigger. -- The summary counters let the admin UI render "last run: 12 issued, -- 0 failed" without joining against invoices/reminders. Detail rows -- live in the JSONB error_details on failure for diagnosis. CREATE TABLE IF NOT EXISTS cron_run_history ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), run_kind TEXT NOT NULL CHECK (run_kind IN ('monthly_issue','reminders')), triggered_by TEXT NOT NULL, -- 'cron' or '' started_at TIMESTAMPTZ NOT NULL DEFAULT now(), finished_at TIMESTAMPTZ, success_count INT NOT NULL DEFAULT 0, failure_count INT NOT NULL DEFAULT 0, skipped_count INT NOT NULL DEFAULT 0, error_details JSONB ); CREATE INDEX IF NOT EXISTS idx_cron_run_history_kind_started ON cron_run_history (run_kind, started_at DESC); `; let migrated = false; export async function ensureSchema(): Promise { if (migrated) return; await getPool().query(MIGRATION_SQL); migrated = true; } // --------------------------------------------------------------------------- // Workspace templates // --------------------------------------------------------------------------- /** * Get a workspace template by file key (e.g. "SOUL.md", "AGENTS.md", "TOOLS.md"). * Returns null if no template is stored for this key. */ export async function getWorkspaceTemplate( fileKey: string ): Promise { await ensureSchema(); const result = await getPool().query<{ content: string }>( "SELECT content FROM workspace_templates WHERE file_key = $1", [fileKey] ); return result.rows[0]?.content ?? null; } /** * Upsert a workspace template. */ export async function setWorkspaceTemplate( fileKey: string, content: string ): Promise { await ensureSchema(); await getPool().query( `INSERT INTO workspace_templates (file_key, content, updated_at) VALUES ($1, $2, now()) ON CONFLICT (file_key) DO UPDATE SET content = $2, updated_at = now()`, [fileKey, content] ); } /** * List all workspace templates. */ export async function listWorkspaceTemplates(): Promise< Array<{ fileKey: string; content: string; updatedAt: string }> > { await ensureSchema(); const result = await getPool().query( "SELECT file_key, content, updated_at FROM workspace_templates ORDER BY file_key" ); return result.rows.map((r: any) => ({ fileKey: r.file_key, content: r.content, updatedAt: r.updated_at?.toISOString?.() ?? r.updated_at, })); } // --------------------------------------------------------------------------- // Tenant requests CRUD // --------------------------------------------------------------------------- export async function createTenantRequest( params: Omit & { encryptedSecrets?: Buffer; } ): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO tenant_requests (zitadel_org_id, zitadel_user_id, company_name, instance_name, contact_name, contact_email, agent_name, soul_md, agents_md, packages, billing_address, billing_notes, encrypted_secrets, is_personal, channel_users) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15::jsonb) RETURNING *`, [ params.zitadelOrgId, params.zitadelUserId, params.companyName, params.instanceName ?? null, params.contactName, params.contactEmail, params.agentName, params.soulMd, params.agentsMd ?? null, params.packages, JSON.stringify(params.billingAddress), params.billingNotes, params.encryptedSecrets ?? null, params.isPersonal ?? false, JSON.stringify(params.channelUsers ?? {}), ] ); return mapRow(result.rows[0]); } export async function getTenantRequestById( id: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM tenant_requests WHERE id = $1", [id] ); return result.rows[0] ? mapRow(result.rows[0]) : null; } /** * Slice 3: returns ALL requests for an org, most recent first. * * Replaces the pre-Slice-3 `getTenantRequestByOrgId` which returned the * single most recent row. Callers that previously assumed one-row-per-org * must now iterate or pick by status. The intent is explicit at every * call site, which is the point of the rename. * * Includes rows in every status (pending, approved, provisioning, active, * rejected, deleted). For "active or in-flight only" filtering, see * {@link listActiveTenantRequestsByOrgId}. */ export async function listTenantRequestsByOrgId( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 ORDER BY created_at DESC", [orgId] ); return result.rows.map(mapRow); } /** * As {@link listTenantRequestsByOrgId} but tuned for the customer's * dashboard view. * * Returns: * - All non-terminal rows (pending, approved, provisioning, active), * because the customer needs to see what's in flight. * - Terminal-failed rows (rejected, cancelled) that the customer * hasn't dismissed yet (Bug 13). Without this, a rejection that * happens while the customer isn't online would only be * communicated by email — easy to miss. * * Excludes: * - `deleted` rows (admin tore down the tenant — historical, not * actionable). * - Dismissed rejected/cancelled rows. */ export async function listActiveTenantRequestsByOrgId( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 AND status <> 'deleted' AND (status NOT IN ('rejected', 'cancelled') OR dismissed_at IS NULL) ORDER BY created_at DESC`, [orgId] ); return result.rows.map(mapRow); } /** * Returns the most recent approved-or-active request for an org. Used to * seed billing/contact defaults when a customer creates an additional * instance — saves them re-typing data already on file. * * Returns null if the org has never had an approved instance (e.g. first * registration is still pending). */ export async function getMostRecentApprovedRequestForOrg( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 AND status IN ('approved', 'provisioning', 'active') ORDER BY created_at DESC LIMIT 1`, [orgId] ); return result.rows[0] ? mapRow(result.rows[0]) : null; } export async function listTenantRequests( status?: TenantRequestStatus ): Promise { await ensureSchema(); // Phase 9b: 'pending_payment' rows are pre-Checkout: the customer // submitted the wizard but hasn't paid the setup fee yet. They're // invisible to admin until the webhook flips them to 'pending'. // The explicit filter path still allows querying them (e.g. // ?status=pending_payment) for debugging. const result = status ? await getPool().query( "SELECT * FROM tenant_requests WHERE status = $1 ORDER BY created_at DESC", [status] ) : await getPool().query( "SELECT * FROM tenant_requests WHERE status <> 'pending_payment' ORDER BY created_at DESC" ); return result.rows.map(mapRow); } export async function updateTenantRequestStatus( id: string, status: TenantRequestStatus, extra?: { adminNotes?: string | null; tenantName?: string; clearAdminNotes?: boolean; } ): Promise { await ensureSchema(); const sets = ["status = $2", "updated_at = now()"]; const values: any[] = [id, status]; let idx = 3; if (extra?.adminNotes !== undefined) { sets.push(`admin_notes = $${idx}`); values.push(extra.adminNotes); idx++; } if (extra?.clearAdminNotes) { sets.push("admin_notes = NULL"); } if (extra?.tenantName) { sets.push(`tenant_name = $${idx}`); values.push(extra.tenantName); idx++; } const result = await getPool().query( `UPDATE tenant_requests SET ${sets.join(", ")} WHERE id = $1 RETURNING *`, values ); return mapRow(result.rows[0]); } /** * Clear the encrypted_secrets column after secrets have been written to OpenBao. * Called during admin approval after successful vault writes. */ export async function clearEncryptedSecrets(requestId: string): Promise { await ensureSchema(); await getPool().query( "UPDATE tenant_requests SET encrypted_secrets = NULL, updated_at = now() WHERE id = $1", [requestId] ); } /** * Set dismissed_at = now() on a request row. Used when a customer * clicks "Dismiss" on a rejected/cancelled card on their dashboard * (Bug 13). The row stays in the database for history/audit but * stops appearing in `listActiveTenantRequestsByOrgId`. * * Idempotent: dismissing an already-dismissed row is a no-op. * Caller is responsible for verifying the row belongs to the user's * org before calling. */ /** * Create a resume request (Bug 37a). Used when an owner of a suspended * tenant wants to reactivate it. Resume is admin-gated — the request * sits as `pending` until a platform admin approves or rejects it. * * Tenant-name uniqueness is enforced for `pending` resume rows by a * partial unique index, so a customer can't spam the queue with * duplicate resume requests for the same tenant. The DB throws a * unique-violation if they try; callers should catch that and translate * to a 409. * * Why this lives in tenant_requests instead of a separate table: * - the lifecycle is identical (pending → approved/rejected, plus * customer-side cancel and dismiss-after-terminal) * - the customer dashboard renders pending+resume cards from the * same `listActiveTenantRequestsByOrgId` query — adding a separate * table would mean two queries and union-merging in the UI * - the admin queue likewise treats them uniformly * The cost is a discriminator column (`request_type`) and most * provision-only fields being null on resume rows. That's a tradeoff * I think is worth it. */ export async function createResumeRequest(params: { tenantName: string; zitadelOrgId: string; zitadelUserId: string; contactName: string; contactEmail: string; // Provision-only fields default sensibly. company_name + agent_name // are NOT NULL in the original schema; we copy them from the existing // tenant request for traceability rather than storing dummy values. companyName: string; agentName: string; /** * Feature 6: optional free-form note from the customer explaining * why they want reactivation. Surfaced to admin in the queue and * forwarded to the platform notification email so the admin can * decide before opening the request. */ customerNotes?: string | null; }): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO tenant_requests ( zitadel_org_id, zitadel_user_id, company_name, contact_name, contact_email, agent_name, tenant_name, request_type, status, customer_notes ) VALUES ($1, $2, $3, $4, $5, $6, $7, 'resume', 'pending', $8) RETURNING *`, [ params.zitadelOrgId, params.zitadelUserId, params.companyName, params.contactName, params.contactEmail, params.agentName, params.tenantName, params.customerNotes ?? null, ] ); return mapRow(result.rows[0]); } /** * Get the most recent provision request for a tenant_name. Used by * Bug 37a's resume-request creation to populate company_name and * agent_name (NOT NULL columns) from the original provision row * rather than make up values. * * Returns null when no such row exists — should be impossible in * normal flow (resume requests are only created for already-existing * tenants whose CR was created via approving a provision request), * but the caller should guard against it for safety. */ export async function getTenantRequestByTenantName( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE tenant_name = $1 AND request_type = 'provision' ORDER BY created_at DESC LIMIT 1`, [tenantName] ); return result.rows.length > 0 ? mapRow(result.rows[0]) : null; } /** * Return the in-flight (pending) resume request for a given tenant, if * any. Used both to gate the customer's "Request reactivation" button * (don't allow a second when one's already pending) and by the admin * UI to navigate from the tenant detail page to the awaiting request. * * Returns null when no pending resume exists. Approved/rejected rows * are never returned — they're terminal. */ export async function getPendingResumeRequestForTenant( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE tenant_name = $1 AND request_type = 'resume' AND status = 'pending' LIMIT 1`, [tenantName] ); return result.rows.length > 0 ? mapRow(result.rows[0]) : null; } export async function dismissTenantRequest(id: string): Promise { await ensureSchema(); await getPool().query( `UPDATE tenant_requests SET dismissed_at = COALESCE(dismissed_at, now()), updated_at = now() WHERE id = $1`, [id] ); } /** * Update editable fields of a still-pending tenant request. Bug 6 — a * customer who notices a typo or wants to add a package after submitting * the wizard should be able to fix it without admin involvement. * * Only the customer-input fields are updateable. `status`, `tenant_name`, * `admin_notes`, `encrypted_secrets`, `is_personal`, `zitadel_*` and * timestamps are managed elsewhere and intentionally not here. * * The caller is responsible for: * - verifying the row belongs to the user's org * - verifying status === 'pending' (editing approved/provisioning rows * would race against the operator) * * Returns the updated row, or null if the id didn't match anything. */ export async function updateTenantRequestEditableFields( id: string, fields: { instanceName?: string | null; agentName?: string; soulMd?: string; agentsMd?: string | null; packages?: string[]; billingAddress?: BillingAddress; billingNotes?: string; encryptedSecrets?: Buffer | null; } ): Promise { await ensureSchema(); const sets: string[] = ["updated_at = now()"]; const values: any[] = [id]; let idx = 2; // Map JS field names to SQL columns. Each entry is gated on // `!== undefined` so passing only some fields just updates those. const colMap: Array<[keyof typeof fields, string]> = [ ["instanceName", "instance_name"], ["agentName", "agent_name"], ["soulMd", "soul_md"], ["agentsMd", "agents_md"], ["packages", "packages"], ["billingAddress", "billing_address"], ["billingNotes", "billing_notes"], ["encryptedSecrets", "encrypted_secrets"], ]; for (const [jsField, sqlCol] of colMap) { const v = fields[jsField]; if (v === undefined) continue; sets.push(`${sqlCol} = $${idx}`); values.push(v); idx++; } if (sets.length === 1) { // No editable fields supplied — return the row unchanged rather // than running a useless UPDATE that just bumps updated_at. const cur = await getTenantRequestById(id); return cur; } const result = await getPool().query( `UPDATE tenant_requests SET ${sets.join(", ")} WHERE id = $1 RETURNING *`, values ); return result.rows[0] ? mapRow(result.rows[0]) : null; } /** * Wrapper around domain-check.ts that injects the portal's connection pool. * Kept here so route handlers don't need direct access to the pool. */ export async function checkDuplicateDomain(email: string) { await ensureSchema(); // Lazy import to keep db.ts free of fetch/AbortSignal at module load time. const { checkRegistrationDomain } = await import("./domain-check"); return checkRegistrationDomain(getPool(), email); } /** * Mark a single tenant request as "deleted" when the associated tenant CR * is deleted. With multi-tenant per org this affects exactly one row, * since tenant_name is unique by index. The customer's other instances * are untouched. */ export async function markTenantRequestDeletedByTenantName( tenantName: string ): Promise { await ensureSchema(); await getPool().query( "UPDATE tenant_requests SET status = 'deleted', tenant_name = NULL, updated_at = now() WHERE tenant_name = $1", [tenantName] ); } /** * Delete a tenant request row entirely. Used when a customer re-submits * after their previous tenant was deleted by admin. */ export async function deleteTenantRequest(id: string): Promise { await ensureSchema(); await getPool().query("DELETE FROM tenant_requests WHERE id = $1", [id]); } /** * Reconcile the portal's tenant_requests table against actual cluster * state. Three passes, walking only rows with `tenant_name` set: * * 1. provisioning → active: when a tenant CR's phase reaches Ready * or Running, the portal flips the row to active so the * "provisioning…" card transitions into the running tenant view. * * 2. active/provisioning → deleted: when the corresponding CR no * longer exists in the cluster (404), or is mid-deletion (has * metadata.deletionTimestamp set), the row gets flipped to * `deleted`. The DB is otherwise blind to operator-initiated * deletions — when the 60-day TTL fires (Bug 37b) and the * operator deletes a suspended tenant, the portal would happily * keep showing the "Your assistant is ready!" card forever. * Without this reconciliation the dashboard drifts from reality. * * 3. pending resume → cancelled: when a pending resume request's * tenant is no longer suspended (admin resumed it directly, * tenant was deleted, or it was never suspended in the first * place), the request is moot. Flip to 'cancelled' so the * pending-resume unique index releases for any future genuine * resume request. We pick `cancelled` over `rejected` because * the customer didn't do anything wrong — circumstances just * changed. * * Errors are tolerated per-row: a transient API hiccup on one tenant * shouldn't fail the whole sweep. Skipped rows get retried next call. * * Slice 3 note: with multi-tenant per org, this iterates each row * individually (keyed by its own tenant_name), so multiple in-flight * tenants in the same org are handled correctly. */ export async function syncProvisioningStatuses(): Promise { await ensureSchema(); // Active+provisioning rows: status reflects "the tenant should // exist and be running". // Pending resume rows: status reflects "the tenant is suspended, // awaiting reactivation". // Both need cluster-side validation; we fetch them in one query // and dispatch on (status, request_type). const result = await getPool().query( `SELECT * FROM tenant_requests WHERE tenant_name IS NOT NULL AND ( status IN ('provisioning', 'active') OR (status = 'pending' AND request_type = 'resume') )` ); for (const row of result.rows) { const mapped = mapRow(row); if (!mapped.tenantName) continue; let tenant: Awaited> = null; try { tenant = await getTenant(mapped.tenantName); } catch { // Transient API error — skip this row, retry on next sweep. continue; } // Pending resume request: validity hinges on tenant being suspended. if ( mapped.status === "pending" && mapped.requestType === "resume" ) { // Tenant doesn't exist or is being deleted: cancel the resume // request (it can never be fulfilled). Don't fall through to // the "deleted" branch below — that would also flip the // provision row, which is the right thing for a CR-level // deletion but we want this resume row specifically resolved // here. if (!tenant || tenant.metadata.deletionTimestamp) { await updateTenantRequestStatus(mapped.id, "cancelled"); continue; } // Tenant is no longer suspended: the request is moot. // Cancel it (the customer didn't do anything wrong; the // condition the request was about no longer applies). if (!tenant.spec.suspend) { await updateTenantRequestStatus(mapped.id, "cancelled"); continue; } // Tenant still suspended, request still relevant. Leave as-is. continue; } // Active or provisioning row: CR gone, or mid-deletion. Flip the // row to 'deleted'. `markTenantRequestDeletedByTenantName` flips // every row with this tenant_name (provision + any resume rows), // which is the right thing for a CR-level deletion. if (!tenant || tenant.metadata.deletionTimestamp) { await markTenantRequestDeletedByTenantName(mapped.tenantName); continue; } // CR exists and is healthy. Promote provisioning → active when // the operator reports the tenant has reached steady state. // Keep `active` rows on `active` regardless of phase — a // temporarily-Reconfiguring tenant is still active from the // portal's billing/visibility perspective. if ( mapped.status === "provisioning" && (tenant.status?.phase === "Ready" || tenant.status?.phase === "Running") ) { await updateTenantRequestStatus(mapped.id, "active"); } } } // --------------------------------------------------------------------------- // Row mapper // --------------------------------------------------------------------------- function mapRow(row: any): TenantRequest { return { id: row.id, zitadelOrgId: row.zitadel_org_id, zitadelUserId: row.zitadel_user_id, companyName: row.company_name, instanceName: row.instance_name ?? null, contactName: row.contact_name, contactEmail: row.contact_email, agentName: row.agent_name, soulMd: row.soul_md, agentsMd: row.agents_md ?? null, packages: row.packages ?? [], billingAddress: row.billing_address ?? {}, billingNotes: row.billing_notes, customerNotes: row.customer_notes ?? null, status: row.status as TenantRequestStatus, adminNotes: row.admin_notes, tenantName: row.tenant_name, setupInvoiceId: row.setup_invoice_id ?? null, channelUsers: (row.channel_users ?? {}) as Record, encryptedSecrets: row.encrypted_secrets ?? null, isPersonal: row.is_personal ?? false, dismissedAt: row.dismissed_at?.toISOString?.() ?? row.dismissed_at ?? null, requestType: (row.request_type ?? "provision") as | "provision" | "resume", createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } // --------------------------------------------------------------------------- // Bug 35: org-scoped billing // --------------------------------------------------------------------------- function rowToOrgBilling(row: any): OrgBilling { return { zitadelOrgId: row.zitadel_org_id, companyName: row.company_name, contactName: row.contact_name ?? null, streetAddress: row.street_address, postalCode: row.postal_code, city: row.city, country: row.country, vatNumber: row.vat_number ?? null, billingEmail: row.billing_email, notes: row.notes ?? null, createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } /** * Fetch org billing if it exists. Returns null when the org has never * captured billing — that's the signal the wizard uses to know * whether to render the inline billing step on the first tenant * request. */ export async function getOrgBilling( zitadelOrgId: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM org_billing WHERE zitadel_org_id = $1", [zitadelOrgId] ); return result.rows.length > 0 ? rowToOrgBilling(result.rows[0]) : null; } /** * Insert or update org billing. Single function for both because the * UI flow makes the "first time vs editing" distinction in a single * settings page that doesn't need to know which one it's doing. * * VAT-required-for-companies isn't enforced here — that's an API * concern (the API knows whether the caller is a company org). * Keeping the DB layer dumb. */ export async function upsertOrgBilling( data: Omit ): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO org_billing ( zitadel_org_id, company_name, contact_name, street_address, postal_code, city, country, vat_number, billing_email, notes ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (zitadel_org_id) DO UPDATE SET company_name = EXCLUDED.company_name, contact_name = EXCLUDED.contact_name, street_address = EXCLUDED.street_address, postal_code = EXCLUDED.postal_code, city = EXCLUDED.city, country = EXCLUDED.country, vat_number = EXCLUDED.vat_number, billing_email = EXCLUDED.billing_email, notes = EXCLUDED.notes, updated_at = now() RETURNING *`, [ data.zitadelOrgId, data.companyName, data.contactName ?? null, data.streetAddress, data.postalCode, data.city, data.country, data.vatNumber ?? null, data.billingEmail, data.notes ?? null, ] ); return rowToOrgBilling(result.rows[0]); } // --------------------------------------------------------------------------- // Slice 6: tenant ↔ user assignments // --------------------------------------------------------------------------- /** * One assignment grants one user visibility into one tenant. Returned * shape is the camelCase mirror of the Postgres row. */ export interface TenantUserAssignment { tenantName: string; zitadelOrgId: string; zitadelUserId: string; assignedAt: string; assignedBy: string; } function mapAssignmentRow(row: any): TenantUserAssignment { return { tenantName: row.tenant_name, zitadelOrgId: row.zitadel_org_id, zitadelUserId: row.zitadel_user_id, assignedAt: row.assigned_at?.toISOString?.() ?? row.assigned_at, assignedBy: row.assigned_by, }; } /** * Returns the set of tenant CR names assigned to the given user. * * Hot path on every read for `user`-role customers, so it's intentionally * a single indexed lookup. The returned array is small (a handful of * tenants per user); callers usually wrap it in a Set. * * Note: this does NOT cross-check the org id — assignments are per-user, * and a user's org context comes from their JWT. If a user's * authorization is revoked at the ZITADEL level, their JWT ceases to * carry the customer role and they can't reach the dashboard at all; * the orphan rows are cleaned up the next time their org membership * is re-evaluated (Slice 7's removeAllAssignmentsForUser). */ export async function listTenantAssignmentsForUser( userId: string ): Promise { await ensureSchema(); const result = await getPool().query<{ tenant_name: string }>( "SELECT tenant_name FROM tenant_user_assignments WHERE zitadel_user_id = $1", [userId] ); return result.rows.map((r) => r.tenant_name); } /** * Returns all assignments for a single tenant. Used by the team UI * (Slice 7) to render "who has access to this instance". Includes * `assignedBy` and `assignedAt` for audit display. */ export async function listAssignmentsForTenant( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM tenant_user_assignments WHERE tenant_name = $1 ORDER BY assigned_at DESC", [tenantName] ); return result.rows.map(mapAssignmentRow); } /** * Grant a user access to a tenant. Idempotent — a duplicate INSERT * is silently ignored via ON CONFLICT, and the existing * `assigned_at`/`assigned_by` are preserved (we don't update them on * re-assign). * * Caller is responsible for verifying: * - The actor (`assignedBy`) holds owner/platform role in `orgId`. * - The target user (`userId`) is actually a member of the same * ZITADEL org. We don't validate this here — the team UI fetches * the org's user list from ZITADEL and selects from it. * - The tenant CR exists and is labelled with the same `orgId`. */ export async function addTenantAssignment(params: { tenantName: string; orgId: string; userId: string; assignedBy: string; }): Promise { await ensureSchema(); await getPool().query( `INSERT INTO tenant_user_assignments (tenant_name, zitadel_org_id, zitadel_user_id, assigned_by) VALUES ($1, $2, $3, $4) ON CONFLICT (tenant_name, zitadel_user_id) DO NOTHING`, [params.tenantName, params.orgId, params.userId, params.assignedBy] ); } /** * Revoke a user's access to a tenant. No-op if the row doesn't exist. */ export async function removeTenantAssignment( tenantName: string, userId: string ): Promise { await ensureSchema(); await getPool().query( "DELETE FROM tenant_user_assignments WHERE tenant_name = $1 AND zitadel_user_id = $2", [tenantName, userId] ); } /** * Cascade cleanup: drop ALL assignments for a tenant when the tenant * itself is deleted. Called from the admin delete handler. * * Without this, an orphan row would stick around forever — a future * tenant with the same name (won't happen given Slice 1's UUID-suffix * naming, but defense in depth) would inherit the old assignments. */ export async function removeAllAssignmentsForTenant( tenantName: string ): Promise { await ensureSchema(); await getPool().query( "DELETE FROM tenant_user_assignments WHERE tenant_name = $1", [tenantName] ); } /** * Cascade cleanup: drop ALL assignments for a user within a specific * org. Used by Slice 7's "remove member" flow when an owner kicks a * user out of the org. Scoped by `orgId` so a user with assignments in * org A doesn't lose them when removed from org B (multi-org users * exist when a person registers personally and is also invited to a * company). */ export async function removeAllAssignmentsForUser( orgId: string, userId: string ): Promise { await ensureSchema(); await getPool().query( "DELETE FROM tenant_user_assignments WHERE zitadel_org_id = $1 AND zitadel_user_id = $2", [orgId, userId] ); } // --------------------------------------------------------------------------- // Feature 5: support tickets // --------------------------------------------------------------------------- function rowToSupportTicket(row: any): SupportTicket { return { id: row.id, zitadelOrgId: row.zitadel_org_id, zitadelUserId: row.zitadel_user_id, title: row.title, description: row.description, category: row.category as SupportTicketCategory, status: row.status as SupportTicketStatus, contactEmail: row.contact_email, contactName: row.contact_name, createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } function rowToSupportTicketComment(row: any): SupportTicketComment { return { id: row.id, ticketId: row.ticket_id, authorUserId: row.author_user_id, authorName: row.author_name, authorKind: row.author_kind as SupportTicketCommentAuthorKind, body: row.body, createdAt: row.created_at?.toISOString?.() ?? row.created_at, }; } /** * Create a new support ticket. The contact_name/contact_email are * snapshotted from the session at creation time — see SupportTicket * doc for why. */ export async function createSupportTicket(params: { zitadelOrgId: string; zitadelUserId: string; title: string; description: string; category: SupportTicketCategory; contactName: string; contactEmail: string; }): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO support_tickets ( zitadel_org_id, zitadel_user_id, title, description, category, contact_name, contact_email ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *`, [ params.zitadelOrgId, params.zitadelUserId, params.title, params.description, params.category, params.contactName, params.contactEmail, ] ); return rowToSupportTicket(result.rows[0]); } /** Tickets created by a single user, newest activity first. */ export async function listSupportTicketsForUser( zitadelUserId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM support_tickets WHERE zitadel_user_id = $1 ORDER BY updated_at DESC`, [zitadelUserId] ); return result.rows.map(rowToSupportTicket); } /** * Admin queue. Returns every ticket across all users/orgs, newest * activity first. Pending tickets (open/reopened) bubble to the top * by virtue of recent activity, but the API doesn't sort by status — * the admin UI handles filtering and bucketing. */ export async function listAllSupportTickets(): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM support_tickets ORDER BY updated_at DESC` ); return result.rows.map(rowToSupportTicket); } export async function getSupportTicketById( id: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM support_tickets WHERE id = $1", [id] ); return result.rows.length > 0 ? rowToSupportTicket(result.rows[0]) : null; } export async function listCommentsForTicket( ticketId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM support_ticket_comments WHERE ticket_id = $1 ORDER BY created_at`, [ticketId] ); return result.rows.map(rowToSupportTicketComment); } /** * Insert a comment. Bumps the parent ticket's `updated_at` so the * activity sort orders work — done in a transaction so the two are * atomic from any concurrent reader's perspective. * * Caller is responsible for status auto-bumping (e.g. customer * replying to a `waiting_for_customer` ticket → `in_progress`); the * DB layer just writes what it's told. */ export async function createSupportTicketComment(params: { ticketId: string; authorUserId: string; authorName: string; authorKind: SupportTicketCommentAuthorKind; body: string; }): Promise { await ensureSchema(); const client = await getPool().connect(); try { await client.query("BEGIN"); const inserted = await client.query( `INSERT INTO support_ticket_comments ( ticket_id, author_user_id, author_name, author_kind, body ) VALUES ($1, $2, $3, $4, $5) RETURNING *`, [ params.ticketId, params.authorUserId, params.authorName, params.authorKind, params.body, ] ); await client.query( "UPDATE support_tickets SET updated_at = now() WHERE id = $1", [params.ticketId] ); await client.query("COMMIT"); return rowToSupportTicketComment(inserted.rows[0]); } catch (e) { await client.query("ROLLBACK"); throw e; } finally { client.release(); } } /** * Update mutable fields on a ticket. Only category and status are * mutable; title/description are frozen post-creation. Returns the * updated row so callers can email the right contact_email * afterwards. */ export async function updateSupportTicket( id: string, changes: { status?: SupportTicketStatus; category?: SupportTicketCategory } ): Promise { await ensureSchema(); const sets: string[] = ["updated_at = now()"]; const values: any[] = [id]; let idx = 2; if (changes.status !== undefined) { sets.push(`status = $${idx}`); values.push(changes.status); idx++; } if (changes.category !== undefined) { sets.push(`category = $${idx}`); values.push(changes.category); idx++; } // No-op early exit. Without an actual change we still want // updated_at refreshed if the caller asked for one, but if they // passed neither field there's nothing to do. if (sets.length === 1) return getSupportTicketById(id); const result = await getPool().query( `UPDATE support_tickets SET ${sets.join(", ")} WHERE id = $1 RETURNING *`, values ); return result.rows.length > 0 ? rowToSupportTicket(result.rows[0]) : null; } // --------------------------------------------------------------------------- // Billing — Phase 1: pricing, lifecycle, and skill events // --------------------------------------------------------------------------- // // All helpers are intentionally narrow CRUD — no business logic. // Higher-level operations (compute monthly proration, build an // invoice from raw signals) belong in a future lib/billing.ts // introduced by Phase 2. // // Hook callers should treat every write here as best-effort: if // recording a lifecycle/event row fails, log and continue — never // fail the underlying K8s mutation. Drift is corrected by the // idempotent backfill helper at the bottom of this section. import type { PlatformPricing, SkillPricing, OrgBillingConfig, TenantBillingLifecycle, TenantSkillEvent, TenantSuspensionEvent, } from "@/types"; // --- platform_pricing ------------------------------------------------------ function rowToPlatformPricing(row: any): PlatformPricing { return { tenantMonthlyFeeChf: Number(row.tenant_monthly_fee_chf), tenantSetupFeeChf: Number(row.tenant_setup_fee_chf), threemaMessageChf: Number(row.threema_message_chf), vatRateChli: Number(row.vat_rate_chli), updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } /** * Read the single-row platform pricing config. The migration seeds * the row with zeros on first run, so this never returns null on a * properly migrated database. */ export async function getPlatformPricing(): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM platform_pricing WHERE id = 1" ); if (result.rows.length === 0) { // Defensive: re-seed if the row went missing (manual DELETE, // partial restore, etc.). The migration's INSERT ON CONFLICT // should have ensured this, but a stale row state shouldn't // crash the helper. await getPool().query( "INSERT INTO platform_pricing (id) VALUES (1) ON CONFLICT DO NOTHING" ); const retry = await getPool().query( "SELECT * FROM platform_pricing WHERE id = 1" ); return rowToPlatformPricing(retry.rows[0]); } return rowToPlatformPricing(result.rows[0]); } /** * Update one or more pricing fields. Pass only the fields to change; * unspecified fields are left as-is. `updated_at` is always refreshed. */ export async function updatePlatformPricing(changes: { tenantMonthlyFeeChf?: number; tenantSetupFeeChf?: number; threemaMessageChf?: number; vatRateChli?: number; }): Promise { await ensureSchema(); const sets: string[] = ["updated_at = now()"]; const values: any[] = []; let idx = 1; if (changes.tenantMonthlyFeeChf !== undefined) { sets.push(`tenant_monthly_fee_chf = $${idx++}`); values.push(changes.tenantMonthlyFeeChf); } if (changes.tenantSetupFeeChf !== undefined) { sets.push(`tenant_setup_fee_chf = $${idx++}`); values.push(changes.tenantSetupFeeChf); } if (changes.threemaMessageChf !== undefined) { sets.push(`threema_message_chf = $${idx++}`); values.push(changes.threemaMessageChf); } if (changes.vatRateChli !== undefined) { sets.push(`vat_rate_chli = $${idx++}`); values.push(changes.vatRateChli); } // Only updated_at would change → still execute so the caller's // intent ("touch the row") isn't silently dropped, but make it // an explicit no-op if no fields were provided. if (sets.length === 1 && values.length === 0) { return getPlatformPricing(); } const result = await getPool().query( `UPDATE platform_pricing SET ${sets.join(", ")} WHERE id = 1 RETURNING *`, values ); return rowToPlatformPricing(result.rows[0]); } // --- skill_pricing --------------------------------------------------------- function rowToSkillPricing(row: any): SkillPricing { return { skillId: row.skill_id, dailyPriceChf: Number(row.daily_price_chf), setupFeeChf: Number(row.setup_fee_chf ?? 0), createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } export async function listSkillPricing(): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM skill_pricing ORDER BY skill_id" ); return result.rows.map(rowToSkillPricing); } export async function getSkillPricing( skillId: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM skill_pricing WHERE skill_id = $1", [skillId] ); return result.rows.length > 0 ? rowToSkillPricing(result.rows[0]) : null; } /** * Upsert pricing for a package. `dailyPriceChf` activates * usage-based billing (one billable unit per UTC day the package * was enabled). `setupFeeChf` is a one-time charge emitted on the * first invoice line for any given (tenant, skill). * * Both fields are required so admin must consciously set 0 to mean * "no setup fee" rather than accidentally inheriting an old value * from a partial update. */ export async function setSkillPricing( skillId: string, dailyPriceChf: number, setupFeeChf: number ): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO skill_pricing (skill_id, daily_price_chf, setup_fee_chf) VALUES ($1, $2, $3) ON CONFLICT (skill_id) DO UPDATE SET daily_price_chf = EXCLUDED.daily_price_chf, setup_fee_chf = EXCLUDED.setup_fee_chf, updated_at = now() RETURNING *`, [skillId, dailyPriceChf, setupFeeChf] ); return rowToSkillPricing(result.rows[0]); } /** * Remove the price for a package. Day-tracking events continue to * be recorded (cheap append-only) but the package becomes free * effective immediately. Historical invoices already issued are * unaffected. */ export async function removeSkillPricing(skillId: string): Promise { await ensureSchema(); await getPool().query("DELETE FROM skill_pricing WHERE skill_id = $1", [ skillId, ]); } // --- tenant_billing_lifecycle --------------------------------------------- function rowToTenantBillingLifecycle(row: any): TenantBillingLifecycle { return { tenantName: row.tenant_name, zitadelOrgId: row.zitadel_org_id, createdAt: row.created_at?.toISOString?.() ?? row.created_at, deletedAt: row.deleted_at?.toISOString?.() ?? row.deleted_at ?? null, }; } export async function getTenantBillingLifecycle( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM tenant_billing_lifecycle WHERE tenant_name = $1", [tenantName] ); return result.rows.length > 0 ? rowToTenantBillingLifecycle(result.rows[0]) : null; } /** * Record a tenant's creation for billing purposes. Idempotent on * `tenant_name` — re-running with a different created_at is a no-op * so re-approvals don't move the proration anchor. Pair with * recordInitialSkillEvents() at the same call site. */ export async function recordTenantCreated( tenantName: string, zitadelOrgId: string, createdAt?: Date ): Promise { await ensureSchema(); await getPool().query( `INSERT INTO tenant_billing_lifecycle (tenant_name, zitadel_org_id, created_at) VALUES ($1, $2, COALESCE($3::timestamptz, now())) ON CONFLICT (tenant_name) DO NOTHING`, [tenantName, zitadelOrgId, createdAt ?? null] ); } /** * Stamp deletion timestamp. Idempotent — calling twice keeps the * first deletion's timestamp. Pair with closing-skill-disabled * events at the same call site if you want the events log to * reflect that everything is now off. * * We deliberately don't delete the row — the lifecycle record is * needed for any final invoice covering the deletion month. */ export async function recordTenantDeleted( tenantName: string, deletedAt?: Date ): Promise { await ensureSchema(); await getPool().query( `UPDATE tenant_billing_lifecycle SET deleted_at = COALESCE(deleted_at, COALESCE($2::timestamptz, now())) WHERE tenant_name = $1`, [tenantName, deletedAt ?? null] ); } // --- tenant_skill_events -------------------------------------------------- function rowToTenantSkillEvent(row: any): TenantSkillEvent { return { id: String(row.id), tenantName: row.tenant_name, zitadelOrgId: row.zitadel_org_id, skillId: row.skill_id, eventKind: row.event_kind as "enabled" | "disabled", occurredAt: row.occurred_at?.toISOString?.() ?? row.occurred_at, }; } /** * Append a batch of enabled/disabled events. Single multi-row INSERT * so all rows share an effectively-identical occurred_at when * `occurredAt` is omitted (otherwise they'd be a few microseconds * apart, which would skew the "is this skill on for day D" computation * around midnight UTC). * * Empty arrays are a no-op (no SQL fired). */ export async function recordSkillEvents( tenantName: string, zitadelOrgId: string, added: string[], removed: string[], occurredAt?: Date ): Promise { if (added.length === 0 && removed.length === 0) return; await ensureSchema(); const at = occurredAt ?? new Date(); // Build placeholders. Each row uses 5 placeholders. const values: any[] = []; const rows: string[] = []; let idx = 1; for (const skillId of added) { rows.push(`($${idx++}, $${idx++}, $${idx++}, 'enabled', $${idx++})`); values.push(tenantName, zitadelOrgId, skillId, at); } for (const skillId of removed) { rows.push(`($${idx++}, $${idx++}, $${idx++}, 'disabled', $${idx++})`); values.push(tenantName, zitadelOrgId, skillId, at); } await getPool().query( `INSERT INTO tenant_skill_events (tenant_name, zitadel_org_id, skill_id, event_kind, occurred_at) VALUES ${rows.join(", ")}`, values ); } /** * Read events for a tenant within a half-open interval — `from` * inclusive, `to` exclusive. Used by Phase 2's billing computation * to collapse to billable days. Returned in chronological order. */ export async function listSkillEventsForTenant( tenantName: string, from: Date, to: Date ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_skill_events WHERE tenant_name = $1 AND occurred_at >= $2 AND occurred_at < $3 ORDER BY occurred_at, id`, [tenantName, from, to] ); return result.rows.map(rowToTenantSkillEvent); } /** * Read the most recent event for each (tenant, skill) pair as of * a moment in time. Phase 2 uses this to know which skills were * enabled at the start of a billing window. * * Implemented with DISTINCT ON for a single round-trip; the * (tenant_name, skill_id, occurred_at) index supports the sort. */ export async function getSkillStateAt( tenantName: string, asOf: Date ): Promise> { await ensureSchema(); const result = await getPool().query( `SELECT DISTINCT ON (skill_id) skill_id, event_kind FROM tenant_skill_events WHERE tenant_name = $1 AND occurred_at <= $2 ORDER BY skill_id, occurred_at DESC, id DESC`, [tenantName, asOf] ); const out: Record = {}; for (const row of result.rows) { out[row.skill_id] = row.event_kind as "enabled" | "disabled"; } return out; } // --- tenant_suspension_events --------------------------------------------- function rowToTenantSuspensionEvent(row: any): TenantSuspensionEvent { return { id: String(row.id), tenantName: row.tenant_name, zitadelOrgId: row.zitadel_org_id, eventKind: row.event_kind as "suspended" | "resumed", occurredAt: row.occurred_at?.toISOString?.() ?? row.occurred_at, }; } export async function recordSuspensionEvent( tenantName: string, zitadelOrgId: string, eventKind: "suspended" | "resumed", occurredAt?: Date ): Promise { await ensureSchema(); await getPool().query( `INSERT INTO tenant_suspension_events (tenant_name, zitadel_org_id, event_kind, occurred_at) VALUES ($1, $2, $3, COALESCE($4::timestamptz, now()))`, [tenantName, zitadelOrgId, eventKind, occurredAt ?? null] ); } export async function listSuspensionEventsForTenant( tenantName: string, from: Date, to: Date ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_suspension_events WHERE tenant_name = $1 AND occurred_at >= $2 AND occurred_at < $3 ORDER BY occurred_at, id`, [tenantName, from, to] ); return result.rows.map(rowToTenantSuspensionEvent); } // --- org_billing_config --------------------------------------------------- function rowToOrgBillingConfig(row: any): OrgBillingConfig { return { zitadelOrgId: row.zitadel_org_id, payByInvoice: row.pay_by_invoice, stripeCustomerId: row.stripe_customer_id ?? null, autoInvoiceEnabled: row.auto_invoice_enabled, autoRemindersEnabled: row.auto_reminders_enabled, stripeDefaultPaymentMethodId: row.stripe_default_payment_method_id ?? null, stripePmBrand: row.stripe_pm_brand ?? null, stripePmLast4: row.stripe_pm_last4 ?? null, stripePmExpMonth: row.stripe_pm_exp_month != null ? Number(row.stripe_pm_exp_month) : null, stripePmExpYear: row.stripe_pm_exp_year != null ? Number(row.stripe_pm_exp_year) : null, autoChargeEnabled: row.auto_charge_enabled === undefined ? true : !!row.auto_charge_enabled, createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, }; } /** * Get config for an org, auto-creating with defaults if missing. * Returning the row (vs null) simplifies callers: the gate logic in * Phase 4 ("approve only if pay_by_invoice OR has card") doesn't * need a "what if no row" branch. * * Defaults are baked into the table's column defaults, so the * INSERT here only needs the primary key. */ export async function getOrgBillingConfig( zitadelOrgId: string ): Promise { await ensureSchema(); await getPool().query( `INSERT INTO org_billing_config (zitadel_org_id) VALUES ($1) ON CONFLICT (zitadel_org_id) DO NOTHING`, [zitadelOrgId] ); const result = await getPool().query( "SELECT * FROM org_billing_config WHERE zitadel_org_id = $1", [zitadelOrgId] ); return rowToOrgBillingConfig(result.rows[0]); } export async function updateOrgBillingConfig( zitadelOrgId: string, changes: { payByInvoice?: boolean; stripeCustomerId?: string | null; autoInvoiceEnabled?: boolean; autoRemindersEnabled?: boolean; } ): Promise { await ensureSchema(); // Ensure row exists first — mirrors getOrgBillingConfig's // auto-create. await getPool().query( `INSERT INTO org_billing_config (zitadel_org_id) VALUES ($1) ON CONFLICT (zitadel_org_id) DO NOTHING`, [zitadelOrgId] ); const sets: string[] = ["updated_at = now()"]; const values: any[] = [zitadelOrgId]; let idx = 2; if (changes.payByInvoice !== undefined) { sets.push(`pay_by_invoice = $${idx++}`); values.push(changes.payByInvoice); } if (changes.stripeCustomerId !== undefined) { sets.push(`stripe_customer_id = $${idx++}`); values.push(changes.stripeCustomerId); } if (changes.autoInvoiceEnabled !== undefined) { sets.push(`auto_invoice_enabled = $${idx++}`); values.push(changes.autoInvoiceEnabled); } if (changes.autoRemindersEnabled !== undefined) { sets.push(`auto_reminders_enabled = $${idx++}`); values.push(changes.autoRemindersEnabled); } const result = await getPool().query( `UPDATE org_billing_config SET ${sets.join(", ")} WHERE zitadel_org_id = $1 RETURNING *`, values ); return rowToOrgBillingConfig(result.rows[0]); } // --- Backfill ------------------------------------------------------------- // // Idempotent one-time bootstrap for tenants that existed before // Phase 1 shipped. Phase 2 wraps this in an admin endpoint; for now // it can be invoked from a one-off node script. // // For each PiecedTenant CR: // - If no tenant_billing_lifecycle row exists, insert one with // created_at = metadata.creationTimestamp. // - If no tenant_skill_events row exists for the tenant, insert // 'enabled' events at the same timestamp for every package // currently in spec.packages. // Tenants suspended at backfill time get a 'suspended' event at // status.suspendedAt (operator-stamped); resumed tenants get nothing // extra (default state is "running"). /** * Returns a count of lifecycle rows inserted and skill events * recorded — both expected to be zero on a second run. */ export async function backfillTenantBillingLifecycle(tenants: { name: string; zitadelOrgId: string; createdAt: Date; packages: string[]; suspendedAt: Date | null; }[]): Promise<{ lifecycleInserted: number; eventsInserted: number; suspensionEventsInserted: number }> { await ensureSchema(); let lifecycleInserted = 0; let eventsInserted = 0; let suspensionEventsInserted = 0; for (const t of tenants) { // Lifecycle row — idempotent. const existing = await getTenantBillingLifecycle(t.name); if (!existing) { await recordTenantCreated(t.name, t.zitadelOrgId, t.createdAt); lifecycleInserted++; } // Initial skill events — only if the tenant has zero events at // all. We don't want to add to an active event stream. const eventsRow = await getPool().query( "SELECT 1 FROM tenant_skill_events WHERE tenant_name = $1 LIMIT 1", [t.name] ); if (eventsRow.rows.length === 0 && t.packages.length > 0) { await recordSkillEvents( t.name, t.zitadelOrgId, t.packages, [], t.createdAt ); eventsInserted += t.packages.length; } // Suspension state — only if the tenant has zero suspension // events. If it's currently suspended, record one 'suspended' // event at the operator-stamped time so proration sees it. const susRow = await getPool().query( "SELECT 1 FROM tenant_suspension_events WHERE tenant_name = $1 LIMIT 1", [t.name] ); if (susRow.rows.length === 0 && t.suspendedAt) { await recordSuspensionEvent( t.name, t.zitadelOrgId, "suspended", t.suspendedAt ); suspensionEventsInserted++; } } return { lifecycleInserted, eventsInserted, suspensionEventsInserted }; } // --------------------------------------------------------------------------- // Billing — Phase 2: invoice persistence // --------------------------------------------------------------------------- // // Invoice creation is intentionally a single transaction: allocate // number, INSERT invoice, INSERT lines, store PDF — all-or-nothing. // The Postgres invoice_number_counters row lock serializes // concurrent allocators for the same year, producing gapless // numbering even under bursts. import type { Invoice, InvoiceBillingSnapshot, InvoiceDetail, InvoiceDraft, InvoiceLine, InvoiceStatus, CreditNote, CreditNoteKind, InvoiceRefund, CustomInvoiceDraftPayload, InvoiceDraftRecord, } from "@/types"; function rowToInvoice(row: any): Invoice { // Phase 8: period_start / period_end are nullable for custom // invoices (no fixed billing period). Convert defensively whether // the driver returned a string or Date. const periodStartIso = row.period_start == null ? null : typeof row.period_start === "string" ? row.period_start : row.period_start.toISOString().split("T")[0]; const periodEndIso = row.period_end == null ? null : typeof row.period_end === "string" ? row.period_end : row.period_end.toISOString().split("T")[0]; return { id: row.id, invoiceNumber: row.invoice_number, zitadelOrgId: row.zitadel_org_id, source: (row.source ?? "auto") as "auto" | "custom", periodStart: periodStartIso, periodEnd: periodEndIso, issuedAt: row.issued_at?.toISOString?.() ?? row.issued_at, dueAt: typeof row.due_at === "string" ? row.due_at : row.due_at.toISOString().split("T")[0], subtotalChf: Number(row.subtotal_chf), vatRate: Number(row.vat_rate), vatAmountChf: Number(row.vat_amount_chf), totalChf: Number(row.total_chf), status: row.status as InvoiceStatus, locale: row.locale ?? "de", paymentMethod: row.payment_method, billingSnapshot: row.billing_snapshot as InvoiceBillingSnapshot, stripePaymentIntentId: row.stripe_payment_intent_id ?? null, pdfFilename: row.pdf_filename ?? null, hasPdf: row.has_pdf ?? row.pdf_data !== null, adminNotes: row.admin_notes ?? null, paidAt: row.paid_at?.toISOString?.() ?? row.paid_at ?? null, paidBy: row.paid_by ?? null, paidMethodDetail: row.paid_method_detail ?? null, voidReason: row.void_reason ?? null, voidedAt: row.voided_at?.toISOString?.() ?? row.voided_at ?? null, voidedBy: row.voided_by ?? null, refundedTotalChf: row.refunded_total_chf != null ? Number(row.refunded_total_chf) : 0, createdAt: row.created_at?.toISOString?.() ?? row.created_at, }; } function rowToInvoiceLine(row: any): InvoiceLine { return { id: row.id, invoiceId: row.invoice_id, tenantName: row.tenant_name ?? null, kind: row.kind, description: row.description, quantity: Number(row.quantity), unitLabel: row.unit_label ?? null, unitPriceChf: Number(row.unit_price_chf), amountChf: Number(row.amount_chf), metadata: row.metadata ?? null, displayOrder: row.display_order, }; } // Standard SELECT projection that includes a cheap NOT-NULL probe of // pdf_data instead of pulling the bytes themselves. Crucial for list // endpoints — a few KB per row across hundreds of invoices is wasted // network and memory. const INVOICE_LIST_COLUMNS = ` id, invoice_number, zitadel_org_id, period_start, period_end, issued_at, due_at, subtotal_chf, vat_rate, vat_amount_chf, total_chf, status, locale, payment_method, billing_snapshot, stripe_payment_intent_id, pdf_filename, admin_notes, paid_at, paid_by, paid_method_detail, void_reason, voided_at, voided_by, refunded_total_chf, source, created_at, (pdf_data IS NOT NULL) AS has_pdf `; /** * Persist a fully-computed invoice draft with its lines and PDF in * a single transaction. Allocates the year-scoped invoice number * inside the same transaction so a rollback restores the counter * (gapless guarantee). * * The caller is responsible for upstream validation: * - the (org, period) uniqueness (the unique index will reject * duplicates, but we return a clear error message rather than * leaking the constraint name) * - the draft's lines/totals are consistent (compute pipeline * ensures this) * * `pdfBuffer` is the rendered PDF bytes; pass null if PDF is * generated separately or stored in a side channel. For Phase 2 we * always render synchronously and pass the buffer here. */ export async function createInvoice( draft: InvoiceDraft, pdfBuffer: Buffer | null, pdfFilename: string | null ): Promise { await ensureSchema(); const pool = getPool(); const client = await pool.connect(); try { await client.query("BEGIN"); // Phase 8: pick the year for invoice-number allocation. // - auto invoices: use period_start (the original behaviour) // - custom invoices: use the override issued_at if set, else // the year of "now" // The sequence is shared across auto and custom — every invoice // gets a number from the same year-scoped counter so the audit // trail is gapless and sequential regardless of source. let year: number; if (draft.source === "custom") { const yearSource = draft.issuedAt ? draft.issuedAt.slice(0, 4) : new Date().getUTCFullYear().toString(); year = parseInt(yearSource, 10); } else if (draft.periodStart) { year = parseInt(draft.periodStart.slice(0, 4), 10); } else { year = new Date().getUTCFullYear(); } const counterResult = await client.query( `INSERT INTO invoice_number_counters (year, last_number) VALUES ($1, 1) ON CONFLICT (year) DO UPDATE SET last_number = invoice_number_counters.last_number + 1 RETURNING last_number`, [year] ); const seq = counterResult.rows[0].last_number; const invoiceNumber = `${year}-${String(seq).padStart(5, "0")}`; // Phase 8: optional override of issued_at (custom flow lets // admin backdate or future-date). Empty → now(); set → that // exact date. const source = draft.source ?? "auto"; const inv = await client.query( `INSERT INTO invoices ( invoice_number, zitadel_org_id, period_start, period_end, issued_at, due_at, subtotal_chf, vat_rate, vat_amount_chf, total_chf, status, locale, payment_method, billing_snapshot, pdf_data, pdf_filename, source ) VALUES ( $1, $2, $3::date, $4::date, COALESCE($5::timestamptz, now()), $6::date, $7, $8, $9, $10, 'open', $11, $12, $13::jsonb, $14, $15, $16 ) RETURNING ${INVOICE_LIST_COLUMNS}`, [ invoiceNumber, draft.zitadelOrgId, draft.periodStart, // may be null for custom draft.periodEnd, // may be null for custom draft.issuedAt ?? null, draft.dueAt, draft.subtotalChf, draft.vatRate, draft.vatAmountChf, draft.totalChf, draft.locale, draft.paymentMethod, JSON.stringify(draft.billingSnapshot), pdfBuffer, pdfFilename, source, ] ); const invoiceId = inv.rows[0].id; // Insert lines in batch — one INSERT statement is significantly // faster than per-line round-trips, which matters when an invoice // accumulates many ai_usage / skill_usage lines. if (draft.lines.length > 0) { const placeholders: string[] = []; const values: any[] = []; let idx = 1; for (const line of draft.lines) { placeholders.push( `($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}::jsonb, $${idx++})` ); values.push( invoiceId, line.tenantName, line.kind, line.description, line.quantity, line.unitLabel, line.unitPriceChf, line.amountChf, line.metadata ? JSON.stringify(line.metadata) : null, line.displayOrder ); } await client.query( `INSERT INTO invoice_lines ( invoice_id, tenant_name, kind, description, quantity, unit_label, unit_price_chf, amount_chf, metadata, display_order ) VALUES ${placeholders.join(", ")}`, values ); } await client.query("COMMIT"); return rowToInvoice(inv.rows[0]); } catch (e: any) { await client.query("ROLLBACK").catch(() => undefined); // Translate the uniqueness violation into a user-friendly error. // 23505 = unique_violation in Postgres. The partial index is // named uniq_invoices_org_period_auto post-Phase-8; we match // both for back-compat with DBs that haven't run the migration // yet (the old name) or have run it (the new name). if ( e?.code === "23505" && /uniq_invoices_org_period/.test(e?.constraint ?? "") ) { const month = draft.periodStart?.slice(0, 7) ?? "this period"; throw new Error( `An invoice already exists for this org and billing period (${month}). ` + `Delete the existing invoice first if you want to regenerate.` ); } throw e; } finally { client.release(); } } export async function getInvoiceById(id: string): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices WHERE id = $1`, [id] ); return result.rows.length > 0 ? rowToInvoice(result.rows[0]) : null; } export async function getInvoiceDetail( id: string ): Promise { const invoice = await getInvoiceById(id); if (!invoice) return null; const lines = await getPool().query( `SELECT * FROM invoice_lines WHERE invoice_id = $1 ORDER BY display_order, id`, [id] ); return { invoice, lines: lines.rows.map(rowToInvoiceLine) }; } /** * Phase 3 — customer-scoped lookup by human-readable invoice * number with ownership enforcement in a single query. The org * filter is part of the WHERE clause so a customer can't probe * another org's invoice numbers (which are sequential and easy * to guess) and get a different status code (404 vs 403) than * for their own — both miss-and-not-yours return null. * * Used by /api/billing/invoices/[invoiceNumber] and the * /billing/[invoiceNumber] customer page. */ export async function getInvoiceByNumberForOrg( invoiceNumber: string, zitadelOrgId: string ): Promise { await ensureSchema(); const head = await getPool().query( `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices WHERE invoice_number = $1 AND zitadel_org_id = $2 LIMIT 1`, [invoiceNumber, zitadelOrgId] ); if (head.rows.length === 0) return null; const invoice = rowToInvoice(head.rows[0]); const lines = await getPool().query( `SELECT * FROM invoice_lines WHERE invoice_id = $1 ORDER BY display_order, id`, [invoice.id] ); return { invoice, lines: lines.rows.map(rowToInvoiceLine) }; } /** * Fetch the PDF bytes for an invoice. Returns null if no PDF was * stored (shouldn't happen in v1; defensive against partial state). */ export async function getInvoicePdf( id: string ): Promise<{ data: Buffer; filename: string } | null> { await ensureSchema(); const result = await getPool().query( "SELECT pdf_data, pdf_filename, invoice_number FROM invoices WHERE id = $1", [id] ); if (result.rows.length === 0) return null; const row = result.rows[0]; if (!row.pdf_data) return null; return { data: row.pdf_data, filename: row.pdf_filename ?? `${row.invoice_number}.pdf`, }; } /** * List invoices, optionally filtered. Used by the admin invoice * list page and (Phase 3) the customer-facing /billing page. * * The customer-facing call site MUST pass `zitadelOrgId` to scope * results — this helper does not enforce that itself. */ export async function listInvoices(filters: { zitadelOrgId?: string; status?: InvoiceStatus; /** Inclusive YYYY-MM filter on period_start. */ periodMonth?: string; limit?: number; } = {}): Promise { await ensureSchema(); const where: string[] = []; const values: any[] = []; let idx = 1; if (filters.zitadelOrgId) { where.push(`zitadel_org_id = $${idx++}`); values.push(filters.zitadelOrgId); } if (filters.status) { where.push(`status = $${idx++}`); values.push(filters.status); } if (filters.periodMonth) { where.push(`to_char(period_start, 'YYYY-MM') = $${idx++}`); values.push(filters.periodMonth); } const limit = filters.limit ?? 200; const sql = `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices ` + (where.length > 0 ? `WHERE ${where.join(" AND ")} ` : "") + `ORDER BY issued_at DESC LIMIT $${idx}`; values.push(limit); const result = await getPool().query(sql, values); return result.rows.map(rowToInvoice); } /** * Sweep open invoices past their due date to `overdue` status. * Cheap idempotent UPDATE; safe to call on every admin list view * to keep status fresh without a dedicated cron. */ export async function syncOverdueInvoices(): Promise { await ensureSchema(); const result = await getPool().query( `UPDATE invoices SET status = 'overdue' WHERE status = 'open' AND due_at < CURRENT_DATE` ); return result.rowCount ?? 0; } export async function markInvoicePaid( id: string, opts: { paidBy: string; paidMethodDetail?: string | null; paidAt?: Date } ): Promise { await ensureSchema(); const result = await getPool().query( `UPDATE invoices SET status = 'paid', paid_at = COALESCE($2::timestamptz, now()), paid_by = $3, paid_method_detail = $4 WHERE id = $1 AND status IN ('open', 'overdue') RETURNING ${INVOICE_LIST_COLUMNS}`, [ id, opts.paidAt ?? null, opts.paidBy, opts.paidMethodDetail ?? null, ] ); return result.rows.length > 0 ? rowToInvoice(result.rows[0]) : null; } /** * Hard delete an invoice and its lines (CASCADE). * * This is the testing tool — Swiss bookkeeping requires immutable * invoices in production, but during pilot/testing we need to * iterate. The gap left in the invoice number sequence is * intentional and documented; no attempt to "recycle" numbers. * * Reminders (and their PDFs) cascade-delete via the FK. */ export async function deleteInvoice(id: string): Promise { await ensureSchema(); const result = await getPool().query( "DELETE FROM invoices WHERE id = $1 RETURNING id", [id] ); return (result.rowCount ?? 0) > 0; } /** * Has this tenant ever been billed a setup fee? Drives the * compute pipeline's "include setup line on first invoice" * decision. Looks at invoice_lines directly so it survives org * billing config edits. */ export async function tenantHasSetupFeeBilled( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT 1 FROM invoice_lines WHERE tenant_name = $1 AND kind = 'tenant_setup' LIMIT 1`, [tenantName] ); return result.rows.length > 0; } /** * Has this (tenant, skill) pair already appeared on any prior * invoice line — either as setup or usage? Drives the per-skill * setup-fee gate. Same "first appearance" semantics as the tenant * setup fee: a previously-free skill that newly gets a setup fee * configured will trigger the fee on its next billed period. * * Uses metadata->>'skill_id' (which is what both skill_setup and * skill_usage lines store) rather than parsing description. */ export async function tenantSkillHasBeenBilled( tenantName: string, skillId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT 1 FROM invoice_lines WHERE tenant_name = $1 AND kind IN ('skill_setup', 'skill_usage') AND metadata->>'skill_id' = $2 LIMIT 1`, [tenantName, skillId] ); return result.rows.length > 0; } /** * Aggregate open balance per org for the admin overview. Returns * orgs with at least one open or overdue invoice; orgs in good * standing don't appear. */ export async function getOrgOpenBalances(): Promise<{ zitadelOrgId: string; openCount: number; overdueCount: number; totalOpenChf: number; }[]> { await ensureSchema(); const result = await getPool().query( `SELECT zitadel_org_id, COUNT(*) FILTER (WHERE status = 'open') AS open_count, COUNT(*) FILTER (WHERE status = 'overdue') AS overdue_count, SUM(total_chf) FILTER (WHERE status IN ('open', 'overdue')) AS total_open FROM invoices WHERE status IN ('open', 'overdue') GROUP BY zitadel_org_id ORDER BY total_open DESC` ); return result.rows.map((r) => ({ zitadelOrgId: r.zitadel_org_id, openCount: Number(r.open_count), overdueCount: Number(r.overdue_count), totalOpenChf: Number(r.total_open), })); } /** * Update the stored PDF for an invoice. Used by the two-pass * compute pipeline: insert invoice with empty PDF → render PDF with * the allocated invoice number → write bytes back. * * Could be merged into createInvoice via a render callback in a * future cleanup, but two passes are simpler and the extra UPDATE * is cheap. */ export async function updateInvoicePdf( invoiceId: string, pdfBuffer: Buffer, filename: string ): Promise { await ensureSchema(); await getPool().query( "UPDATE invoices SET pdf_data = $2, pdf_filename = $3 WHERE id = $1", [invoiceId, pdfBuffer, filename] ); } // --------------------------------------------------------------------------- // Skill activation requests — Phase 2.5 // --------------------------------------------------------------------------- import type { SkillActivationRequest, SkillActivationStatus, } from "@/types"; function rowToSkillActivationRequest(row: any): SkillActivationRequest { return { id: row.id, tenantName: row.tenant_name, zitadelOrgId: row.zitadel_org_id, zitadelUserId: row.zitadel_user_id, skillId: row.skill_id, status: row.status as SkillActivationStatus, requestedAt: row.requested_at?.toISOString?.() ?? row.requested_at, reviewedAt: row.reviewed_at?.toISOString?.() ?? row.reviewed_at ?? null, reviewedBy: row.reviewed_by ?? null, rejectionReason: row.rejection_reason ?? null, adminNotes: row.admin_notes ?? null, }; } /** * Insert a pending activation request. Throws a tagged error if a * pending row already exists for the (tenant, skill) — the partial * unique index enforces this. The caller surfaces "request already * pending" to the user rather than letting it 500. */ export async function createSkillActivationRequest(params: { tenantName: string; zitadelOrgId: string; zitadelUserId: string; skillId: string; }): Promise { await ensureSchema(); try { const result = await getPool().query( `INSERT INTO skill_activation_requests (tenant_name, zitadel_org_id, zitadel_user_id, skill_id) VALUES ($1, $2, $3, $4) RETURNING *`, [ params.tenantName, params.zitadelOrgId, params.zitadelUserId, params.skillId, ] ); return rowToSkillActivationRequest(result.rows[0]); } catch (e: any) { if (e?.code === "23505") { const err = new Error( `A pending activation request already exists for ${params.skillId} on ${params.tenantName}.` ); (err as any).code = "REQUEST_ALREADY_PENDING"; throw err; } throw e; } } export async function getSkillActivationRequestById( id: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM skill_activation_requests WHERE id = $1", [id] ); return result.rows.length > 0 ? rowToSkillActivationRequest(result.rows[0]) : null; } /** * All pending requests across all tenants — feeds the admin queue * page. Capped to 500 rows for safety; unlikely to ever be hit but * keeps the page bounded. */ export async function listPendingSkillActivationRequests(): Promise< SkillActivationRequest[] > { await ensureSchema(); const result = await getPool().query( `SELECT * FROM skill_activation_requests WHERE status = 'pending' ORDER BY requested_at ASC LIMIT 500` ); return result.rows.map(rowToSkillActivationRequest); } export async function countPendingSkillActivationRequests(): Promise { await ensureSchema(); const result = await getPool().query( "SELECT COUNT(*)::int AS c FROM skill_activation_requests WHERE status = 'pending'" ); return result.rows[0]?.c ?? 0; } /** * Requests visible to a customer for one tenant. Returns: * - pending: rows the user might want to withdraw * - rejected: the most recent rejection per skill, so the user * sees why and can retry * Approved and withdrawn rows are excluded (terminal states with * no user-visible UI effect after the fact). */ export async function listSkillActivationRequestsForTenant( tenantName: string ): Promise { await ensureSchema(); const result = await getPool().query( `WITH ranked AS ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY skill_id, status ORDER BY requested_at DESC ) AS rn FROM skill_activation_requests WHERE tenant_name = $1 AND status IN ('pending', 'rejected') ) SELECT * FROM ranked WHERE rn = 1 ORDER BY requested_at DESC`, [tenantName] ); return result.rows.map(rowToSkillActivationRequest); } /** * Transition a request's status. The caller is responsible for the * side effects (K8s patch on approve, email send) — this function * only touches the row. * * Returns null when the row doesn't exist or isn't in 'pending' * status. That null is meaningful: it tells the caller the * transition didn't happen (already approved/rejected by another * admin tab, etc.) and downstream actions should be skipped. */ export async function updateSkillActivationRequestStatus( id: string, newStatus: Exclude, opts: { reviewedBy: string; rejectionReason?: string | null; adminNotes?: string | null; } ): Promise { await ensureSchema(); const result = await getPool().query( `UPDATE skill_activation_requests SET status = $2, reviewed_at = now(), reviewed_by = $3, rejection_reason = $4, admin_notes = COALESCE($5, admin_notes) WHERE id = $1 AND status = 'pending' RETURNING *`, [id, newStatus, opts.reviewedBy, opts.rejectionReason ?? null, opts.adminNotes ?? null] ); return result.rows.length > 0 ? rowToSkillActivationRequest(result.rows[0]) : null; } // --------------------------------------------------------------------------- // Phase 3 diagnostic — single-purpose helper for the /api/admin/billing/debug // endpoint. Returns raw invoice_line rows for a tenant filtered to setup-fee // rows, so a human can verify what the billing emission code's SQL is // actually seeing. Not intended for production use; kept here for shipping // hotfixes when running-total drafts diverge from expected behaviour. // --------------------------------------------------------------------------- export async function debugListSetupLines( tenantName: string ): Promise> { await ensureSchema(); const result = await getPool().query( `SELECT id, invoice_id, tenant_name, kind, amount_chf, description FROM invoice_lines WHERE tenant_name = $1 AND kind = 'tenant_setup'`, [tenantName] ); return result.rows; } // --------------------------------------------------------------------------- // Stripe — Phase 4 // --------------------------------------------------------------------------- /** * Attempt to record receipt of a Stripe webhook event. Returns true * when this is the first time we've seen the event (caller should * process it), false when the event_id was already present * (caller should ack with 200 and skip — Stripe retries are * normal and we must be idempotent). * * The whole-payload JSONB is stored so a misbehaving event can be * diagnosed after the fact without re-fetching from Stripe. */ export async function tryRecordStripeEvent( eventId: string, eventType: string, payload: unknown ): Promise { await ensureSchema(); try { await getPool().query( `INSERT INTO stripe_events (event_id, event_type, payload) VALUES ($1, $2, $3::jsonb)`, [eventId, eventType, JSON.stringify(payload)] ); return true; } catch (e: any) { // 23505 = unique_violation; the row already exists, meaning we've // seen this event before. That's the normal duplicate-delivery // case — return false so the caller short-circuits. if (e?.code === "23505") return false; throw e; } } /** * Stamp processed_at on a stripe_events row once the handler has * finished its work successfully. Lets us spot stuck events * (received but not processed) for diagnosis. */ export async function markStripeEventProcessed(eventId: string): Promise { await ensureSchema(); await getPool().query( "UPDATE stripe_events SET processed_at = now() WHERE event_id = $1", [eventId] ); } /** * Persist the Stripe PaymentIntent id on an invoice. Used by the * webhook handler once the Checkout Session completes — at that * point Stripe has minted the PaymentIntent and we want to be * able to find the Stripe-side record from the invoice (and vice * versa via metadata). * * Idempotent: re-running with the same value is a no-op. The * column was added in Phase 2 schema; this helper was missing. */ export async function setInvoiceStripePaymentIntent( invoiceId: string, paymentIntentId: string ): Promise { await ensureSchema(); await getPool().query( `UPDATE invoices SET stripe_payment_intent_id = $2 WHERE id = $1 AND (stripe_payment_intent_id IS NULL OR stripe_payment_intent_id = $2)`, [invoiceId, paymentIntentId] ); } // --------------------------------------------------------------------------- // Phase 5 — Cron run history + reminder helpers // --------------------------------------------------------------------------- import type { CronRun, CronRunKind } from "@/types"; function rowToCronRun(row: any): CronRun { return { id: row.id, runKind: row.run_kind, triggeredBy: row.triggered_by, startedAt: row.started_at?.toISOString?.() ?? String(row.started_at), finishedAt: row.finished_at ? row.finished_at.toISOString?.() ?? String(row.finished_at) : null, successCount: Number(row.success_count ?? 0), failureCount: Number(row.failure_count ?? 0), skippedCount: Number(row.skipped_count ?? 0), errorDetails: row.error_details ?? null, }; } /** * Open a new cron-run row in 'started' state. Returns the row's * id which the caller passes to finishCronRun() with the summary * stats once the sweep completes. * * Separating start/finish lets the admin UI distinguish an in- * progress run from a finished one, and lets a crashed pod leave * a forensic trace ("started but never finished — investigate"). */ export async function startCronRun( runKind: CronRunKind, triggeredBy: string ): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO cron_run_history (run_kind, triggered_by) VALUES ($1, $2) RETURNING id`, [runKind, triggeredBy] ); return result.rows[0].id; } export async function finishCronRun( id: string, summary: { successCount: number; failureCount: number; skippedCount: number; errorDetails?: unknown; } ): Promise { await ensureSchema(); await getPool().query( `UPDATE cron_run_history SET finished_at = now(), success_count = $2, failure_count = $3, skipped_count = $4, error_details = $5::jsonb WHERE id = $1`, [ id, summary.successCount, summary.failureCount, summary.skippedCount, summary.errorDetails ? JSON.stringify(summary.errorDetails) : null, ] ); } export async function listRecentCronRuns( limit = 30 ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM cron_run_history ORDER BY started_at DESC LIMIT $1`, [limit] ); return result.rows.map(rowToCronRun); } /** * Most recent successful run of each kind. Drives the admin * dashboard's "last issuance: N days ago" indicator. Returns * null for a kind that has never run successfully. */ export async function getLastSuccessfulCronRuns(): Promise<{ monthlyIssue: CronRun | null; reminders: CronRun | null; }> { await ensureSchema(); const result = await getPool().query( `SELECT DISTINCT ON (run_kind) * FROM cron_run_history WHERE finished_at IS NOT NULL AND failure_count = 0 ORDER BY run_kind, started_at DESC` ); const map: Record = {}; for (const row of result.rows) { map[row.run_kind] = rowToCronRun(row); } return { monthlyIssue: map["monthly_issue"] ?? null, reminders: map["reminders"] ?? null, }; } /** * IDs of all orgs with auto-issue enabled. Drives the monthly * issuance sweep. Returns just the zitadel_org_id strings — the * caller fetches OrgBilling per-org during the sweep so a bad * row doesn't poison the whole list at SELECT time. */ export async function listAutoIssueOrgIds(): Promise { await ensureSchema(); const result = await getPool().query( `SELECT zitadel_org_id FROM org_billing_config WHERE auto_invoice_enabled = TRUE` ); return result.rows.map((r) => r.zitadel_org_id as string); } /** * Open or overdue invoices whose org has auto-reminders enabled * and whose due_at is at least 7 days in the past. The reminder * sweep takes this list and picks the right level (1/2/3) per * invoice based on days-past-due AND which levels have already * been sent. * * We don't filter by "needs reminder X yet" in SQL because the * level logic is more readable in TypeScript and the candidate * set is small (only past-due invoices for opted-in orgs). */ export async function listInvoicesPendingReminders(): Promise { await ensureSchema(); // Bug fix: the prior version did `FROM invoices i JOIN org_billing_config c ON c.zitadel_org_id = i.zitadel_org_id`, // but INVOICE_LIST_COLUMNS selects unqualified column names — // `zitadel_org_id` appears in both tables and Postgres rejects // it as ambiguous. Rewriting as a subquery filter keeps every // referenced column on the single `invoices` row source, so the // unqualified list stays valid (matching every other caller of // INVOICE_LIST_COLUMNS in this file). // // Semantics are unchanged: only include invoices belonging to // orgs that have opted into auto-reminders. Orgs with no // org_billing_config row at all are excluded (same as before — // the inner join didn't match for them either). const result = await getPool().query( `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices WHERE status IN ('open','overdue') AND due_at < now() - INTERVAL '7 days' AND zitadel_org_id IN ( SELECT zitadel_org_id FROM org_billing_config WHERE auto_reminders_enabled = TRUE ) ORDER BY due_at ASC` ); return result.rows.map(rowToInvoice); } /** * Which reminder levels have already been sent for this invoice? * Returns a Set of {1, 2, 3} subset. Drives the "send the next * level only" logic in the reminder sweep. */ export async function getReminderLevelsSent( invoiceId: string ): Promise> { await ensureSchema(); const result = await getPool().query( `SELECT level FROM invoice_reminders WHERE invoice_id = $1`, [invoiceId] ); return new Set(result.rows.map((r) => Number(r.level))); } /** * Mark a reminder as sent. Wrapped in an INSERT ... ON CONFLICT * DO NOTHING so a retry of the same level after a partial failure * is a no-op rather than a 23505 explosion. Returns true if a row * was inserted (first send), false on conflict (already sent). */ export async function recordReminderSent(params: { invoiceId: string; level: 1 | 2 | 3; sentBy: string; emailSentTo: string; }): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO invoice_reminders (invoice_id, level, sent_by, email_sent_to) VALUES ($1, $2, $3, $4) ON CONFLICT (invoice_id, level) DO NOTHING RETURNING id`, [params.invoiceId, params.level, params.sentBy, params.emailSentTo] ); return result.rowCount === 1; } // --------------------------------------------------------------------------- // Phase 7 — credit notes and refunds // --------------------------------------------------------------------------- function rowToCreditNote(row: any): CreditNote { return { id: row.id, creditNoteNumber: row.credit_note_number, invoiceId: row.invoice_id, invoiceNumber: row.invoice_number, // joined column when available zitadelOrgId: row.zitadel_org_id, kind: row.kind as CreditNoteKind, amountChf: Number(row.amount_chf), vatAmountChf: Number(row.vat_amount_chf), reason: row.reason ?? null, issuedAt: row.issued_at?.toISOString?.() ?? row.issued_at, issuedBy: row.issued_by, locale: row.locale ?? "de", pdfFilename: row.pdf_filename ?? null, hasPdf: row.has_pdf ?? row.pdf_data !== null, billingSnapshot: row.billing_snapshot as InvoiceBillingSnapshot, }; } function rowToInvoiceRefund(row: any): InvoiceRefund { return { id: row.id, invoiceId: row.invoice_id, stripeRefundId: row.stripe_refund_id ?? null, amountChf: Number(row.amount_chf), reason: row.reason ?? null, status: row.status, refundedAt: row.refunded_at?.toISOString?.() ?? row.refunded_at, refundedBy: row.refunded_by, creditNoteId: row.credit_note_id ?? null, }; } const CREDIT_NOTE_COLUMNS = ` cn.id, cn.credit_note_number, cn.invoice_id, cn.zitadel_org_id, cn.kind, cn.amount_chf, cn.vat_amount_chf, cn.reason, cn.issued_at, cn.issued_by, cn.locale, cn.pdf_filename, cn.billing_snapshot, (cn.pdf_data IS NOT NULL) AS has_pdf, inv.invoice_number AS invoice_number `; /** * Allocate the next credit-note number for the given year. Uses the * per-year counter table with a row-level lock, same approach as * invoice numbering. Format: CN-2026-00001 (5-digit padding, matches * invoice padding for visual consistency even though Cedric agreed * to "CN-YYYY-NNNN" originally — the extra digit is harmless headroom * and keeps eyes from misreading "CN-2026-1" next to "2026-00001"). * * Must be called inside a transaction; the caller passes the same * client so the allocation and the INSERT roll back together if * anything downstream fails. */ async function allocateCreditNoteNumber( client: any, year: number ): Promise { const r = await client.query( `INSERT INTO credit_note_counters (year, last_number) VALUES ($1, 1) ON CONFLICT (year) DO UPDATE SET last_number = credit_note_counters.last_number + 1 RETURNING last_number`, [year] ); const seq = r.rows[0].last_number; return `CN-${year}-${String(seq).padStart(5, "0")}`; } /** * Persist a new credit note (without its PDF — that's attached later * via attachCreditNotePdf so the PDF render can read the just-inserted * row, including its credit_note_number, for self-referential rendering). * * Snapshots the invoice's billing block at issue time. Returns the * inserted row (with PDF still null). */ export async function createCreditNote(params: { invoiceId: string; zitadelOrgId: string; kind: CreditNoteKind; amountChf: number; vatAmountChf: number; reason: string | null; issuedBy: string; locale: string; billingSnapshot: InvoiceBillingSnapshot; }): Promise { await ensureSchema(); const pool = getPool(); const client = await pool.connect(); try { await client.query("BEGIN"); // Allocate from the year of NOW — credit notes are issued // "today", not retroactively, so the year is current. const year = new Date().getUTCFullYear(); const creditNoteNumber = await allocateCreditNoteNumber(client, year); const inserted = await client.query( `INSERT INTO credit_notes ( credit_note_number, invoice_id, zitadel_org_id, kind, amount_chf, vat_amount_chf, reason, issued_by, locale, billing_snapshot ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::jsonb) RETURNING *`, [ creditNoteNumber, params.invoiceId, params.zitadelOrgId, params.kind, params.amountChf, params.vatAmountChf, params.reason, params.issuedBy, params.locale, JSON.stringify(params.billingSnapshot), ] ); await client.query("COMMIT"); // Re-query with the invoice_number join so the returned row has // it populated (matches what list/get methods return). const detail = await pool.query( `SELECT ${CREDIT_NOTE_COLUMNS} FROM credit_notes cn JOIN invoices inv ON inv.id = cn.invoice_id WHERE cn.id = $1`, [inserted.rows[0].id] ); return rowToCreditNote(detail.rows[0]); } catch (e) { await client.query("ROLLBACK"); throw e; } finally { client.release(); } } /** * Attach a freshly-rendered PDF to an existing credit note row. * Two-phase issue (insert row, render PDF, attach PDF) mirrors the * invoice flow, where the PDF generation needs the number on the * row to render itself. */ export async function attachCreditNotePdf( creditNoteId: string, pdfBuffer: Buffer, pdfFilename: string ): Promise { await getPool().query( `UPDATE credit_notes SET pdf_data = $1, pdf_filename = $2 WHERE id = $3`, [pdfBuffer, pdfFilename, creditNoteId] ); } /** * Read a credit note by its number, scoped to an org. Returns null * if the row doesn't exist OR exists but belongs to a different org * (same 404-not-403 leak-protection as the invoice equivalent). */ export async function getCreditNoteByNumberForOrg( creditNoteNumber: string, zitadelOrgId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${CREDIT_NOTE_COLUMNS} FROM credit_notes cn JOIN invoices inv ON inv.id = cn.invoice_id WHERE cn.credit_note_number = $1 AND cn.zitadel_org_id = $2 LIMIT 1`, [creditNoteNumber, zitadelOrgId] ); return result.rows.length > 0 ? rowToCreditNote(result.rows[0]) : null; } /** Platform-admin variant: look up by number regardless of org. */ export async function getCreditNoteByNumber( creditNoteNumber: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${CREDIT_NOTE_COLUMNS} FROM credit_notes cn JOIN invoices inv ON inv.id = cn.invoice_id WHERE cn.credit_note_number = $1 LIMIT 1`, [creditNoteNumber] ); return result.rows.length > 0 ? rowToCreditNote(result.rows[0]) : null; } /** * List credit notes for an org, newest first. Used by /billing to * render the credit-note list alongside the invoice list. */ export async function listCreditNotesForOrg( zitadelOrgId: string, limit = 50 ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${CREDIT_NOTE_COLUMNS} FROM credit_notes cn JOIN invoices inv ON inv.id = cn.invoice_id WHERE cn.zitadel_org_id = $1 ORDER BY cn.issued_at DESC LIMIT $2`, [zitadelOrgId, limit] ); return result.rows.map(rowToCreditNote); } /** * All credit notes linked to a specific invoice. Used by the invoice * detail page to surface "this invoice was voided / partially * refunded by these credit notes". */ export async function listCreditNotesForInvoice( invoiceId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${CREDIT_NOTE_COLUMNS} FROM credit_notes cn JOIN invoices inv ON inv.id = cn.invoice_id WHERE cn.invoice_id = $1 ORDER BY cn.issued_at ASC`, [invoiceId] ); return result.rows.map(rowToCreditNote); } /** * Fetch the PDF bytes for a credit note. Returns null if no PDF was * ever attached (which would be a bug — every credit note should * have one) or if the credit note doesn't exist. */ export async function getCreditNotePdf( creditNoteId: string ): Promise<{ data: Buffer; filename: string } | null> { const result = await getPool().query( `SELECT pdf_data, pdf_filename, credit_note_number FROM credit_notes WHERE id = $1`, [creditNoteId] ); if (result.rows.length === 0) return null; const row = result.rows[0]; if (!row.pdf_data) return null; return { data: row.pdf_data, filename: row.pdf_filename ?? `${row.credit_note_number}.pdf`, }; } /** * Mark an invoice as voided and bump the status. Caller is * responsible for ensuring the invoice is in a void-eligible state * (status='open' or 'overdue'); this helper doesn't enforce that — * billing.voidInvoice() does. */ export async function markInvoiceVoided(params: { invoiceId: string; reason: string; voidedBy: string; }): Promise { await getPool().query( `UPDATE invoices SET status = 'void', void_reason = $2, voided_at = now(), voided_by = $3 WHERE id = $1`, [params.invoiceId, params.reason, params.voidedBy] ); } /** * Record a refund event and (optionally) bump the invoice status. * The status transition is: * refunded_total + amount >= total_chf → fully_refunded * otherwise → partially_refunded * * Idempotent against Stripe webhook replays via stripe_refund_id: * if a row with the same Stripe refund id already exists, returns * the existing row without double-counting. Manual (non-Stripe) * refunds have stripe_refund_id=null and can't be deduped — caller * must guard against double-submit at the UI/API layer. * * Returns the recorded refund, the updated invoice's new total * refunded amount, and the resulting invoice status. */ export interface RecordRefundResult { refund: InvoiceRefund; alreadyExisted: boolean; newRefundedTotalChf: number; newInvoiceStatus: InvoiceStatus; } export async function recordInvoiceRefund(params: { invoiceId: string; stripeRefundId: string | null; amountChf: number; reason: string | null; refundedBy: string; creditNoteId: string | null; status?: "pending" | "succeeded" | "failed" | "canceled"; }): Promise { await ensureSchema(); const pool = getPool(); const client = await pool.connect(); try { await client.query("BEGIN"); // Webhook idempotency: if the Stripe refund id is already // recorded, return the existing row without re-adding to the // running total. The invoice status row reflects the cumulative // state independent of how many times this function is called. if (params.stripeRefundId) { const existing = await client.query( `SELECT * FROM invoice_refunds WHERE stripe_refund_id = $1`, [params.stripeRefundId] ); if (existing.rows.length > 0) { const inv = await client.query( `SELECT refunded_total_chf, status FROM invoices WHERE id = $1`, [params.invoiceId] ); await client.query("COMMIT"); return { refund: rowToInvoiceRefund(existing.rows[0]), alreadyExisted: true, newRefundedTotalChf: Number(inv.rows[0]?.refunded_total_chf ?? 0), newInvoiceStatus: (inv.rows[0]?.status ?? "paid") as InvoiceStatus, }; } } // Insert the refund event. const inserted = await client.query( `INSERT INTO invoice_refunds ( invoice_id, stripe_refund_id, amount_chf, reason, status, refunded_by, credit_note_id ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *`, [ params.invoiceId, params.stripeRefundId, params.amountChf, params.reason, params.status ?? "succeeded", params.refundedBy, params.creditNoteId, ] ); const recordedStatus = inserted.rows[0].status; // Only succeeded refunds count toward the invoice's // refunded_total. Pending/failed refunds are tracked for audit // but don't change the customer-visible state. if (recordedStatus !== "succeeded") { const inv = await client.query( `SELECT refunded_total_chf, status FROM invoices WHERE id = $1`, [params.invoiceId] ); await client.query("COMMIT"); return { refund: rowToInvoiceRefund(inserted.rows[0]), alreadyExisted: false, newRefundedTotalChf: Number(inv.rows[0]?.refunded_total_chf ?? 0), newInvoiceStatus: (inv.rows[0]?.status ?? "paid") as InvoiceStatus, }; } // Update aggregate + status atomically based on the new total. const updated = await client.query( `UPDATE invoices SET refunded_total_chf = refunded_total_chf + $2, status = CASE WHEN refunded_total_chf + $2 >= total_chf THEN 'fully_refunded' ELSE 'partially_refunded' END WHERE id = $1 RETURNING refunded_total_chf, status`, [params.invoiceId, params.amountChf] ); await client.query("COMMIT"); return { refund: rowToInvoiceRefund(inserted.rows[0]), alreadyExisted: false, newRefundedTotalChf: Number(updated.rows[0].refunded_total_chf), newInvoiceStatus: updated.rows[0].status as InvoiceStatus, }; } catch (e) { await client.query("ROLLBACK"); throw e; } finally { client.release(); } } /** All refund events for an invoice, ordered oldest first. */ export async function listRefundsForInvoice( invoiceId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM invoice_refunds WHERE invoice_id = $1 ORDER BY refunded_at ASC`, [invoiceId] ); return result.rows.map(rowToInvoiceRefund); } /** * Phase 7: find an invoice by its Stripe PaymentIntent id. Used by * the charge.refunded webhook to locate the invoice when a refund * is initiated outside the portal (e.g. directly in Stripe * Dashboard). * * Returns null if no invoice has that payment intent recorded. That * shouldn't happen in normal flow — every Stripe-paid invoice gets * its intent stored at checkout.session.completed time — but a * refund event for an unknown intent should be logged and ignored * rather than throwing. */ export async function getInvoiceByStripePaymentIntent( paymentIntentId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices WHERE stripe_payment_intent_id = $1 LIMIT 1`, [paymentIntentId] ); return result.rows.length > 0 ? rowToInvoice(result.rows[0]) : null; } /** * Phase 7: check if a particular Stripe refund id is already * recorded in invoice_refunds. Used by the charge.refunded webhook * to skip refunds that were initiated via /api/admin/.../refund * (which records them immediately) — the webhook would otherwise * try to double-create the credit note. */ export async function isStripeRefundRecorded( stripeRefundId: string ): Promise { const result = await getPool().query( `SELECT 1 FROM invoice_refunds WHERE stripe_refund_id = $1 LIMIT 1`, [stripeRefundId] ); return result.rows.length > 0; } // --------------------------------------------------------------------------- // Phase 8 — custom invoice drafts // --------------------------------------------------------------------------- function rowToInvoiceDraft(row: any): InvoiceDraftRecord { return { id: row.id, zitadelOrgId: row.zitadel_org_id, createdBy: row.created_by, createdAt: row.created_at?.toISOString?.() ?? row.created_at, updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at, payload: row.payload as CustomInvoiceDraftPayload, }; } /** * Create a new draft for the given org with the supplied payload. * The payload is whatever the editor has so far (possibly minimal — * just a date and an empty lines array). Returns the inserted row. */ export async function createInvoiceDraft(params: { zitadelOrgId: string; createdBy: string; payload: CustomInvoiceDraftPayload; }): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO invoice_drafts (zitadel_org_id, created_by, payload) VALUES ($1, $2, $3::jsonb) RETURNING *`, [params.zitadelOrgId, params.createdBy, JSON.stringify(params.payload)] ); return rowToInvoiceDraft(result.rows[0]); } /** * Update an existing draft's payload. Updated_at gets bumped to now() * so the drafts list can sort by recent activity. Returns the * updated row, or null if no row with that id exists. * * Org boundary check is the caller's responsibility (the admin API * route only accepts requests from platform admins, but you could * pass a zitadelOrgId filter here too if you ever expose drafts to * customer-level roles). */ export async function updateInvoiceDraft( id: string, payload: CustomInvoiceDraftPayload ): Promise { const result = await getPool().query( `UPDATE invoice_drafts SET payload = $2::jsonb, updated_at = now() WHERE id = $1 RETURNING *`, [id, JSON.stringify(payload)] ); return result.rows.length > 0 ? rowToInvoiceDraft(result.rows[0]) : null; } export async function getInvoiceDraftById( id: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM invoice_drafts WHERE id = $1", [id] ); return result.rows.length > 0 ? rowToInvoiceDraft(result.rows[0]) : null; } /** * List all open drafts across all orgs, newest first. Used by the * admin "Drafts" tab so the admin can resume any in-progress * invoice. Drafts are tiny (a JSONB payload), so we don't paginate * by default — 200 rows is plenty of headroom for a solo-founder * workflow. */ export async function listAllInvoiceDrafts( limit = 200 ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM invoice_drafts ORDER BY updated_at DESC LIMIT $1`, [limit] ); return result.rows.map(rowToInvoiceDraft); } /** * Per-org listing — used if you ever want to surface drafts on an * org-detail page or filter by org from the drafts list. */ export async function listInvoiceDraftsForOrg( zitadelOrgId: string, limit = 50 ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM invoice_drafts WHERE zitadel_org_id = $1 ORDER BY updated_at DESC LIMIT $2`, [zitadelOrgId, limit] ); return result.rows.map(rowToInvoiceDraft); } export async function deleteInvoiceDraft(id: string): Promise { const result = await getPool().query( "DELETE FROM invoice_drafts WHERE id = $1", [id] ); return (result.rowCount ?? 0) > 0; } // --------------------------------------------------------------------------- // Phase 9 — saved-card management for off-session auto-charge // --------------------------------------------------------------------------- /** * Persist a saved PaymentMethod against an org's billing config. * Called from the webhook after a successful setup-mode Checkout * session, and again when "Pay by Card" with setup_future_usage * delivers a fresh PaymentMethod. Upserts the config row in case * the org has none yet (rare — onboarding usually creates one, * but defensive doesn't hurt). * * Only display fields (brand/last4/exp) are persisted. The full PAN * is never seen by this code — Stripe holds it. */ export async function setSavedPaymentMethod(params: { zitadelOrgId: string; stripeCustomerId: string; paymentMethodId: string; brand: string | null; last4: string | null; expMonth: number | null; expYear: number | null; }): Promise { await ensureSchema(); await getPool().query( `INSERT INTO org_billing_config ( zitadel_org_id, stripe_customer_id, stripe_default_payment_method_id, stripe_pm_brand, stripe_pm_last4, stripe_pm_exp_month, stripe_pm_exp_year, updated_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, now()) ON CONFLICT (zitadel_org_id) DO UPDATE SET stripe_customer_id = COALESCE(org_billing_config.stripe_customer_id, EXCLUDED.stripe_customer_id), stripe_default_payment_method_id = EXCLUDED.stripe_default_payment_method_id, stripe_pm_brand = EXCLUDED.stripe_pm_brand, stripe_pm_last4 = EXCLUDED.stripe_pm_last4, stripe_pm_exp_month = EXCLUDED.stripe_pm_exp_month, stripe_pm_exp_year = EXCLUDED.stripe_pm_exp_year, updated_at = now()`, [ params.zitadelOrgId, params.stripeCustomerId, params.paymentMethodId, params.brand, params.last4, params.expMonth, params.expYear, ] ); } /** * Clear the saved PaymentMethod fields. Used when the customer * clicks "Remove card" — the Stripe-side detach happens in the * caller (stripe.detachPaymentMethod); this just nulls the * portal-side display fields and the pm id reference. * * Does not touch stripe_customer_id (the customer object survives), * auto_charge_enabled, or any other config — only the four card * fields and the pm id pointer. */ export async function clearSavedPaymentMethod( zitadelOrgId: string ): Promise { await getPool().query( `UPDATE org_billing_config SET stripe_default_payment_method_id = NULL, stripe_pm_brand = NULL, stripe_pm_last4 = NULL, stripe_pm_exp_month = NULL, stripe_pm_exp_year = NULL, updated_at = now() WHERE zitadel_org_id = $1`, [zitadelOrgId] ); } /** * Toggle the auto_charge_enabled flag. Used by the customer's * "Disable auto-pay / Enable auto-pay" button in /settings/billing * and (Phase 9b) the admin override on /admin/billing/orgs. */ export async function setAutoChargeEnabled( zitadelOrgId: string, enabled: boolean ): Promise { await getPool().query( `INSERT INTO org_billing_config (zitadel_org_id, auto_charge_enabled, updated_at) VALUES ($1, $2, now()) ON CONFLICT (zitadel_org_id) DO UPDATE SET auto_charge_enabled = EXCLUDED.auto_charge_enabled, updated_at = now()`, [zitadelOrgId, enabled] ); } /** * Look up the org id for a given Stripe customer id — used by the * webhook when a checkout.session.completed in setup mode arrives * and we need to find which org to save the card against. The * customer id is the join key Stripe gives us in the session. */ export async function getOrgIdByStripeCustomerId( stripeCustomerId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT zitadel_org_id FROM org_billing_config WHERE stripe_customer_id = $1 LIMIT 1`, [stripeCustomerId] ); return result.rows.length > 0 ? result.rows[0].zitadel_org_id : null; } // --------------------------------------------------------------------------- // Phase 9b — tenant order with setup-fee charge // --------------------------------------------------------------------------- /** * Phase 9b: invoked by the Stripe webhook when the setup-fee * Checkout for a tenant order completes. Atomically: * - flips the request status from 'pending_payment' → 'pending' * (admin queue now sees it) * - sets tenant_name to the derived value (so monthly cron's * setup-fee dedup works) * - links the paid invoice via setup_invoice_id (so admin reject * can refund it via the existing refund flow) * * Idempotent on the request side: if the webhook re-fires after * the row already has status='pending', the UPDATE is a no-op * (same values). On the rare case of webhook retry happening after * admin already approved/rejected, the WHERE clause guards against * regressing the status. */ export async function linkTenantRequestSetupPayment(params: { requestId: string; tenantName: string; setupInvoiceId: string; }): Promise { const result = await getPool().query( `UPDATE tenant_requests SET status = 'pending', tenant_name = $2, setup_invoice_id = $3, updated_at = now() WHERE id = $1 AND status = 'pending_payment' RETURNING id`, [params.requestId, params.tenantName, params.setupInvoiceId] ); return (result.rowCount ?? 0) > 0; } /** * Look up a tenant request by id without restricting by status — * used by the webhook + reject handler. Caller is responsible for * any role-gating; this is a pure read. */ export async function getTenantRequestForSetupFlow( requestId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE id = $1`, [requestId] ); return result.rows.length > 0 ? mapRow(result.rows[0]) : null; } /** * Insert a tenant request row in the 'pending_payment' status — * used at order time, before the Stripe Checkout completes. Once * payment succeeds the webhook flips it to 'pending' via * linkTenantRequestSetupPayment. * * tenant_name stays NULL throughout pending_payment so the unique * partial index uniq_tenant_requests_tenant_name_provision * (WHERE tenant_name IS NOT NULL) doesn't block retries from * abandoned Checkout sessions. The derived tenant_name is computed * by the caller from the inserted row's id and stored only at * webhook time. */ export async function createTenantRequestPendingPayment(params: { zitadelOrgId: string; zitadelUserId: string; companyName: string; instanceName?: string | null; contactName: string; contactEmail: string; agentName: string; soulMd?: string; agentsMd?: string | null; packages: string[]; billingAddress: BillingAddress; billingNotes?: string; encryptedSecrets?: Buffer | null; isPersonal: boolean; channelUsers?: Record; }): Promise { await ensureSchema(); const result = await getPool().query( `INSERT INTO tenant_requests ( zitadel_org_id, zitadel_user_id, company_name, instance_name, contact_name, contact_email, agent_name, soul_md, agents_md, packages, billing_address, billing_notes, encrypted_secrets, is_personal, channel_users, status, request_type ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11::jsonb, $12, $13, $14, $15::jsonb, 'pending_payment', 'provision' ) RETURNING *`, [ params.zitadelOrgId, params.zitadelUserId, params.companyName, params.instanceName ?? null, params.contactName, params.contactEmail, params.agentName, params.soulMd ?? null, params.agentsMd ?? null, params.packages, JSON.stringify(params.billingAddress), params.billingNotes ?? null, params.encryptedSecrets ?? null, params.isPersonal, JSON.stringify(params.channelUsers ?? {}), ] ); return mapRow(result.rows[0]); } /** * Delete a pending_payment row — used when admin or system needs * to clean up an abandoned order (e.g. Checkout session expired * before the customer completed payment). Guarded: only deletes * if status is still 'pending_payment' so we never accidentally * delete a request that admin has already approved. * * Also nulls any setup_invoice_id reference before deleting so we * don't leave dangling FK refs (we don't have ON DELETE behavior * defined on the column). */ export async function deletePendingPaymentRequest( requestId: string ): Promise { const result = await getPool().query( `DELETE FROM tenant_requests WHERE id = $1 AND status = 'pending_payment' RETURNING id`, [requestId] ); return (result.rowCount ?? 0) > 0; }