import { Pool } from "pg";
import type {
  BillingAddress,
  OrgBilling,
  SupportTicket,
  SupportTicketComment,
  SupportTicketCommentAuthorKind,
  SupportTicketCategory,
  SupportTicketStatus,
  TenantRequest,
  TenantRequestStatus,
} from "@/types";
import { listTenants, getTenant } from "./k8s";

// ---------------------------------------------------------------------------
// Connection pool (singleton)
// ---------------------------------------------------------------------------

/** Module-wide pool instance; stays null until the first getPool() call. */
let pool: Pool | null = null;

/**
 * Return the shared connection pool, creating it on first use.
 *
 * The connection string is read from `DATABASE_URL`; when that variable is
 * unset the hard-coded in-cluster URL below is used instead. The pool is
 * capped at 5 clients.
 */
function getPool(): Pool {
  if (!pool) {
    const connectionString =
      process.env.DATABASE_URL ??
      "postgresql://portal:portal@portal-db-rw.portal.svc:5432/portal";
    pool = new Pool({ connectionString, max: 5 });
    // node-postgres re-emits errors from idle clients on the pool. Without a
    // listener that is an unhandled 'error' event, which terminates the
    // process; log it instead and let the pool discard the broken client.
    pool.on("error", (err: Error) => {
      console.error("[db] idle PostgreSQL client error:", err);
    });
  }
  return pool;
}

// ---------------------------------------------------------------------------
// Schema migration (auto-run on first query)
// ---------------------------------------------------------------------------
// Notes on the Slice 3 changes
// ----------------------------
// 1. Removed `UNIQUE` from `zitadel_org_id` in the CREATE TABLE for fresh
//    installs, AND emit a defensive `DROP CONSTRAINT IF EXISTS` for
//    existing installs whose schema was created pre-Slice-3. The
//    constraint was Postgres-autonamed; the name is deterministic.
// 2. Added `instance_name TEXT` — the customer's human label per
//    instance (e.g. "Production", "Dev"). NULL is fine and means "use
//    the company name for display".
// 3. Added a unique index on `tenant_name WHERE NOT NULL`. Multiple
//    rows in the table can have NULL tenant_name (pending/rejected
//    requests), but every approved row points to a distinct K8s CR.
//    (Superseded: the migration now drops that index and replaces it
//    with partial unique indexes scoped by `request_type`.)
// 4. Added `(zitadel_org_id, status)` index for the list-by-org queries
//    introduced this slice.
const MIGRATION_SQL = `
CREATE TABLE IF NOT EXISTS tenant_requests (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  zitadel_org_id TEXT NOT NULL,
  zitadel_user_id TEXT NOT NULL,
  company_name TEXT NOT NULL,
  instance_name TEXT,
  contact_name TEXT NOT NULL,
  contact_email TEXT NOT NULL,
  agent_name TEXT NOT NULL DEFAULT 'Assistant',
  soul_md TEXT,
  agents_md TEXT,
  packages TEXT[] DEFAULT '{}',
  billing_address JSONB DEFAULT '{}',
  billing_notes TEXT,
  status TEXT NOT NULL DEFAULT 'pending',
  admin_notes TEXT,
  tenant_name TEXT,
  encrypted_secrets BYTEA,
  is_personal BOOLEAN NOT NULL DEFAULT FALSE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_tenant_requests_status ON tenant_requests(status);
CREATE INDEX IF NOT EXISTS idx_tenant_requests_org_id ON tenant_requests(zitadel_org_id);
CREATE INDEX IF NOT EXISTS idx_tenant_requests_org_status ON tenant_requests(zitadel_org_id, status);

-- Note: the unique constraint on tenant_name is NOT created here.
-- Pre-Bug-37 we had a non-partial UNIQUE on tenant_name, which is
-- incompatible with resume requests (same tenant_name, different
-- request_type). The new partial unique indexes are created
-- further down in the migration block, after the request_type
-- column has been added and backfilled. This bootstrap section
-- only creates indexes that are safe regardless of request_type
-- semantics.

-- Idempotent column adds for existing databases
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS encrypted_secrets BYTEA;
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS agents_md TEXT;
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS instance_name TEXT;
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS is_personal BOOLEAN NOT NULL DEFAULT FALSE;

-- Bug 13: customer-side dismissal of rejected requests. NULL means "still
-- visible on the dashboard"; non-null means "customer clicked Dismiss".
-- Pending/approved/active rows keep this NULL by definition — the field
-- is only meaningful for rejected and cancelled rows.
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS dismissed_at TIMESTAMPTZ;

-- Feature 6: free-form customer note attached to the request.
-- Currently surfaced only by resume requests (where the customer
-- explains why they want reactivation), but the column is generic
-- so future flows could reuse it. Distinct from billing_notes
-- (provision-only, accounting-related) and admin_notes (admin's
-- reason on reject/approve). Optional — nullable.
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS customer_notes TEXT;

-- Bug 37a: resume requests use the same table as provision requests so
-- the customer dashboard and admin queue share rendering. Discriminator
-- is request_type. Default 'provision' on backfill keeps existing rows
-- working without explicit migration.
--
-- Resume rows have:
--   request_type = 'resume'
--   tenant_name = the existing tenant being requested for reactivation
--   zitadel_org_id = the org owning that tenant
--   zitadel_user_id = the requesting customer
--   status = pending → approved/rejected (or cancelled by customer)
--   most provision-only fields (packages, billing_address, etc.) are NULL
ALTER TABLE tenant_requests ADD COLUMN IF NOT EXISTS request_type TEXT NOT NULL DEFAULT 'provision';

-- Constrain to the known set so a future code change can't accidentally
-- write a third type without first widening this constraint.
DO $$
BEGIN
  ALTER TABLE tenant_requests ADD CONSTRAINT tenant_requests_request_type_check
    CHECK (request_type IN ('provision', 'resume'));
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

-- Tenant_name uniqueness was originally meant for "one tenant CR per
-- approved provision request". Resume requests reuse a tenant_name,
-- so the uniqueness must now be scoped to provision rows only.
DROP INDEX IF EXISTS uniq_tenant_requests_tenant_name;
CREATE UNIQUE INDEX IF NOT EXISTS uniq_tenant_requests_tenant_name_provision
  ON tenant_requests(tenant_name)
  WHERE tenant_name IS NOT NULL AND request_type = 'provision';

-- Only one pending resume request per tenant at a time. Otherwise a
-- customer could spam-create resume requests (the admin queue would
-- bloat) or two admins might race on approving duplicates.
CREATE UNIQUE INDEX IF NOT EXISTS uniq_tenant_requests_pending_resume
  ON tenant_requests(tenant_name)
  WHERE tenant_name IS NOT NULL AND request_type = 'resume' AND status = 'pending';

-- Slice 3: drop the legacy 1-org-1-request constraint if it exists
ALTER TABLE tenant_requests DROP CONSTRAINT IF EXISTS tenant_requests_zitadel_org_id_key;

-- Workspace templates: admin-editable default content for workspace files
CREATE TABLE IF NOT EXISTS workspace_templates (
  file_key TEXT PRIMARY KEY,
  content TEXT NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- ---------------------------------------------------------------------------
-- Slice 6: per-tenant user assignments
-- ---------------------------------------------------------------------------
--
-- Each row grants ONE user visibility into ONE tenant within their own
-- ZITADEL org. Used to narrow the customer 'user' role from "everything
-- in the org" to "only the tenants I've been assigned to". Owners and
-- platform users bypass this table entirely.
--
-- Composite PK is (tenant_name, zitadel_user_id) — a user is either
-- assigned to a tenant or not, no degree.
--
-- The zitadel_org_id column is denormalised onto every row so cascade
-- cleanups when a user leaves an org can be expressed as a single
-- DELETE WHERE zitadel_org_id=$1 AND zitadel_user_id=$2 — without
-- joining tenant_requests. The assigned_by column tracks which user
-- (the owner usually) granted the assignment, for audit.
--
-- Cascade on tenant deletion is enforced in application code (the
-- admin delete handler calls removeAllAssignmentsForTenant) rather
-- than via FK — there's no FK target, since K8s CRs aren't a Postgres
-- table.
CREATE TABLE IF NOT EXISTS tenant_user_assignments (
  tenant_name TEXT NOT NULL,
  zitadel_org_id TEXT NOT NULL,
  zitadel_user_id TEXT NOT NULL,
  assigned_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  assigned_by TEXT NOT NULL,
  PRIMARY KEY (tenant_name, zitadel_user_id)
);

CREATE INDEX IF NOT EXISTS idx_tua_user ON tenant_user_assignments(zitadel_user_id);
CREATE INDEX IF NOT EXISTS idx_tua_org ON tenant_user_assignments(zitadel_org_id);

-- Bug 35: org-scoped billing. One row per ZITADEL org; captured by
-- the first tenant request inline, editable afterwards via
-- /settings/billing. Subsequent tenant requests in the same org read
-- this and skip the billing step entirely.
--
-- vat_number is nullable: required at write time for company orgs
-- (enforced by the API, not the schema, because "company-or-personal"
-- isn't expressible as a column constraint). Notes is free-form
-- accounting context — VAT exemption reasons, special invoicing
-- arrangements, etc.
--
-- We do NOT migrate data from tenant_requests.billing_address into
-- this table automatically. Existing customers re-enter on next
-- tenant or via settings — the data set is small (single-digit
-- customers in pilot) and re-entering is the simplest path.
CREATE TABLE IF NOT EXISTS org_billing (
  zitadel_org_id TEXT PRIMARY KEY,
  company_name TEXT NOT NULL,
  street_address TEXT NOT NULL,
  postal_code TEXT NOT NULL,
  city TEXT NOT NULL,
  country TEXT NOT NULL,
  vat_number TEXT,
  billing_email TEXT NOT NULL,
  notes TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Feature 5: lightweight customer support / feedback tickets.
-- Scoped strictly per-user (zitadel_user_id), not per-org —
-- coworkers in the same org cannot see each other's tickets. The
-- index on (zitadel_user_id, updated_at DESC) is what most
-- customer-side queries hit; the index on (status, updated_at DESC)
-- is for the admin queue.
--
-- contact_email / contact_name are frozen at creation time so the
-- ticket retains a working "reply-to" identity even if the user
-- later changes their email or display name in ZITADEL.
CREATE TABLE IF NOT EXISTS support_tickets (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  zitadel_org_id TEXT NOT NULL,
  zitadel_user_id TEXT NOT NULL,
  title TEXT NOT NULL,
  description TEXT NOT NULL,
  category TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'open',
  contact_email TEXT NOT NULL,
  contact_name TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- CHECK constraints added separately so re-running the migration
-- against an existing table (without these constraints) works.
-- IF NOT EXISTS isn't supported on ADD CONSTRAINT, hence the
-- DO $$ wrapper.
DO $$
BEGIN
  ALTER TABLE support_tickets ADD CONSTRAINT support_tickets_category_check
    CHECK (category IN ('bug','feature_request','question','billing','other'));
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

DO $$
BEGIN
  ALTER TABLE support_tickets ADD CONSTRAINT support_tickets_status_check
    CHECK (status IN ('open','in_progress','waiting_for_customer','resolved','reopened'));
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

CREATE INDEX IF NOT EXISTS idx_support_tickets_user ON support_tickets(zitadel_user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_support_tickets_status ON support_tickets(status, updated_at DESC);

-- Threaded comments. ON DELETE CASCADE so deleting a ticket
-- cleans up its history; we don't currently expose ticket
-- deletion in the UI but the cascade keeps options open.
CREATE TABLE IF NOT EXISTS support_ticket_comments (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  ticket_id UUID NOT NULL REFERENCES support_tickets(id) ON DELETE CASCADE,
  author_user_id TEXT NOT NULL,
  author_name TEXT NOT NULL,
  author_kind TEXT NOT NULL,
  body TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

DO $$
BEGIN
  ALTER TABLE support_ticket_comments ADD CONSTRAINT support_ticket_comments_author_kind_check
    CHECK (author_kind IN ('customer','admin'));
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

CREATE INDEX IF NOT EXISTS idx_support_ticket_comments_ticket ON support_ticket_comments(ticket_id, created_at);

-- =========================================================================
-- Billing — Phase 1: pricing, lifecycle, and events
-- =========================================================================
--
-- This block introduces the schema for the consolidated billing
-- subsystem. Phase 1 lands the schema + writes the lifecycle and
-- skill-event rows that downstream phases consume; invoice
-- generation, PDF rendering, Stripe wiring and reminders are added
-- in later phases. The invoice/line/reminder tables are created
-- here so all billing schema lives in a single migration block,
-- but no code writes to them until Phase 2.
--
-- Money columns: NUMERIC(10,2) for CHF amounts (max ~99 million.99
-- which is fine for the foreseeable future) and NUMERIC(10,5) for
-- per-unit prices (Threema messages and skill-days can have
-- sub-rappen unit prices — 0.00012 CHF/message is the kind of
-- granularity we want to keep precise across multiplication).
--
-- All timestamps are TIMESTAMPTZ. Billing-day computations use UTC
-- by convention (matches all other stored timestamps; avoids DST
-- ambiguity around month boundaries).

-- Single-row platform pricing config. The id=1 CHECK and explicit
-- DEFAULT 1 make accidental multi-row inserts impossible, and
-- callers can always SELECT * FROM platform_pricing WHERE id = 1.
-- (Could use a settings KV table; this shape is clearer for
-- typed columns that the admin UI binds to directly.)
CREATE TABLE IF NOT EXISTS platform_pricing (
  id INT PRIMARY KEY DEFAULT 1 CHECK (id = 1),
  tenant_monthly_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0,
  tenant_setup_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0,
  -- Single price per Threema message; the relay reports in+out
  -- separately but the spec ("billed per Message") treats both
  -- directions identically. If asymmetric pricing is needed
  -- later, split into in/out columns — additive change.
  threema_message_chf NUMERIC(10,5) NOT NULL DEFAULT 0,
  -- Default VAT rate for CH/LI customers, as percent (e.g. 8.10).
  -- Foreign customers' effective rate is computed at invoice time
  -- from billing address + VAT number (reverse-charge logic).
  vat_rate_chli NUMERIC(5,2) NOT NULL DEFAULT 8.10,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Ensure the single row exists. ON CONFLICT DO NOTHING is idempotent
-- on every migration run.
INSERT INTO platform_pricing (id) VALUES (1) ON CONFLICT DO NOTHING;

-- Per-package optional daily price. Any package id can have a row;
-- the admin UI in Phase 2 will only expose skill-category packages
-- because that's what the spec called for, but nothing here
-- prevents pricing other categories later.
--
-- "skill" naming follows the spec ("custom pricing also for skills").
CREATE TABLE IF NOT EXISTS skill_pricing (
  skill_id TEXT PRIMARY KEY,
  daily_price_chf NUMERIC(10,5) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Phase 2 addition: per-skill one-time setup fee. Charged the
-- first time a given (tenant, skill) appears on an invoice line.
-- Default 0 so pricing rows created before this column exists
-- stay free until the admin sets a fee.
ALTER TABLE skill_pricing ADD COLUMN IF NOT EXISTS setup_fee_chf NUMERIC(10,2) NOT NULL DEFAULT 0;

-- One row per tenant. created_at anchors first-month proration;
-- deleted_at (nullable, stamped on delete) anchors last-month
-- proration. The PiecedTenant CR is the source of truth for
-- existence, but once the CR is deleted we lose its
-- creationTimestamp — so we mirror those two bookends here.
CREATE TABLE IF NOT EXISTS tenant_billing_lifecycle (
  tenant_name TEXT PRIMARY KEY,
  zitadel_org_id TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL,
  deleted_at TIMESTAMPTZ
);

CREATE INDEX IF NOT EXISTS idx_tenant_billing_lifecycle_org ON tenant_billing_lifecycle(zitadel_org_id);

-- Skill enable/disable events. One row per state change; same-day
-- toggles still produce multiple rows, and the billing computation
-- collapses to distinct UTC days at compute time. This append-only
-- log preserves history for audit and lets us re-bill historical
-- months reproducibly.
--
-- skill_id is the package id from PACKAGE_CATALOG. We store
-- events for ALL package toggles, not just skill-category — the
-- channel/core toggles are cheap to record and may become billable
-- in the future without a schema change.
CREATE TABLE IF NOT EXISTS tenant_skill_events (
  id BIGSERIAL PRIMARY KEY,
  tenant_name TEXT NOT NULL,
  zitadel_org_id TEXT NOT NULL,
  skill_id TEXT NOT NULL,
  event_kind TEXT NOT NULL CHECK (event_kind IN ('enabled', 'disabled')),
  occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_tenant_skill_events_tenant_skill ON tenant_skill_events(tenant_name, skill_id, occurred_at);
CREATE INDEX IF NOT EXISTS idx_tenant_skill_events_org_time ON tenant_skill_events(zitadel_org_id, occurred_at);

-- Suspend/resume transitions. Same shape as skill events. Reading
-- these in order reconstructs the suspended-state segments for
-- monthly fee proration: a tenant in 'suspended' state pays no
-- monthly fee for the days it was suspended.
--
-- The portal commands the transition (PATCH spec.suspend); the
-- operator observes and stamps PiecedTenantStatus.suspendedAt
-- after reconcile. We record the event at command time — billing
-- is monthly so the few-second reconcile lag is irrelevant.
CREATE TABLE IF NOT EXISTS tenant_suspension_events (
  id BIGSERIAL PRIMARY KEY,
  tenant_name TEXT NOT NULL,
  zitadel_org_id TEXT NOT NULL,
  event_kind TEXT NOT NULL CHECK (event_kind IN ('suspended','resumed')),
  occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_tenant_suspension_events_tenant ON tenant_suspension_events(tenant_name, occurred_at);

-- Per-org billing configuration. Distinct from org_billing
-- (address/VAT/email): that table is customer-editable, this one
-- is admin-controlled and holds the payment posture.
--
-- Defaults: a new org has pay_by_invoice = false (must use a
-- credit card per the onboarding gate in Phase 4) and auto
-- billing/reminders enabled. Admin can flip pay_by_invoice on
-- per customer, after which approval no longer requires a card.
CREATE TABLE IF NOT EXISTS org_billing_config (
  zitadel_org_id TEXT PRIMARY KEY,
  pay_by_invoice BOOLEAN NOT NULL DEFAULT FALSE,
  stripe_customer_id TEXT,
  auto_invoice_enabled BOOLEAN NOT NULL DEFAULT TRUE,
  auto_reminders_enabled BOOLEAN NOT NULL DEFAULT TRUE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Stripe payment methods. Populated by the Phase 4 webhook handler.
-- Created in Phase 1 so all billing schema is together; rows are
-- empty until Phase 4 ships.
CREATE TABLE IF NOT EXISTS org_payment_methods (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  zitadel_org_id TEXT NOT NULL,
  stripe_payment_method_id TEXT NOT NULL UNIQUE,
  brand TEXT,
  last4 TEXT,
  exp_month INT,
  exp_year INT,
  is_default BOOLEAN NOT NULL DEFAULT FALSE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_org_payment_methods_org ON org_payment_methods(zitadel_org_id);

-- At most one default payment method per org. Partial unique index
-- so non-default rows don't conflict with each other.
CREATE UNIQUE INDEX IF NOT EXISTS uniq_org_payment_methods_default
  ON org_payment_methods(zitadel_org_id)
  WHERE is_default = TRUE;

-- Gapless per-year invoice number counter (Art. 957a OR
-- compliance). A Postgres SEQUENCE would be faster but allows
-- gaps on rollback; this counter table is SELECT FOR UPDATE-able
-- and produces gapless numbers when the invoice insert is in the
-- same transaction. Populated lazily — the first invoice of each
-- year inserts its row.
CREATE TABLE IF NOT EXISTS invoice_number_counters (
  year INT PRIMARY KEY,
  last_number INT NOT NULL DEFAULT 0
);

-- Issued invoices. Immutable once status leaves 'draft'.
-- billing_snapshot captures the address/VAT/email at issue time
-- so subsequent edits to org_billing don't mutate historical
-- invoices.
CREATE TABLE IF NOT EXISTS invoices (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  invoice_number TEXT NOT NULL UNIQUE,
  zitadel_org_id TEXT NOT NULL,
  -- Billing period as DATEs (not timestamps): a calendar month.
  -- period_end is the last day of the month, inclusive.
  period_start DATE NOT NULL,
  period_end DATE NOT NULL,
  issued_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  due_at DATE NOT NULL,
  subtotal_chf NUMERIC(10,2) NOT NULL,
  vat_rate NUMERIC(5,2) NOT NULL,
  vat_amount_chf NUMERIC(10,2) NOT NULL,
  total_chf NUMERIC(10,2) NOT NULL,
  status TEXT NOT NULL DEFAULT 'open' CHECK (
    status IN ('draft','open','paid','overdue','void','uncollectible')
  ),
  billing_snapshot JSONB NOT NULL,
  payment_method TEXT NOT NULL CHECK (payment_method IN ('invoice','card')),
  stripe_payment_intent_id TEXT,
  pdf_data BYTEA,
  pdf_filename TEXT,
  admin_notes TEXT,
  paid_at TIMESTAMPTZ,
  paid_by TEXT,
  paid_method_detail TEXT,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Phase 2 addition: PDF locale, frozen at issue time so re-rendering
-- an old invoice produces an identical document. Defaults to 'de'
-- since most pilot customers are Swiss B2B; the generator UI lets
-- admin override at issue time.
ALTER TABLE invoices ADD COLUMN IF NOT EXISTS locale TEXT NOT NULL DEFAULT 'de';

CREATE INDEX IF NOT EXISTS idx_invoices_org ON invoices(zitadel_org_id, issued_at DESC);
CREATE INDEX IF NOT EXISTS idx_invoices_status ON invoices(status, due_at);

-- One invoice per org per billing month — protects the monthly
-- cron from double-issuing if it gets retried mid-run.
CREATE UNIQUE INDEX IF NOT EXISTS uniq_invoices_org_period ON invoices(zitadel_org_id, period_start);

-- Invoice line items. The kind column lets the PDF renderer
-- group lines (all monthly fees together, all AI usage together,
-- etc.) and the admin UI filter by category.
CREATE TABLE IF NOT EXISTS invoice_lines (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  invoice_id UUID NOT NULL REFERENCES invoices(id) ON DELETE CASCADE,
  -- NULL for org-wide items; tenant name for per-tenant breakdowns.
  tenant_name TEXT,
  kind TEXT NOT NULL CHECK (kind IN (
    'tenant_monthly','tenant_setup','ai_usage','threema_messages','skill_usage','adjustment'
  )),
  description TEXT NOT NULL,
  quantity NUMERIC(12,4) NOT NULL DEFAULT 1,
  unit_label TEXT,
  unit_price_chf NUMERIC(10,5) NOT NULL,
  amount_chf NUMERIC(10,2) NOT NULL,
  -- Per-kind audit metadata (e.g. {proration_days, days_in_month}
  -- for tenant_monthly; {in_count, out_count} for threema_messages).
  metadata JSONB,
  display_order INT NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_invoice_lines_invoice ON invoice_lines(invoice_id, display_order);

-- Reminders fired against open/overdue invoices. Level 3 = final.
-- One PDF per reminder, stored alongside.
CREATE TABLE IF NOT EXISTS invoice_reminders (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  invoice_id UUID NOT NULL REFERENCES invoices(id) ON DELETE CASCADE,
  level INT NOT NULL CHECK (level IN (1, 2, 3)),
  sent_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  sent_by TEXT NOT NULL,
  pdf_data BYTEA,
  pdf_filename TEXT,
  email_sent_to TEXT,
  UNIQUE (invoice_id, level)
);

CREATE INDEX IF NOT EXISTS idx_invoice_reminders_invoice ON invoice_reminders(invoice_id, level);

-- Phase 2.5: queue for skills flagged with requiresManualSetup in
-- the package catalog. A user-side enable on a flagged skill
-- creates a pending row here instead of mutating tenant.spec.packages;
-- the operator never sees the skill until admin approves and adds
-- it to the spec. Disable is always direct — there's no gate on
-- turning a skill off, even one that previously required setup.
CREATE TABLE IF NOT EXISTS skill_activation_requests (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_name TEXT NOT NULL,
  zitadel_org_id TEXT NOT NULL,
  zitadel_user_id TEXT NOT NULL,
  skill_id TEXT NOT NULL,
  status TEXT NOT NULL CHECK (status IN ('pending', 'approved', 'rejected', 'withdrawn')) DEFAULT 'pending',
  requested_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  reviewed_at TIMESTAMPTZ,
  reviewed_by TEXT,
  rejection_reason TEXT,
  admin_notes TEXT
);

-- Only one in-flight request per (tenant, skill). Rejected and
-- approved rows don't block new requests; user can retry after a
-- rejection by toggling the skill again.
CREATE UNIQUE INDEX IF NOT EXISTS uniq_skill_act_one_pending
  ON skill_activation_requests (tenant_name, skill_id)
  WHERE status = 'pending';

-- Admin queue lookup — partial index keeps it tiny.
CREATE INDEX IF NOT EXISTS idx_skill_act_pending_status
  ON skill_activation_requests (requested_at DESC)
  WHERE status = 'pending';

-- Per-tenant lookup for the customer UI's pending+rejected display.
CREATE INDEX IF NOT EXISTS idx_skill_act_tenant
  ON skill_activation_requests (tenant_name, requested_at DESC);
`;

/** True once MIGRATION_SQL has completed successfully in this process. */
let migrated = false;

/** The migration run currently in progress, shared by concurrent callers. */
let migrationInFlight: Promise<void> | null = null;

/**
 * Run MIGRATION_SQL once per process before the first real query.
 *
 * Callers that arrive while the migration is still running await the same
 * in-flight promise instead of each sending the DDL again: concurrent
 * `CREATE ... IF NOT EXISTS` statements can collide with each other on the
 * server. If the migration fails, the error propagates to every waiter and
 * the next call retries from scratch.
 */
export async function ensureSchema(): Promise<void> {
  if (migrated) return;
  if (!migrationInFlight) {
    migrationInFlight = getPool()
      .query(MIGRATION_SQL)
      .then(() => {
        migrated = true;
      })
      .finally(() => {
        // Clear the slot on success and on failure so a failed run can be retried.
        migrationInFlight = null;
      });
  }
  await migrationInFlight;
}

// ---------------------------------------------------------------------------
// Workspace templates
// ---------------------------------------------------------------------------

/**
 * Get a workspace template by file key (e.g. "SOUL.md", "AGENTS.md", "TOOLS.md").
 * Returns null if no template is stored for this key.
 */
export async function getWorkspaceTemplate(
  fileKey: string
): Promise<string | null> {
  await ensureSchema();
  const result = await getPool().query<{ content: string }>(
    "SELECT content FROM workspace_templates WHERE file_key = $1",
    [fileKey]
  );
  return result.rows[0]?.content ?? null;
}

/**
 * Upsert a workspace template: insert the row for `fileKey`, or overwrite
 * its content and refresh `updated_at` when the key already exists.
 */
export async function setWorkspaceTemplate(
  fileKey: string,
  content: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `INSERT INTO workspace_templates (file_key, content, updated_at)
     VALUES ($1, $2, now())
     ON CONFLICT (file_key) DO UPDATE SET content = $2, updated_at = now()`,
    [fileKey, content]
  );
}

/**
 * List all workspace templates, ordered by file key.
 *
 * `updatedAt` is an ISO-8601 string when the driver hands back a Date;
 * any other value is passed through unchanged.
 */
export async function listWorkspaceTemplates(): Promise<
  Array<{ fileKey: string; content: string; updatedAt: string }>
> {
  await ensureSchema();
  const result = await getPool().query<{
    file_key: string;
    content: string;
    updated_at: Date | string;
  }>(
    "SELECT file_key, content, updated_at FROM workspace_templates ORDER BY file_key"
  );
  return result.rows.map((row) => ({
    fileKey: row.file_key,
    content: row.content,
    updatedAt:
      row.updated_at instanceof Date
        ? row.updated_at.toISOString()
        : row.updated_at,
  }));
}

// ---------------------------------------------------------------------------
// Tenant requests CRUD
// ---------------------------------------------------------------------------

/**
 * Insert a new tenant request and return the stored row (via mapRow).
 *
 * Optional inputs are normalised before the insert: `instanceName`,
 * `agentsMd` and `encryptedSecrets` fall back to NULL, `isPersonal` to
 * false. `billingAddress` is serialised with JSON.stringify. Columns not
 * listed in the INSERT (status, request_type, timestamps, …) take their
 * schema defaults.
 *
 * NOTE(review): the type arguments of `Omit` on `params` are missing in this
 * copy of the file and cannot be reconstructed from the function body —
 * restore them from version control.
 */
export async function createTenantRequest(
  params: Omit & {
    encryptedSecrets?: Buffer;
  }
): Promise<TenantRequest> {
  await ensureSchema();
  const result = await getPool().query(
    `INSERT INTO tenant_requests
       (zitadel_org_id, zitadel_user_id, company_name, instance_name,
        contact_name, contact_email, agent_name, soul_md, agents_md,
        packages, billing_address, billing_notes, encrypted_secrets, is_personal)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
     RETURNING *`,
    [
      params.zitadelOrgId,
      params.zitadelUserId,
      params.companyName,
      params.instanceName ?? null,
      params.contactName,
      params.contactEmail,
      params.agentName,
      params.soulMd,
      params.agentsMd ?? null,
      params.packages,
      JSON.stringify(params.billingAddress),
      params.billingNotes,
      params.encryptedSecrets ?? null,
      params.isPersonal ?? false,
    ]
  );
  return mapRow(result.rows[0]);
}

/**
 * Look up a single tenant request by primary key.
 * Returns null when no row has that id.
 */
export async function getTenantRequestById(
  id: string
): Promise<TenantRequest | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM tenant_requests WHERE id = $1",
    [id]
  );
  return result.rows[0] ? mapRow(result.rows[0]) : null;
}

/**
 * Slice 3: returns ALL requests for an org, most recent first.
 *
 * Replaces the pre-Slice-3 `getTenantRequestByOrgId` which returned the
 * single most recent row. Callers that previously assumed one-row-per-org
 * must now iterate or pick by status. The intent is explicit at every
 * call site, which is the point of the rename.
 *
 * Includes rows in every status (pending, approved, provisioning, active,
 * rejected, cancelled, deleted). For "active or in-flight only" filtering,
 * see {@link listActiveTenantRequestsByOrgId}.
*/ export async function listTenantRequestsByOrgId( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( "SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 ORDER BY created_at DESC", [orgId] ); return result.rows.map(mapRow); } /** * As {@link listTenantRequestsByOrgId} but tuned for the customer's * dashboard view. * * Returns: * - All non-terminal rows (pending, approved, provisioning, active), * because the customer needs to see what's in flight. * - Terminal-failed rows (rejected, cancelled) that the customer * hasn't dismissed yet (Bug 13). Without this, a rejection that * happens while the customer isn't online would only be * communicated by email — easy to miss. * * Excludes: * - `deleted` rows (admin tore down the tenant — historical, not * actionable). * - Dismissed rejected/cancelled rows. */ export async function listActiveTenantRequestsByOrgId( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 AND status <> 'deleted' AND (status NOT IN ('rejected', 'cancelled') OR dismissed_at IS NULL) ORDER BY created_at DESC`, [orgId] ); return result.rows.map(mapRow); } /** * Returns the most recent approved-or-active request for an org. Used to * seed billing/contact defaults when a customer creates an additional * instance — saves them re-typing data already on file. * * Returns null if the org has never had an approved instance (e.g. first * registration is still pending). */ export async function getMostRecentApprovedRequestForOrg( orgId: string ): Promise { await ensureSchema(); const result = await getPool().query( `SELECT * FROM tenant_requests WHERE zitadel_org_id = $1 AND status IN ('approved', 'provisioning', 'active') ORDER BY created_at DESC LIMIT 1`, [orgId] ); return result.rows[0] ? 
mapRow(result.rows[0]) : null; } export async function listTenantRequests( status?: TenantRequestStatus ): Promise { await ensureSchema(); const result = status ? await getPool().query( "SELECT * FROM tenant_requests WHERE status = $1 ORDER BY created_at DESC", [status] ) : await getPool().query( "SELECT * FROM tenant_requests ORDER BY created_at DESC" ); return result.rows.map(mapRow); } export async function updateTenantRequestStatus( id: string, status: TenantRequestStatus, extra?: { adminNotes?: string | null; tenantName?: string; clearAdminNotes?: boolean; } ): Promise { await ensureSchema(); const sets = ["status = $2", "updated_at = now()"]; const values: any[] = [id, status]; let idx = 3; if (extra?.adminNotes !== undefined) { sets.push(`admin_notes = $${idx}`); values.push(extra.adminNotes); idx++; } if (extra?.clearAdminNotes) { sets.push("admin_notes = NULL"); } if (extra?.tenantName) { sets.push(`tenant_name = $${idx}`); values.push(extra.tenantName); idx++; } const result = await getPool().query( `UPDATE tenant_requests SET ${sets.join(", ")} WHERE id = $1 RETURNING *`, values ); return mapRow(result.rows[0]); } /** * Clear the encrypted_secrets column after secrets have been written to OpenBao. * Called during admin approval after successful vault writes. */ export async function clearEncryptedSecrets(requestId: string): Promise { await ensureSchema(); await getPool().query( "UPDATE tenant_requests SET encrypted_secrets = NULL, updated_at = now() WHERE id = $1", [requestId] ); } /** * Set dismissed_at = now() on a request row. Used when a customer * clicks "Dismiss" on a rejected/cancelled card on their dashboard * (Bug 13). The row stays in the database for history/audit but * stops appearing in `listActiveTenantRequestsByOrgId`. * * Idempotent: dismissing an already-dismissed row is a no-op. * Caller is responsible for verifying the row belongs to the user's * org before calling. */ /** * Create a resume request (Bug 37a). 
Used when an owner of a suspended
 * tenant wants to reactivate it. Resume is admin-gated — the request
 * sits as `pending` until a platform admin approves or rejects it.
 *
 * Tenant-name uniqueness is enforced for `pending` resume rows by a
 * partial unique index, so a customer can't spam the queue with
 * duplicate resume requests for the same tenant. The DB throws a
 * unique-violation if they try; callers should catch that and translate
 * to a 409.
 *
 * Why this lives in tenant_requests instead of a separate table:
 *   - the lifecycle is identical (pending → approved/rejected, plus
 *     customer-side cancel and dismiss-after-terminal)
 *   - the customer dashboard renders pending+resume cards from the
 *     same `listActiveTenantRequestsByOrgId` query — adding a separate
 *     table would mean two queries and union-merging in the UI
 *   - the admin queue likewise treats them uniformly
 * The cost is a discriminator column (`request_type`) and most
 * provision-only fields being null on resume rows. That's a tradeoff
 * I think is worth it.
 *
 * @returns The newly inserted row (`request_type = 'resume'`,
 *   `status = 'pending'`).
 */
export async function createResumeRequest(params: {
  tenantName: string;
  zitadelOrgId: string;
  zitadelUserId: string;
  contactName: string;
  contactEmail: string;
  // Provision-only fields default sensibly. company_name + agent_name
  // are NOT NULL in the original schema; we copy them from the existing
  // tenant request for traceability rather than storing dummy values.
  companyName: string;
  agentName: string;
  /**
   * Feature 6: optional free-form note from the customer explaining
   * why they want reactivation. Surfaced to admin in the queue and
   * forwarded to the platform notification email so the admin can
   * decide before opening the request.
   */
  customerNotes?: string | null;
}): Promise<TenantRequest> {
  await ensureSchema();
  const result = await getPool().query(
    `INSERT INTO tenant_requests ( zitadel_org_id, zitadel_user_id, company_name, contact_name, contact_email, agent_name, tenant_name, request_type, status, customer_notes ) VALUES ($1, $2, $3, $4, $5, $6, $7, 'resume', 'pending', $8) RETURNING *`,
    [
      params.zitadelOrgId,
      params.zitadelUserId,
      params.companyName,
      params.contactName,
      params.contactEmail,
      params.agentName,
      params.tenantName,
      // undefined and null both store NULL.
      params.customerNotes ?? null,
    ]
  );
  return mapRow(result.rows[0]);
}

/**
 * Get the most recent provision request for a tenant_name. Used by
 * Bug 37a's resume-request creation to populate company_name and
 * agent_name (NOT NULL columns) from the original provision row
 * rather than make up values.
 *
 * Returns null when no such row exists — should be impossible in
 * normal flow (resume requests are only created for already-existing
 * tenants whose CR was created via approving a provision request),
 * but the caller should guard against it for safety.
 */
export async function getTenantRequestByTenantName(
  tenantName: string
): Promise<TenantRequest | null> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM tenant_requests WHERE tenant_name = $1 AND request_type = 'provision' ORDER BY created_at DESC LIMIT 1`,
    [tenantName]
  );
  return result.rows.length > 0 ? mapRow(result.rows[0]) : null;
}

/**
 * Return the in-flight (pending) resume request for a given tenant, if
 * any. Used both to gate the customer's "Request reactivation" button
 * (don't allow a second when one's already pending) and by the admin
 * UI to navigate from the tenant detail page to the awaiting request.
 *
 * Returns null when no pending resume exists. Approved/rejected rows
 * are never returned — they're terminal.
*/
export async function getPendingResumeRequestForTenant(
  tenantName: string
): Promise<TenantRequest | null> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM tenant_requests WHERE tenant_name = $1 AND request_type = 'resume' AND status = 'pending' LIMIT 1`,
    [tenantName]
  );
  return result.rows.length > 0 ? mapRow(result.rows[0]) : null;
}

/**
 * Mark a request row as dismissed. `dismissed_at` is only stamped the
 * first time (COALESCE keeps an existing value); `updated_at` is bumped
 * on every call. A non-matching id updates nothing.
 */
export async function dismissTenantRequest(id: string): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `UPDATE tenant_requests SET dismissed_at = COALESCE(dismissed_at, now()), updated_at = now() WHERE id = $1`,
    [id]
  );
}

/**
 * Update editable fields of a still-pending tenant request. Bug 6 — a
 * customer who notices a typo or wants to add a package after submitting
 * the wizard should be able to fix it without admin involvement.
 *
 * Only the customer-input fields are updateable: instance name, agent
 * name, soul/agents markdown, packages, billing address/notes and the
 * encrypted secrets blob. `status`, `tenant_name`, `admin_notes`,
 * `is_personal`, `zitadel_*` and timestamps are managed elsewhere and
 * intentionally not here.
 *
 * The caller is responsible for:
 *   - verifying the row belongs to the user's org
 *   - verifying status === 'pending' (editing approved/provisioning rows
 *     would race against the operator)
 *
 * Returns the updated row, or null if the id didn't match anything.
 */
export async function updateTenantRequestEditableFields(
  id: string,
  fields: {
    instanceName?: string | null;
    agentName?: string;
    soulMd?: string;
    agentsMd?: string | null;
    packages?: string[];
    billingAddress?: BillingAddress;
    billingNotes?: string;
    encryptedSecrets?: Buffer | null;
  }
): Promise<TenantRequest | null> {
  await ensureSchema();
  const sets: string[] = ["updated_at = now()"];
  const values: unknown[] = [id];
  let idx = 2;
  // Map JS field names to SQL columns. Each entry is gated on
  // `!== undefined` so passing only some fields just updates those.
  // An explicit `null` is written through as NULL.
  const colMap: Array<[keyof typeof fields, string]> = [
    ["instanceName", "instance_name"],
    ["agentName", "agent_name"],
    ["soulMd", "soul_md"],
    ["agentsMd", "agents_md"],
    ["packages", "packages"],
    ["billingAddress", "billing_address"],
    ["billingNotes", "billing_notes"],
    ["encryptedSecrets", "encrypted_secrets"],
  ];
  for (const [jsField, sqlCol] of colMap) {
    const v = fields[jsField];
    if (v === undefined) continue;
    sets.push(`${sqlCol} = $${idx}`);
    values.push(v);
    idx++;
  }
  if (sets.length === 1) {
    // No editable fields supplied — return the row unchanged rather
    // than running a useless UPDATE that just bumps updated_at.
    return getTenantRequestById(id);
  }
  const result = await getPool().query(
    `UPDATE tenant_requests SET ${sets.join(", ")} WHERE id = $1 RETURNING *`,
    values
  );
  return result.rows[0] ? mapRow(result.rows[0]) : null;
}

/**
 * Wrapper around domain-check.ts that injects the portal's connection pool.
 * Kept here so route handlers don't need direct access to the pool.
 */
export async function checkDuplicateDomain(email: string) {
  await ensureSchema();
  // Lazy import to keep db.ts free of fetch/AbortSignal at module load time.
  const { checkRegistrationDomain } = await import("./domain-check");
  return checkRegistrationDomain(getPool(), email);
}

/**
 * Mark tenant requests as "deleted" when the associated tenant CR is
 * deleted. Every row carrying this `tenant_name` is affected — the
 * provision row plus any resume rows for the same tenant — and each
 * has its `tenant_name` cleared to NULL in the same statement. The
 * customer's other instances (different tenant names) are untouched.
 */
export async function markTenantRequestDeletedByTenantName(
  tenantName: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    "UPDATE tenant_requests SET status = 'deleted', tenant_name = NULL, updated_at = now() WHERE tenant_name = $1",
    [tenantName]
  );
}

/**
 * Delete a tenant request row entirely. Used when a customer re-submits
 * after their previous tenant was deleted by admin.
*/
export async function deleteTenantRequest(id: string): Promise<void> {
  await ensureSchema();
  await getPool().query("DELETE FROM tenant_requests WHERE id = $1", [id]);
}

/**
 * Reconcile the portal's tenant_requests table against actual cluster
 * state. Three passes, walking only rows with `tenant_name` set:
 *
 * 1. provisioning → active: when a tenant CR's phase reaches Ready
 *    or Running, the portal flips the row to active so the
 *    "provisioning…" card transitions into the running tenant view.
 *
 * 2. active/provisioning → deleted: when the corresponding CR no
 *    longer exists in the cluster (404), or is mid-deletion (has
 *    metadata.deletionTimestamp set), the row gets flipped to
 *    `deleted`. The DB is otherwise blind to operator-initiated
 *    deletions — when the 60-day TTL fires (Bug 37b) and the
 *    operator deletes a suspended tenant, the portal would happily
 *    keep showing the "Your assistant is ready!" card forever.
 *    Without this reconciliation the dashboard drifts from reality.
 *
 * 3. pending resume → cancelled: when a pending resume request's
 *    tenant is no longer suspended (admin resumed it directly,
 *    tenant was deleted, or it was never suspended in the first
 *    place), the request is moot. Flip to 'cancelled' so the
 *    pending-resume unique index releases for any future genuine
 *    resume request. We pick `cancelled` over `rejected` because
 *    the customer didn't do anything wrong — circumstances just
 *    changed.
 *
 * Errors from the cluster lookup are tolerated per-row: a transient API
 * hiccup on one tenant shouldn't fail the whole sweep. Skipped rows get
 * retried next call. Database errors from the status updates are NOT
 * caught here and abort the sweep.
 *
 * Slice 3 note: with multi-tenant per org, this iterates each row
 * individually (keyed by its own tenant_name), so multiple in-flight
 * tenants in the same org are handled correctly.
 */
export async function syncProvisioningStatuses(): Promise<void> {
  await ensureSchema();
  // Active+provisioning rows: status reflects "the tenant should
  // exist and be running".
  // Pending resume rows: status reflects "the tenant is suspended,
  // awaiting reactivation".
  // Both need cluster-side validation; we fetch them in one query
  // and dispatch on (status, request_type).
  const result = await getPool().query(
    `SELECT * FROM tenant_requests WHERE tenant_name IS NOT NULL AND ( status IN ('provisioning', 'active') OR (status = 'pending' AND request_type = 'resume') )`
  );
  for (const row of result.rows) {
    const mapped = mapRow(row);
    if (!mapped.tenantName) continue;
    let tenant: Awaited<ReturnType<typeof getTenant>> = null;
    try {
      tenant = await getTenant(mapped.tenantName);
    } catch {
      // Transient API error — skip this row, retry on next sweep.
      continue;
    }
    // Pending resume request: validity hinges on tenant being suspended.
    if (mapped.status === "pending" && mapped.requestType === "resume") {
      // Tenant doesn't exist or is being deleted: cancel the resume
      // request (it can never be fulfilled). Don't fall through to
      // the "deleted" branch below — that would also flip the
      // provision row, which is the right thing for a CR-level
      // deletion but we want this resume row specifically resolved
      // here.
      if (!tenant || tenant.metadata.deletionTimestamp) {
        await updateTenantRequestStatus(mapped.id, "cancelled");
        continue;
      }
      // Tenant is no longer suspended: the request is moot.
      // Cancel it (the customer didn't do anything wrong; the
      // condition the request was about no longer applies).
      if (!tenant.spec.suspend) {
        await updateTenantRequestStatus(mapped.id, "cancelled");
        continue;
      }
      // Tenant still suspended, request still relevant. Leave as-is.
      continue;
    }
    // Active or provisioning row: CR gone, or mid-deletion. Flip the
    // row to 'deleted'. `markTenantRequestDeletedByTenantName` flips
    // every row with this tenant_name (provision + any resume rows),
    // which is the right thing for a CR-level deletion.
    if (!tenant || tenant.metadata.deletionTimestamp) {
      await markTenantRequestDeletedByTenantName(mapped.tenantName);
      continue;
    }
    // CR exists and is healthy. Promote provisioning → active when
    // the operator reports the tenant has reached steady state.
    // Keep `active` rows on `active` regardless of phase — a
    // temporarily-Reconfiguring tenant is still active from the
    // portal's billing/visibility perspective.
    if (
      mapped.status === "provisioning" &&
      (tenant.status?.phase === "Ready" || tenant.status?.phase === "Running")
    ) {
      await updateTenantRequestStatus(mapped.id, "active");
    }
  }
}

// ---------------------------------------------------------------------------
// Row mapper
// ---------------------------------------------------------------------------

/**
 * Convert a snake_case `tenant_requests` row into the camelCase
 * `TenantRequest` shape. Timestamps are turned into ISO strings when
 * the value exposes `toISOString`, and passed through unchanged
 * otherwise. A missing `request_type` defaults to "provision".
 */
function mapRow(row: any): TenantRequest {
  return {
    id: row.id,
    zitadelOrgId: row.zitadel_org_id,
    zitadelUserId: row.zitadel_user_id,
    companyName: row.company_name,
    instanceName: row.instance_name ?? null,
    contactName: row.contact_name,
    contactEmail: row.contact_email,
    agentName: row.agent_name,
    soulMd: row.soul_md,
    agentsMd: row.agents_md ?? null,
    packages: row.packages ?? [],
    billingAddress: row.billing_address ?? {},
    billingNotes: row.billing_notes,
    customerNotes: row.customer_notes ?? null,
    status: row.status as TenantRequestStatus,
    adminNotes: row.admin_notes,
    tenantName: row.tenant_name,
    encryptedSecrets: row.encrypted_secrets ?? null,
    isPersonal: row.is_personal ?? false,
    dismissedAt:
      row.dismissed_at?.toISOString?.() ?? row.dismissed_at ?? null,
    requestType: (row.request_type ?? "provision") as "provision" | "resume",
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

// ---------------------------------------------------------------------------
// Bug 35: org-scoped billing
// ---------------------------------------------------------------------------

/** Convert an `org_billing` row into the camelCase `OrgBilling` shape. */
function rowToOrgBilling(row: any): OrgBilling {
  return {
    zitadelOrgId: row.zitadel_org_id,
    companyName: row.company_name,
    streetAddress: row.street_address,
    postalCode: row.postal_code,
    city: row.city,
    country: row.country,
    vatNumber: row.vat_number ?? null,
    billingEmail: row.billing_email,
    notes: row.notes ?? null,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

/**
 * Fetch org billing if it exists. Returns null when the org has never
 * captured billing — that's the signal the wizard uses to know
 * whether to render the inline billing step on the first tenant
 * request.
 */
export async function getOrgBilling(
  zitadelOrgId: string
): Promise<OrgBilling | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM org_billing WHERE zitadel_org_id = $1",
    [zitadelOrgId]
  );
  return result.rows.length > 0 ? rowToOrgBilling(result.rows[0]) : null;
}

/**
 * Insert or update org billing. Single function for both because the
 * UI flow makes the "first time vs editing" distinction in a single
 * settings page that doesn't need to know which one it's doing.
 *
 * VAT-required-for-companies isn't enforced here — that's an API
 * concern (the API knows whether the caller is a company org).
 * Keeping the DB layer dumb.
*/
export async function upsertOrgBilling(
  data: Omit<OrgBilling, "createdAt" | "updatedAt">
): Promise<OrgBilling> {
  await ensureSchema();
  const result = await getPool().query(
    `INSERT INTO org_billing ( zitadel_org_id, company_name, street_address, postal_code, city, country, vat_number, billing_email, notes ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (zitadel_org_id) DO UPDATE SET company_name = EXCLUDED.company_name, street_address = EXCLUDED.street_address, postal_code = EXCLUDED.postal_code, city = EXCLUDED.city, country = EXCLUDED.country, vat_number = EXCLUDED.vat_number, billing_email = EXCLUDED.billing_email, notes = EXCLUDED.notes, updated_at = now() RETURNING *`,
    [
      data.zitadelOrgId,
      data.companyName,
      data.streetAddress,
      data.postalCode,
      data.city,
      data.country,
      // Optional columns: undefined and null both store NULL.
      data.vatNumber ?? null,
      data.billingEmail,
      data.notes ?? null,
    ]
  );
  return rowToOrgBilling(result.rows[0]);
}

// ---------------------------------------------------------------------------
// Slice 6: tenant ↔ user assignments
// ---------------------------------------------------------------------------

/**
 * One assignment grants one user visibility into one tenant. Returned
 * shape is the camelCase mirror of the Postgres row.
 */
export interface TenantUserAssignment {
  tenantName: string;
  zitadelOrgId: string;
  zitadelUserId: string;
  assignedAt: string;
  assignedBy: string;
}

/**
 * Convert a `tenant_user_assignments` row into a `TenantUserAssignment`.
 * `assigned_at` becomes an ISO string when the value exposes
 * `toISOString`, and is passed through unchanged otherwise.
 */
function mapAssignmentRow(row: any): TenantUserAssignment {
  return {
    tenantName: row.tenant_name,
    zitadelOrgId: row.zitadel_org_id,
    zitadelUserId: row.zitadel_user_id,
    assignedAt: row.assigned_at?.toISOString?.() ?? row.assigned_at,
    assignedBy: row.assigned_by,
  };
}

/**
 * Returns the set of tenant CR names assigned to the given user.
 *
 * Hot path on every read for `user`-role customers, so it's intentionally
 * a single indexed lookup. The returned array is small (a handful of
 * tenants per user); callers usually wrap it in a Set.
 *
 * Note: this does NOT cross-check the org id — assignments are per-user,
 * and a user's org context comes from their JWT.
If a user's
 * authorization is revoked at the ZITADEL level, their JWT ceases to
 * carry the customer role and they can't reach the dashboard at all;
 * the orphan rows are cleaned up the next time their org membership
 * is re-evaluated (Slice 7's removeAllAssignmentsForUser).
 */
export async function listTenantAssignmentsForUser(
  userId: string
): Promise<string[]> {
  await ensureSchema();
  const result = await getPool().query<{ tenant_name: string }>(
    "SELECT tenant_name FROM tenant_user_assignments WHERE zitadel_user_id = $1",
    [userId]
  );
  return result.rows.map((r) => r.tenant_name);
}

/**
 * Returns all assignments for a single tenant, most recently assigned
 * first. Used by the team UI (Slice 7) to render "who has access to
 * this instance". Includes `assignedBy` and `assignedAt` for audit
 * display.
 */
export async function listAssignmentsForTenant(
  tenantName: string
): Promise<TenantUserAssignment[]> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM tenant_user_assignments WHERE tenant_name = $1 ORDER BY assigned_at DESC",
    [tenantName]
  );
  return result.rows.map(mapAssignmentRow);
}

/**
 * Grant a user access to a tenant. Idempotent — a duplicate INSERT
 * is silently ignored via ON CONFLICT, and the existing
 * `assigned_at`/`assigned_by` are preserved (we don't update them on
 * re-assign).
 *
 * Caller is responsible for verifying:
 *   - The actor (`assignedBy`) holds owner/platform role in `orgId`.
 *   - The target user (`userId`) is actually a member of the same
 *     ZITADEL org. We don't validate this here — the team UI fetches
 *     the org's user list from ZITADEL and selects from it.
 *   - The tenant CR exists and is labelled with the same `orgId`.
*/
export async function addTenantAssignment(params: {
  tenantName: string;
  orgId: string;
  userId: string;
  assignedBy: string;
}): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `INSERT INTO tenant_user_assignments (tenant_name, zitadel_org_id, zitadel_user_id, assigned_by) VALUES ($1, $2, $3, $4) ON CONFLICT (tenant_name, zitadel_user_id) DO NOTHING`,
    [params.tenantName, params.orgId, params.userId, params.assignedBy]
  );
}

/**
 * Revoke a user's access to a tenant. No-op if the row doesn't exist.
 */
export async function removeTenantAssignment(
  tenantName: string,
  userId: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    "DELETE FROM tenant_user_assignments WHERE tenant_name = $1 AND zitadel_user_id = $2",
    [tenantName, userId]
  );
}

/**
 * Cascade cleanup: drop ALL assignments for a tenant when the tenant
 * itself is deleted. Called from the admin delete handler.
 *
 * Without this, an orphan row would stick around forever — a future
 * tenant with the same name (won't happen given Slice 1's UUID-suffix
 * naming, but defense in depth) would inherit the old assignments.
 */
export async function removeAllAssignmentsForTenant(
  tenantName: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    "DELETE FROM tenant_user_assignments WHERE tenant_name = $1",
    [tenantName]
  );
}

/**
 * Cascade cleanup: drop ALL assignments for a user within a specific
 * org. Used by Slice 7's "remove member" flow when an owner kicks a
 * user out of the org. Scoped by `orgId` so a user with assignments in
 * org A doesn't lose them when removed from org B (multi-org users
 * exist when a person registers personally and is also invited to a
 * company).
*/
export async function removeAllAssignmentsForUser(
  orgId: string,
  userId: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    "DELETE FROM tenant_user_assignments WHERE zitadel_org_id = $1 AND zitadel_user_id = $2",
    [orgId, userId]
  );
}

// ---------------------------------------------------------------------------
// Feature 5: support tickets
// ---------------------------------------------------------------------------

/**
 * Convert a `support_tickets` row into the camelCase `SupportTicket`
 * shape. Timestamps become ISO strings when the value exposes
 * `toISOString`, and are passed through unchanged otherwise.
 */
function rowToSupportTicket(row: any): SupportTicket {
  return {
    id: row.id,
    zitadelOrgId: row.zitadel_org_id,
    zitadelUserId: row.zitadel_user_id,
    title: row.title,
    description: row.description,
    category: row.category as SupportTicketCategory,
    status: row.status as SupportTicketStatus,
    contactEmail: row.contact_email,
    contactName: row.contact_name,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

/**
 * Convert a `support_ticket_comments` row into the camelCase
 * `SupportTicketComment` shape.
 */
function rowToSupportTicketComment(row: any): SupportTicketComment {
  return {
    id: row.id,
    ticketId: row.ticket_id,
    authorUserId: row.author_user_id,
    authorName: row.author_name,
    authorKind: row.author_kind as SupportTicketCommentAuthorKind,
    body: row.body,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
  };
}

/**
 * Create a new support ticket. The contact_name/contact_email are
 * snapshotted from the session at creation time — see SupportTicket
 * doc for why.
*/
export async function createSupportTicket(params: {
  zitadelOrgId: string;
  zitadelUserId: string;
  title: string;
  description: string;
  category: SupportTicketCategory;
  contactName: string;
  contactEmail: string;
}): Promise<SupportTicket> {
  await ensureSchema();
  const result = await getPool().query(
    `INSERT INTO support_tickets ( zitadel_org_id, zitadel_user_id, title, description, category, contact_name, contact_email ) VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING *`,
    [
      params.zitadelOrgId,
      params.zitadelUserId,
      params.title,
      params.description,
      params.category,
      params.contactName,
      params.contactEmail,
    ]
  );
  return rowToSupportTicket(result.rows[0]);
}

/** Tickets created by a single user, newest activity first. */
export async function listSupportTicketsForUser(
  zitadelUserId: string
): Promise<SupportTicket[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM support_tickets WHERE zitadel_user_id = $1 ORDER BY updated_at DESC`,
    [zitadelUserId]
  );
  return result.rows.map(rowToSupportTicket);
}

/**
 * Admin queue. Returns every ticket across all users/orgs, newest
 * activity first. Pending tickets (open/reopened) bubble to the top
 * by virtue of recent activity, but the API doesn't sort by status —
 * the admin UI handles filtering and bucketing.
 */
export async function listAllSupportTickets(): Promise<SupportTicket[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM support_tickets ORDER BY updated_at DESC`
  );
  return result.rows.map(rowToSupportTicket);
}

/** Fetch a single ticket by primary key, or null when no row matches. */
export async function getSupportTicketById(
  id: string
): Promise<SupportTicket | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM support_tickets WHERE id = $1",
    [id]
  );
  return result.rows.length > 0 ? rowToSupportTicket(result.rows[0]) : null;
}

/** All comments on a ticket, oldest first (`ORDER BY created_at`). */
export async function listCommentsForTicket(
  ticketId: string
): Promise<SupportTicketComment[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM support_ticket_comments WHERE ticket_id = $1 ORDER BY created_at`,
    [ticketId]
  );
  return result.rows.map(rowToSupportTicketComment);
}

/**
 * Insert a comment. Bumps the parent ticket's `updated_at` so the
 * activity sort orders work — done in a transaction so the two are
 * atomic from any concurrent reader's perspective.
 *
 * Caller is responsible for status auto-bumping (e.g. customer
 * replying to a `waiting_for_customer` ticket → `in_progress`); the
 * DB layer just writes what it's told.
 *
 * @returns The inserted comment row.
 * @throws Whatever error the INSERT/UPDATE/COMMIT raised; the
 *   transaction is rolled back first on a best-effort basis.
 */
export async function createSupportTicketComment(params: {
  ticketId: string;
  authorUserId: string;
  authorName: string;
  authorKind: SupportTicketCommentAuthorKind;
  body: string;
}): Promise<SupportTicketComment> {
  await ensureSchema();
  // A dedicated client is required: BEGIN/COMMIT must run on the same
  // connection, which pool.query() does not guarantee.
  const client = await getPool().connect();
  try {
    await client.query("BEGIN");
    const inserted = await client.query(
      `INSERT INTO support_ticket_comments ( ticket_id, author_user_id, author_name, author_kind, body ) VALUES ($1, $2, $3, $4, $5) RETURNING *`,
      [
        params.ticketId,
        params.authorUserId,
        params.authorName,
        params.authorKind,
        params.body,
      ]
    );
    await client.query(
      "UPDATE support_tickets SET updated_at = now() WHERE id = $1",
      [params.ticketId]
    );
    await client.query("COMMIT");
    return rowToSupportTicketComment(inserted.rows[0]);
  } catch (e) {
    try {
      await client.query("ROLLBACK");
    } catch {
      // The rollback itself failed (e.g. the connection dropped).
      // Swallow it so the original error — the one the caller needs
      // to see — is what propagates.
    }
    throw e;
  } finally {
    client.release();
  }
}

/**
 * Update mutable fields on a ticket. Only category and status are
 * mutable; title/description are frozen post-creation. Returns the
 * updated row so callers can email the right contact_email
 * afterwards.
*/
export async function updateSupportTicket(
  id: string,
  changes: { status?: SupportTicketStatus; category?: SupportTicketCategory }
): Promise<SupportTicket | null> {
  await ensureSchema();
  const sets: string[] = ["updated_at = now()"];
  const values: unknown[] = [id];
  let idx = 2;
  if (changes.status !== undefined) {
    sets.push(`status = $${idx}`);
    values.push(changes.status);
    idx++;
  }
  if (changes.category !== undefined) {
    sets.push(`category = $${idx}`);
    values.push(changes.category);
    idx++;
  }
  // No-op early exit: when neither field was passed there is nothing
  // to update, so return the current row (or null) without touching
  // updated_at.
  if (sets.length === 1) return getSupportTicketById(id);
  const result = await getPool().query(
    `UPDATE support_tickets SET ${sets.join(", ")} WHERE id = $1 RETURNING *`,
    values
  );
  // Null when the id didn't match any ticket.
  return result.rows.length > 0 ? rowToSupportTicket(result.rows[0]) : null;
}

// ---------------------------------------------------------------------------
// Billing — Phase 1: pricing, lifecycle, and skill events
// ---------------------------------------------------------------------------
//
// All helpers are intentionally narrow CRUD — no business logic.
// Higher-level operations (compute monthly proration, build an
// invoice from raw signals) belong in a future lib/billing.ts
// introduced by Phase 2.
//
// Hook callers should treat every write here as best-effort: if
// recording a lifecycle/event row fails, log and continue — never
// fail the underlying K8s mutation. Drift is corrected by the
// idempotent backfill helper at the bottom of this section.
import type {
  PlatformPricing,
  SkillPricing,
  OrgBillingConfig,
  TenantBillingLifecycle,
  TenantSkillEvent,
  TenantSuspensionEvent,
} from "@/types";

// --- platform_pricing ------------------------------------------------------

/**
 * Convert the `platform_pricing` row into the camelCase
 * `PlatformPricing` shape. Price/rate columns are passed through
 * `Number(...)` so callers always receive JS numbers.
 */
function rowToPlatformPricing(row: any): PlatformPricing {
  return {
    tenantMonthlyFeeChf: Number(row.tenant_monthly_fee_chf),
    tenantSetupFeeChf: Number(row.tenant_setup_fee_chf),
    threemaMessageChf: Number(row.threema_message_chf),
    vatRateChli: Number(row.vat_rate_chli),
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

/**
 * Read the single-row platform pricing config. The migration seeds
 * the row with zeros on first run, so this never returns null on a
 * properly migrated database.
 */
export async function getPlatformPricing(): Promise<PlatformPricing> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM platform_pricing WHERE id = 1"
  );
  if (result.rows.length === 0) {
    // Defensive: re-seed if the row went missing (manual DELETE,
    // partial restore, etc.). The migration's INSERT ON CONFLICT
    // should have ensured this, but a stale row state shouldn't
    // crash the helper.
    await getPool().query(
      "INSERT INTO platform_pricing (id) VALUES (1) ON CONFLICT DO NOTHING"
    );
    const retry = await getPool().query(
      "SELECT * FROM platform_pricing WHERE id = 1"
    );
    return rowToPlatformPricing(retry.rows[0]);
  }
  return rowToPlatformPricing(result.rows[0]);
}

/**
 * Update one or more pricing fields. Pass only the fields to change;
 * unspecified fields are left as-is. `updated_at` is refreshed whenever
 * at least one field is supplied.
*/
export async function updatePlatformPricing(changes: {
  tenantMonthlyFeeChf?: number;
  tenantSetupFeeChf?: number;
  threemaMessageChf?: number;
  vatRateChli?: number;
}): Promise<PlatformPricing> {
  await ensureSchema();
  const sets: string[] = ["updated_at = now()"];
  const values: unknown[] = [];
  let idx = 1;
  if (changes.tenantMonthlyFeeChf !== undefined) {
    sets.push(`tenant_monthly_fee_chf = $${idx++}`);
    values.push(changes.tenantMonthlyFeeChf);
  }
  if (changes.tenantSetupFeeChf !== undefined) {
    sets.push(`tenant_setup_fee_chf = $${idx++}`);
    values.push(changes.tenantSetupFeeChf);
  }
  if (changes.threemaMessageChf !== undefined) {
    sets.push(`threema_message_chf = $${idx++}`);
    values.push(changes.threemaMessageChf);
  }
  if (changes.vatRateChli !== undefined) {
    sets.push(`vat_rate_chli = $${idx++}`);
    values.push(changes.vatRateChli);
  }
  // No fields supplied: explicit no-op. Return the current config
  // without running an UPDATE (so updated_at is left untouched).
  if (values.length === 0) {
    return getPlatformPricing();
  }
  const result = await getPool().query(
    `UPDATE platform_pricing SET ${sets.join(", ")} WHERE id = 1 RETURNING *`,
    values
  );
  return rowToPlatformPricing(result.rows[0]);
}

// --- skill_pricing ---------------------------------------------------------

/**
 * Convert a `skill_pricing` row into the camelCase `SkillPricing`
 * shape. A NULL/missing `setup_fee_chf` is reported as 0.
 */
function rowToSkillPricing(row: any): SkillPricing {
  return {
    skillId: row.skill_id,
    dailyPriceChf: Number(row.daily_price_chf),
    setupFeeChf: Number(row.setup_fee_chf ?? 0),
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

/** All skill pricing rows, ordered by `skill_id`. */
export async function listSkillPricing(): Promise<SkillPricing[]> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM skill_pricing ORDER BY skill_id"
  );
  return result.rows.map(rowToSkillPricing);
}

/** Pricing for a single skill, or null when no price is configured. */
export async function getSkillPricing(
  skillId: string
): Promise<SkillPricing | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM skill_pricing WHERE skill_id = $1",
    [skillId]
  );
  return result.rows.length > 0 ? rowToSkillPricing(result.rows[0]) : null;
}

/**
 * Upsert pricing for a package. `dailyPriceChf` activates
 * usage-based billing (one billable unit per UTC day the package
 * was enabled). `setupFeeChf` is a one-time charge emitted on the
 * first invoice line for any given (tenant, skill).
 *
 * Both fields are required so admin must consciously set 0 to mean
 * "no setup fee" rather than accidentally inheriting an old value
 * from a partial update.
 */
export async function setSkillPricing(
  skillId: string,
  dailyPriceChf: number,
  setupFeeChf: number
): Promise<SkillPricing> {
  await ensureSchema();
  const result = await getPool().query(
    `INSERT INTO skill_pricing (skill_id, daily_price_chf, setup_fee_chf) VALUES ($1, $2, $3) ON CONFLICT (skill_id) DO UPDATE SET daily_price_chf = EXCLUDED.daily_price_chf, setup_fee_chf = EXCLUDED.setup_fee_chf, updated_at = now() RETURNING *`,
    [skillId, dailyPriceChf, setupFeeChf]
  );
  return rowToSkillPricing(result.rows[0]);
}

/**
 * Remove the price for a package. Day-tracking events continue to
 * be recorded (cheap append-only) but the package becomes free
 * effective immediately. Historical invoices already issued are
 * unaffected.
*/
export async function removeSkillPricing(skillId: string): Promise<void> {
  await ensureSchema();
  await getPool().query("DELETE FROM skill_pricing WHERE skill_id = $1", [
    skillId,
  ]);
}

// --- tenant_billing_lifecycle ---------------------------------------------

/**
 * Convert a `tenant_billing_lifecycle` row into the camelCase
 * `TenantBillingLifecycle` shape. `deletedAt` is null while the
 * `deleted_at` column is NULL.
 */
function rowToTenantBillingLifecycle(row: any): TenantBillingLifecycle {
  return {
    tenantName: row.tenant_name,
    zitadelOrgId: row.zitadel_org_id,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    deletedAt: row.deleted_at?.toISOString?.() ?? row.deleted_at ?? null,
  };
}

/** Lifecycle record for a tenant, or null when none has been recorded. */
export async function getTenantBillingLifecycle(
  tenantName: string
): Promise<TenantBillingLifecycle | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM tenant_billing_lifecycle WHERE tenant_name = $1",
    [tenantName]
  );
  return result.rows.length > 0
    ? rowToTenantBillingLifecycle(result.rows[0])
    : null;
}

/**
 * Record a tenant's creation for billing purposes. Idempotent on
 * `tenant_name` — re-running with a different created_at is a no-op
 * so re-approvals don't move the proration anchor. Pair with
 * recordInitialSkillEvents() at the same call site.
 *
 * @param createdAt - Creation timestamp; the database's `now()` is
 *   used when omitted.
 */
export async function recordTenantCreated(
  tenantName: string,
  zitadelOrgId: string,
  createdAt?: Date
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `INSERT INTO tenant_billing_lifecycle (tenant_name, zitadel_org_id, created_at) VALUES ($1, $2, COALESCE($3::timestamptz, now())) ON CONFLICT (tenant_name) DO NOTHING`,
    [tenantName, zitadelOrgId, createdAt ?? null]
  );
}

/**
 * Stamp deletion timestamp. Idempotent — calling twice keeps the
 * first deletion's timestamp. Pair with closing-skill-disabled
 * events at the same call site if you want the events log to
 * reflect that everything is now off.
 *
 * We deliberately don't delete the row — the lifecycle record is
 * needed for any final invoice covering the deletion month.
*/
export async function recordTenantDeleted(
  tenantName: string,
  deletedAt?: Date
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `UPDATE tenant_billing_lifecycle SET deleted_at = COALESCE(deleted_at, COALESCE($2::timestamptz, now())) WHERE tenant_name = $1`,
    [tenantName, deletedAt ?? null]
  );
}

// --- tenant_skill_events --------------------------------------------------

/**
 * Convert a `tenant_skill_events` row into the camelCase
 * `TenantSkillEvent` shape. The id is stringified.
 */
function rowToTenantSkillEvent(row: any): TenantSkillEvent {
  return {
    id: String(row.id),
    tenantName: row.tenant_name,
    zitadelOrgId: row.zitadel_org_id,
    skillId: row.skill_id,
    eventKind: row.event_kind as "enabled" | "disabled",
    occurredAt: row.occurred_at?.toISOString?.() ?? row.occurred_at,
  };
}

/**
 * Append a batch of enabled/disabled events. Single multi-row INSERT
 * so all rows share an effectively-identical occurred_at when
 * `occurredAt` is omitted (otherwise they'd be a few microseconds
 * apart, which would skew the "is this skill on for day D" computation
 * around midnight UTC).
 *
 * Empty arrays are a no-op (no SQL fired).
 *
 * @param added - Skill ids to record as `enabled`.
 * @param removed - Skill ids to record as `disabled`.
 * @param occurredAt - Timestamp for every row; defaults to the current
 *   time of this process (`new Date()`).
 */
export async function recordSkillEvents(
  tenantName: string,
  zitadelOrgId: string,
  added: string[],
  removed: string[],
  occurredAt?: Date
): Promise<void> {
  if (added.length === 0 && removed.length === 0) return;
  await ensureSchema();
  const at = occurredAt ?? new Date();
  // Build placeholders. Each row uses 4 placeholders (tenant, org,
  // skill, timestamp); event_kind is inlined as a SQL literal.
  const values: unknown[] = [];
  const rows: string[] = [];
  let idx = 1;
  for (const skillId of added) {
    rows.push(`($${idx++}, $${idx++}, $${idx++}, 'enabled', $${idx++})`);
    values.push(tenantName, zitadelOrgId, skillId, at);
  }
  for (const skillId of removed) {
    rows.push(`($${idx++}, $${idx++}, $${idx++}, 'disabled', $${idx++})`);
    values.push(tenantName, zitadelOrgId, skillId, at);
  }
  await getPool().query(
    `INSERT INTO tenant_skill_events (tenant_name, zitadel_org_id, skill_id, event_kind, occurred_at) VALUES ${rows.join(", ")}`,
    values
  );
}

/**
 * Read events for a tenant within a half-open interval — `from`
 * inclusive, `to` exclusive.
Used by Phase 2's billing computation
 * to collapse to billable days. Returned in chronological order.
 */
export async function listSkillEventsForTenant(
  tenantName: string,
  from: Date,
  to: Date
): Promise<TenantSkillEvent[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM tenant_skill_events
      WHERE tenant_name = $1
        AND occurred_at >= $2
        AND occurred_at < $3
      ORDER BY occurred_at, id`,
    [tenantName, from, to]
  );
  return result.rows.map(rowToTenantSkillEvent);
}

/**
 * Read the most recent event for each (tenant, skill) pair as of
 * a moment in time. Phase 2 uses this to know which skills were
 * enabled at the start of a billing window.
 *
 * Implemented with DISTINCT ON for a single round-trip; the
 * (tenant_name, skill_id, occurred_at) index supports the sort.
 *
 * @returns a map from skill id to the kind of its latest event at or
 *   before `asOf`; skills with no event in range are absent.
 */
export async function getSkillStateAt(
  tenantName: string,
  asOf: Date
): Promise<Record<string, "enabled" | "disabled">> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT DISTINCT ON (skill_id) skill_id, event_kind
       FROM tenant_skill_events
      WHERE tenant_name = $1
        AND occurred_at <= $2
      ORDER BY skill_id, occurred_at DESC, id DESC`,
    [tenantName, asOf]
  );
  const out: Record<string, "enabled" | "disabled"> = {};
  for (const row of result.rows) {
    out[row.skill_id] = row.event_kind as "enabled" | "disabled";
  }
  return out;
}

// --- tenant_suspension_events ---------------------------------------------

/**
 * Map a raw `tenant_suspension_events` row to its camelCase shape.
 * The id is stringified; `occurred_at` is converted to an ISO string
 * when the value exposes `toISOString()`.
 */
function rowToTenantSuspensionEvent(row: any): TenantSuspensionEvent {
  return {
    id: String(row.id),
    tenantName: row.tenant_name,
    zitadelOrgId: row.zitadel_org_id,
    eventKind: row.event_kind as "suspended" | "resumed",
    occurredAt: row.occurred_at?.toISOString?.() ?? row.occurred_at,
  };
}

/**
 * Append one suspension/resume event for a tenant.
 *
 * @param occurredAt optional timestamp; when omitted the database's
 *   `now()` is used.
 */
export async function recordSuspensionEvent(
  tenantName: string,
  zitadelOrgId: string,
  eventKind: "suspended" | "resumed",
  occurredAt?: Date
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    `INSERT INTO tenant_suspension_events
       (tenant_name, zitadel_org_id, event_kind, occurred_at)
     VALUES ($1, $2, $3, COALESCE($4::timestamptz, now()))`,
    [tenantName, zitadelOrgId, eventKind, occurredAt ?? null]
  );
}

/**
 * Read suspension events for a tenant within the half-open interval
 * [`from`, `to`), ordered by `occurred_at` then `id`.
 */
export async function listSuspensionEventsForTenant(
  tenantName: string,
  from: Date,
  to: Date
): Promise<TenantSuspensionEvent[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT * FROM tenant_suspension_events
      WHERE tenant_name = $1
        AND occurred_at >= $2
        AND occurred_at < $3
      ORDER BY occurred_at, id`,
    [tenantName, from, to]
  );
  return result.rows.map(rowToTenantSuspensionEvent);
}

// --- org_billing_config ---------------------------------------------------

/**
 * Map a raw `org_billing_config` row to its camelCase shape. A
 * missing `stripe_customer_id` becomes `null`; timestamps are
 * converted to ISO strings when the value exposes `toISOString()`.
 */
function rowToOrgBillingConfig(row: any): OrgBillingConfig {
  return {
    zitadelOrgId: row.zitadel_org_id,
    payByInvoice: row.pay_by_invoice,
    stripeCustomerId: row.stripe_customer_id ?? null,
    autoInvoiceEnabled: row.auto_invoice_enabled,
    autoRemindersEnabled: row.auto_reminders_enabled,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
    updatedAt: row.updated_at?.toISOString?.() ?? row.updated_at,
  };
}

/**
 * Get config for an org, auto-creating with defaults if missing.
 * Returning the row (vs null) simplifies callers: the gate logic in
 * Phase 4 ("approve only if pay_by_invoice OR has card") doesn't
 * need a "what if no row" branch.
 *
 * Defaults are baked into the table's column defaults, so the
 * INSERT here only needs the primary key.
*/
export async function getOrgBillingConfig(
  zitadelOrgId: string
): Promise<OrgBillingConfig> {
  await ensureSchema();
  // Create-if-missing first so the SELECT below always finds a row.
  await getPool().query(
    `INSERT INTO org_billing_config (zitadel_org_id)
     VALUES ($1)
     ON CONFLICT (zitadel_org_id) DO NOTHING`,
    [zitadelOrgId]
  );
  const result = await getPool().query(
    "SELECT * FROM org_billing_config WHERE zitadel_org_id = $1",
    [zitadelOrgId]
  );
  return rowToOrgBillingConfig(result.rows[0]);
}

/**
 * Apply a partial update to an org's billing config and return the
 * resulting row. Only keys that are not `undefined` in `changes` are
 * written (so `stripeCustomerId: null` clears the column);
 * `updated_at` is always bumped, even when `changes` is empty.
 */
export async function updateOrgBillingConfig(
  zitadelOrgId: string,
  changes: {
    payByInvoice?: boolean;
    stripeCustomerId?: string | null;
    autoInvoiceEnabled?: boolean;
    autoRemindersEnabled?: boolean;
  }
): Promise<OrgBillingConfig> {
  await ensureSchema();
  // Ensure row exists first — mirrors getOrgBillingConfig's
  // auto-create.
  await getPool().query(
    `INSERT INTO org_billing_config (zitadel_org_id)
     VALUES ($1)
     ON CONFLICT (zitadel_org_id) DO NOTHING`,
    [zitadelOrgId]
  );

  // $1 is reserved for the WHERE clause; SET placeholders start at $2.
  const sets: string[] = ["updated_at = now()"];
  const values: any[] = [zitadelOrgId];
  let idx = 2;
  if (changes.payByInvoice !== undefined) {
    sets.push(`pay_by_invoice = $${idx++}`);
    values.push(changes.payByInvoice);
  }
  if (changes.stripeCustomerId !== undefined) {
    sets.push(`stripe_customer_id = $${idx++}`);
    values.push(changes.stripeCustomerId);
  }
  if (changes.autoInvoiceEnabled !== undefined) {
    sets.push(`auto_invoice_enabled = $${idx++}`);
    values.push(changes.autoInvoiceEnabled);
  }
  if (changes.autoRemindersEnabled !== undefined) {
    sets.push(`auto_reminders_enabled = $${idx++}`);
    values.push(changes.autoRemindersEnabled);
  }

  const result = await getPool().query(
    `UPDATE org_billing_config
        SET ${sets.join(", ")}
      WHERE zitadel_org_id = $1
      RETURNING *`,
    values
  );
  return rowToOrgBillingConfig(result.rows[0]);
}

// --- Backfill -------------------------------------------------------------
//
// Idempotent one-time bootstrap for tenants that existed before
// Phase 1 shipped. Phase 2 wraps this in an admin endpoint; for now
// it can be invoked from a one-off node script.
//
// For each PiecedTenant CR:
//   - If no tenant_billing_lifecycle row exists, insert one with
//     created_at = metadata.creationTimestamp.
//   - If no tenant_skill_events row exists for the tenant, insert
//     'enabled' events at the same timestamp for every package
//     currently in spec.packages.
// Tenants suspended at backfill time get a 'suspended' event at
// status.suspendedAt (operator-stamped); resumed tenants get nothing
// extra (default state is "running").

/**
 * Returns a count of lifecycle rows inserted, skill events recorded
 * and suspension events recorded — all expected to be zero on a
 * second run.
 */
export async function backfillTenantBillingLifecycle(tenants: {
  name: string;
  zitadelOrgId: string;
  createdAt: Date;
  packages: string[];
  suspendedAt: Date | null;
}[]): Promise<{
  lifecycleInserted: number;
  eventsInserted: number;
  suspensionEventsInserted: number;
}> {
  await ensureSchema();
  let lifecycleInserted = 0;
  let eventsInserted = 0;
  let suspensionEventsInserted = 0;

  for (const t of tenants) {
    // Lifecycle row — idempotent.
    const existing = await getTenantBillingLifecycle(t.name);
    if (!existing) {
      await recordTenantCreated(t.name, t.zitadelOrgId, t.createdAt);
      lifecycleInserted++;
    }

    // Initial skill events — only if the tenant has zero events at
    // all. We don't want to add to an active event stream.
    const eventsRow = await getPool().query(
      "SELECT 1 FROM tenant_skill_events WHERE tenant_name = $1 LIMIT 1",
      [t.name]
    );
    if (eventsRow.rows.length === 0 && t.packages.length > 0) {
      await recordSkillEvents(
        t.name,
        t.zitadelOrgId,
        t.packages,
        [],
        t.createdAt
      );
      eventsInserted += t.packages.length;
    }

    // Suspension state — only if the tenant has zero suspension
    // events. If it's currently suspended, record one 'suspended'
    // event at the operator-stamped time so proration sees it.
    const susRow = await getPool().query(
      "SELECT 1 FROM tenant_suspension_events WHERE tenant_name = $1 LIMIT 1",
      [t.name]
    );
    if (susRow.rows.length === 0 && t.suspendedAt) {
      await recordSuspensionEvent(
        t.name,
        t.zitadelOrgId,
        "suspended",
        t.suspendedAt
      );
      suspensionEventsInserted++;
    }
  }

  return { lifecycleInserted, eventsInserted, suspensionEventsInserted };
}

// ---------------------------------------------------------------------------
// Billing — Phase 2: invoice persistence
// ---------------------------------------------------------------------------
//
// Invoice creation is intentionally a single transaction: allocate
// number, INSERT invoice, INSERT lines, store PDF — all-or-nothing.
// The Postgres invoice_number_counters row lock serializes
// concurrent allocators for the same year, producing gapless
// numbering even under bursts.

import type {
  Invoice,
  InvoiceBillingSnapshot,
  InvoiceDetail,
  InvoiceDraft,
  InvoiceLine,
  InvoiceStatus,
} from "@/types";

/**
 * Render a DATE column value as `YYYY-MM-DD`.
 *
 * Strings are passed through. Date objects are formatted from their
 * LOCAL calendar fields: node-postgres' default DATE parser builds
 * the Date at local midnight, so going through `toISOString()` (UTC)
 * would yield the previous day whenever the process timezone is east
 * of UTC.
 * NOTE(review): assumes the default pg DATE type parser is in use —
 * confirm no custom parser returns UTC-midnight dates.
 */
function formatDateOnly(value: string | Date): string {
  if (typeof value === "string") return value;
  const year = String(value.getFullYear()).padStart(4, "0");
  const month = String(value.getMonth() + 1).padStart(2, "0");
  const day = String(value.getDate()).padStart(2, "0");
  return `${year}-${month}-${day}`;
}

/**
 * Map a raw `invoices` row to the `Invoice` shape. Numeric columns
 * are coerced with `Number()`, timestamps become ISO strings when
 * the value exposes `toISOString()`, and DATE columns become
 * `YYYY-MM-DD` strings.
 */
function rowToInvoice(row: any): Invoice {
  return {
    id: row.id,
    invoiceNumber: row.invoice_number,
    zitadelOrgId: row.zitadel_org_id,
    periodStart: formatDateOnly(row.period_start),
    periodEnd: formatDateOnly(row.period_end),
    issuedAt: row.issued_at?.toISOString?.() ?? row.issued_at,
    dueAt: formatDateOnly(row.due_at),
    subtotalChf: Number(row.subtotal_chf),
    vatRate: Number(row.vat_rate),
    vatAmountChf: Number(row.vat_amount_chf),
    totalChf: Number(row.total_chf),
    status: row.status as InvoiceStatus,
    locale: row.locale ?? "de",
    paymentMethod: row.payment_method,
    billingSnapshot: row.billing_snapshot as InvoiceBillingSnapshot,
    stripePaymentIntentId: row.stripe_payment_intent_id ?? null,
    pdfFilename: row.pdf_filename ?? null,
    // Prefer the projected probe; fall back to the raw column. `!=`
    // (not `!==`) so an absent pdf_data (undefined) counts as "no PDF".
    hasPdf: row.has_pdf ?? (row.pdf_data != null),
    adminNotes: row.admin_notes ?? null,
    paidAt: row.paid_at?.toISOString?.() ?? row.paid_at ?? null,
    paidBy: row.paid_by ?? null,
    paidMethodDetail: row.paid_method_detail ?? null,
    createdAt: row.created_at?.toISOString?.() ?? row.created_at,
  };
}

/**
 * Map a raw `invoice_lines` row to the `InvoiceLine` shape. Numeric
 * columns are coerced with `Number()`; nullable columns default to
 * `null`.
 */
function rowToInvoiceLine(row: any): InvoiceLine {
  return {
    id: row.id,
    invoiceId: row.invoice_id,
    tenantName: row.tenant_name ?? null,
    kind: row.kind,
    description: row.description,
    quantity: Number(row.quantity),
    unitLabel: row.unit_label ?? null,
    unitPriceChf: Number(row.unit_price_chf),
    amountChf: Number(row.amount_chf),
    metadata: row.metadata ?? null,
    displayOrder: row.display_order,
  };
}

// Standard SELECT projection that includes a cheap NOT-NULL probe of
// pdf_data instead of pulling the bytes themselves. Crucial for list
// endpoints — a few KB per row across hundreds of invoices is wasted
// network and memory.
const INVOICE_LIST_COLUMNS = `
  id, invoice_number, zitadel_org_id, period_start, period_end,
  issued_at, due_at, subtotal_chf, vat_rate, vat_amount_chf, total_chf,
  status, locale, payment_method, billing_snapshot,
  stripe_payment_intent_id, pdf_filename, admin_notes,
  paid_at, paid_by, paid_method_detail, created_at,
  (pdf_data IS NOT NULL) AS has_pdf
`;

/**
 * Persist a fully-computed invoice draft with its lines and PDF in
 * a single transaction. Allocates the year-scoped invoice number
 * inside the same transaction so a rollback restores the counter
 * (gapless guarantee).
 *
 * The caller is responsible for upstream validation:
 *   - the (org, period) uniqueness (the unique index will reject
 *     duplicates, but we return a clear error message rather than
 *     leaking the constraint name)
 *   - the draft's lines/totals are consistent (compute pipeline
 *     ensures this)
 *
 * `pdfBuffer` is the rendered PDF bytes; pass null if PDF is
 * generated separately or stored in a side channel. For Phase 2 we
 * always render synchronously and pass the buffer here.
*/
export async function createInvoice(
  draft: InvoiceDraft,
  pdfBuffer: Buffer | null,
  pdfFilename: string | null
): Promise<Invoice> {
  await ensureSchema();
  // Named `db` so it does not shadow the module-level `pool` singleton.
  const db = getPool();
  const client = await db.connect();
  try {
    await client.query("BEGIN");

    // Allocate number for the year of period_start. Locking the
    // counter row prevents concurrent allocators from racing.
    const year = parseInt(draft.periodStart.slice(0, 4), 10);
    const counterResult = await client.query(
      `INSERT INTO invoice_number_counters (year, last_number)
       VALUES ($1, 1)
       ON CONFLICT (year) DO UPDATE
         SET last_number = invoice_number_counters.last_number + 1
       RETURNING last_number`,
      [year]
    );
    const seq = counterResult.rows[0].last_number;
    const invoiceNumber = `${year}-${String(seq).padStart(5, "0")}`;

    // Insert invoice row. PDF goes inline as bytea for v1; we can
    // migrate to MinIO/S3 later if storage gets noisy.
    const inv = await client.query(
      `INSERT INTO invoices (
         invoice_number, zitadel_org_id, period_start, period_end,
         issued_at, due_at, subtotal_chf, vat_rate, vat_amount_chf,
         total_chf, status, locale, payment_method, billing_snapshot,
         pdf_data, pdf_filename
       ) VALUES (
         $1, $2, $3::date, $4::date,
         now(), $5::date, $6, $7, $8,
         $9, 'open', $10, $11, $12::jsonb,
         $13, $14
       )
       RETURNING ${INVOICE_LIST_COLUMNS}`,
      [
        invoiceNumber,
        draft.zitadelOrgId,
        draft.periodStart,
        draft.periodEnd,
        draft.dueAt,
        draft.subtotalChf,
        draft.vatRate,
        draft.vatAmountChf,
        draft.totalChf,
        draft.locale,
        draft.paymentMethod,
        JSON.stringify(draft.billingSnapshot),
        pdfBuffer,
        pdfFilename,
      ]
    );
    const invoiceId = inv.rows[0].id;

    // Insert lines in batch — one INSERT statement is significantly
    // faster than per-line round-trips, which matters when an invoice
    // accumulates many ai_usage / skill_usage lines.
    if (draft.lines.length > 0) {
      const placeholders: string[] = [];
      const values: any[] = [];
      let idx = 1;
      for (const line of draft.lines) {
        // 10 bound parameters per line; metadata is cast to jsonb.
        placeholders.push(
          `($${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}, $${idx++}::jsonb, $${idx++})`
        );
        values.push(
          invoiceId,
          line.tenantName,
          line.kind,
          line.description,
          line.quantity,
          line.unitLabel,
          line.unitPriceChf,
          line.amountChf,
          line.metadata ? JSON.stringify(line.metadata) : null,
          line.displayOrder
        );
      }
      await client.query(
        `INSERT INTO invoice_lines (
           invoice_id, tenant_name, kind, description, quantity,
           unit_label, unit_price_chf, amount_chf, metadata, display_order
         ) VALUES ${placeholders.join(", ")}`,
        values
      );
    }

    await client.query("COMMIT");
    return rowToInvoice(inv.rows[0]);
  } catch (e: any) {
    // Best-effort rollback; a failure here must not mask the original error.
    await client.query("ROLLBACK").catch(() => undefined);
    // Translate the uniqueness violation into a user-friendly error.
    // 23505 = unique_violation in Postgres.
    if (
      e?.code === "23505" &&
      /uniq_invoices_org_period/.test(e?.constraint ?? "")
    ) {
      const month = draft.periodStart.slice(0, 7);
      throw new Error(
        `An invoice already exists for this org and billing period (${month}). ` +
          `Delete the existing invoice first if you want to regenerate.`
      );
    }
    throw e;
  } finally {
    client.release();
  }
}

/**
 * Fetch one invoice (without PDF bytes) by primary key.
 *
 * @returns the mapped invoice, or `null` when no row matches.
 */
export async function getInvoiceById(id: string): Promise<Invoice | null> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices WHERE id = $1`,
    [id]
  );
  return result.rows.length > 0 ? rowToInvoice(result.rows[0]) : null;
}

/**
 * Fetch an invoice together with its lines, ordered by
 * `display_order` then `id`.
 *
 * @returns `null` when the invoice does not exist.
 */
export async function getInvoiceDetail(
  id: string
): Promise<InvoiceDetail | null> {
  const invoice = await getInvoiceById(id);
  if (!invoice) return null;
  const lines = await getPool().query(
    `SELECT * FROM invoice_lines
      WHERE invoice_id = $1
      ORDER BY display_order, id`,
    [id]
  );
  return { invoice, lines: lines.rows.map(rowToInvoiceLine) };
}

/**
 * Phase 3 — customer-scoped lookup by human-readable invoice
 * number with ownership enforcement in a single query.
The org
 * filter is part of the WHERE clause so a customer can't probe
 * another org's invoice numbers (which are sequential and easy
 * to guess) and get a different status code (404 vs 403) than
 * for their own — both miss-and-not-yours return null.
 *
 * Used by /api/billing/invoices/[invoiceNumber] and the
 * /billing/[invoiceNumber] customer page.
 */
export async function getInvoiceByNumberForOrg(
  invoiceNumber: string,
  zitadelOrgId: string
): Promise<InvoiceDetail | null> {
  await ensureSchema();
  const head = await getPool().query(
    `SELECT ${INVOICE_LIST_COLUMNS}
       FROM invoices
      WHERE invoice_number = $1
        AND zitadel_org_id = $2
      LIMIT 1`,
    [invoiceNumber, zitadelOrgId]
  );
  if (head.rows.length === 0) return null;
  const invoice = rowToInvoice(head.rows[0]);
  const lines = await getPool().query(
    `SELECT * FROM invoice_lines
      WHERE invoice_id = $1
      ORDER BY display_order, id`,
    [invoice.id]
  );
  return { invoice, lines: lines.rows.map(rowToInvoiceLine) };
}

/**
 * Fetch the PDF bytes for an invoice. Returns null if no PDF was
 * stored (shouldn't happen in v1; defensive against partial state).
 * The filename falls back to `<invoice_number>.pdf` when none was
 * stored.
 */
export async function getInvoicePdf(
  id: string
): Promise<{ data: Buffer; filename: string } | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT pdf_data, pdf_filename, invoice_number FROM invoices WHERE id = $1",
    [id]
  );
  if (result.rows.length === 0) return null;
  const row = result.rows[0];
  if (!row.pdf_data) return null;
  return {
    data: row.pdf_data,
    filename: row.pdf_filename ?? `${row.invoice_number}.pdf`,
  };
}

/**
 * List invoices, optionally filtered. Used by the admin invoice
 * list page and (Phase 3) the customer-facing /billing page.
 *
 * The customer-facing call site MUST pass `zitadelOrgId` to scope
 * results — this helper does not enforce that itself. Any value
 * other than `undefined`/`null` is applied as a filter, so an empty
 * string matches nothing instead of silently listing every org.
 *
 * Results are ordered by `issued_at DESC`; `limit` defaults to 200.
 */
export async function listInvoices(filters: {
  zitadelOrgId?: string;
  status?: InvoiceStatus;
  /** Inclusive YYYY-MM filter on period_start. */
  periodMonth?: string;
  limit?: number;
} = {}): Promise<Invoice[]> {
  await ensureSchema();
  const where: string[] = [];
  const values: any[] = [];
  let idx = 1;
  // Explicit nullish check (not truthiness): "" must still scope the
  // query, otherwise a blank org id would return all orgs' invoices.
  if (filters.zitadelOrgId != null) {
    where.push(`zitadel_org_id = $${idx++}`);
    values.push(filters.zitadelOrgId);
  }
  if (filters.status) {
    where.push(`status = $${idx++}`);
    values.push(filters.status);
  }
  if (filters.periodMonth) {
    where.push(`to_char(period_start, 'YYYY-MM') = $${idx++}`);
    values.push(filters.periodMonth);
  }
  const limit = filters.limit ?? 200;
  const sql =
    `SELECT ${INVOICE_LIST_COLUMNS} FROM invoices ` +
    (where.length > 0 ? `WHERE ${where.join(" AND ")} ` : "") +
    `ORDER BY issued_at DESC LIMIT $${idx}`;
  values.push(limit);
  const result = await getPool().query(sql, values);
  return result.rows.map(rowToInvoice);
}

/**
 * Sweep open invoices past their due date to `overdue` status.
 * Cheap idempotent UPDATE; safe to call on every admin list view
 * to keep status fresh without a dedicated cron.
 *
 * @returns the number of rows updated.
 */
export async function syncOverdueInvoices(): Promise<number> {
  await ensureSchema();
  const result = await getPool().query(
    `UPDATE invoices
        SET status = 'overdue'
      WHERE status = 'open'
        AND due_at < CURRENT_DATE`
  );
  return result.rowCount ?? 0;
}

/**
 * Mark an open or overdue invoice as paid.
 *
 * @param opts.paidAt optional payment timestamp; when omitted the
 *   database's `now()` is used.
 * @returns the updated invoice, or `null` when no row matched (the
 *   id is unknown or the invoice is not in 'open'/'overdue').
 */
export async function markInvoicePaid(
  id: string,
  opts: { paidBy: string; paidMethodDetail?: string | null; paidAt?: Date }
): Promise<Invoice | null> {
  await ensureSchema();
  const result = await getPool().query(
    `UPDATE invoices
        SET status = 'paid',
            paid_at = COALESCE($2::timestamptz, now()),
            paid_by = $3,
            paid_method_detail = $4
      WHERE id = $1
        AND status IN ('open', 'overdue')
      RETURNING ${INVOICE_LIST_COLUMNS}`,
    [
      id,
      opts.paidAt ?? null,
      opts.paidBy,
      opts.paidMethodDetail ?? null,
    ]
  );
  return result.rows.length > 0 ? rowToInvoice(result.rows[0]) : null;
}

/**
 * Hard delete an invoice and its lines (CASCADE).
 *
 * This is the testing tool — Swiss bookkeeping requires immutable
 * invoices in production, but during pilot/testing we need to
 * iterate.
The gap left in the invoice number sequence is
 * intentional and documented; no attempt to "recycle" numbers.
 *
 * Reminders (and their PDFs) cascade-delete via the FK.
 */
export async function deleteInvoice(id: string): Promise<boolean> {
  await ensureSchema();
  const result = await getPool().query(
    "DELETE FROM invoices WHERE id = $1 RETURNING id",
    [id]
  );
  // true only when a row was actually removed.
  return (result.rowCount ?? 0) > 0;
}

/**
 * Has this tenant ever been billed a setup fee? Drives the
 * compute pipeline's "include setup line on first invoice"
 * decision. Looks at invoice_lines directly so it survives org
 * billing config edits.
 */
export async function tenantHasSetupFeeBilled(
  tenantName: string
): Promise<boolean> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT 1 FROM invoice_lines
      WHERE tenant_name = $1
        AND kind = 'tenant_setup'
      LIMIT 1`,
    [tenantName]
  );
  return result.rows.length > 0;
}

/**
 * Has this (tenant, skill) pair already appeared on any prior
 * invoice line — either as setup or usage? Drives the per-skill
 * setup-fee gate. Same "first appearance" semantics as the tenant
 * setup fee: a previously-free skill that newly gets a setup fee
 * configured will trigger the fee on its next billed period.
 *
 * Uses metadata->>'skill_id' (which is what both skill_setup and
 * skill_usage lines store) rather than parsing description.
 */
export async function tenantSkillHasBeenBilled(
  tenantName: string,
  skillId: string
): Promise<boolean> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT 1 FROM invoice_lines
      WHERE tenant_name = $1
        AND kind IN ('skill_setup', 'skill_usage')
        AND metadata->>'skill_id' = $2
      LIMIT 1`,
    [tenantName, skillId]
  );
  return result.rows.length > 0;
}

/**
 * Aggregate open balance per org for the admin overview. Returns
 * orgs with at least one open or overdue invoice; orgs in good
 * standing don't appear.
*/
export async function getOrgOpenBalances(): Promise<{
  zitadelOrgId: string;
  openCount: number;
  overdueCount: number;
  totalOpenChf: number;
}[]> {
  await ensureSchema();
  const result = await getPool().query(
    `SELECT zitadel_org_id,
            COUNT(*) FILTER (WHERE status = 'open') AS open_count,
            COUNT(*) FILTER (WHERE status = 'overdue') AS overdue_count,
            SUM(total_chf) FILTER (WHERE status IN ('open', 'overdue')) AS total_open
       FROM invoices
      WHERE status IN ('open', 'overdue')
      GROUP BY zitadel_org_id
      ORDER BY total_open DESC`
  );
  // Aggregates are coerced with Number() before being returned.
  return result.rows.map((r) => ({
    zitadelOrgId: r.zitadel_org_id,
    openCount: Number(r.open_count),
    overdueCount: Number(r.overdue_count),
    totalOpenChf: Number(r.total_open),
  }));
}

/**
 * Update the stored PDF for an invoice. Used by the two-pass
 * compute pipeline: insert invoice with empty PDF → render PDF with
 * the allocated invoice number → write bytes back.
 *
 * Could be merged into createInvoice via a render callback in a
 * future cleanup, but two passes are simpler and the extra UPDATE
 * is cheap.
 */
export async function updateInvoicePdf(
  invoiceId: string,
  pdfBuffer: Buffer,
  filename: string
): Promise<void> {
  await ensureSchema();
  await getPool().query(
    "UPDATE invoices SET pdf_data = $2, pdf_filename = $3 WHERE id = $1",
    [invoiceId, pdfBuffer, filename]
  );
}

// ---------------------------------------------------------------------------
// Skill activation requests — Phase 2.5
// ---------------------------------------------------------------------------

import type {
  SkillActivationRequest,
  SkillActivationStatus,
} from "@/types";

/**
 * Map a raw `skill_activation_requests` row to its camelCase shape.
 * Timestamps become ISO strings when the value exposes
 * `toISOString()`; nullable review fields default to `null`.
 */
function rowToSkillActivationRequest(row: any): SkillActivationRequest {
  return {
    id: row.id,
    tenantName: row.tenant_name,
    zitadelOrgId: row.zitadel_org_id,
    zitadelUserId: row.zitadel_user_id,
    skillId: row.skill_id,
    status: row.status as SkillActivationStatus,
    requestedAt: row.requested_at?.toISOString?.() ?? row.requested_at,
    reviewedAt: row.reviewed_at?.toISOString?.() ?? row.reviewed_at ?? null,
    reviewedBy: row.reviewed_by ?? null,
    rejectionReason: row.rejection_reason ?? null,
    adminNotes: row.admin_notes ?? null,
  };
}

/**
 * Insert a pending activation request. Throws a tagged error if a
 * pending row already exists for the (tenant, skill) — the partial
 * unique index enforces this. The caller surfaces "request already
 * pending" to the user rather than letting it 500.
 *
 * @throws Error with `code === "REQUEST_ALREADY_PENDING"` on any
 *   Postgres unique_violation (23505); other errors are rethrown
 *   unchanged.
 */
export async function createSkillActivationRequest(params: {
  tenantName: string;
  zitadelOrgId: string;
  zitadelUserId: string;
  skillId: string;
}): Promise<SkillActivationRequest> {
  await ensureSchema();
  try {
    const result = await getPool().query(
      `INSERT INTO skill_activation_requests
         (tenant_name, zitadel_org_id, zitadel_user_id, skill_id)
       VALUES ($1, $2, $3, $4)
       RETURNING *`,
      [
        params.tenantName,
        params.zitadelOrgId,
        params.zitadelUserId,
        params.skillId,
      ]
    );
    return rowToSkillActivationRequest(result.rows[0]);
  } catch (e: any) {
    if (e?.code === "23505") {
      // Attach the tag callers switch on without casting the Error to any.
      throw Object.assign(
        new Error(
          `A pending activation request already exists for ${params.skillId} on ${params.tenantName}.`
        ),
        { code: "REQUEST_ALREADY_PENDING" }
      );
    }
    throw e;
  }
}

/**
 * Fetch one activation request by primary key.
 *
 * @returns the mapped row, or `null` when no row matches.
 */
export async function getSkillActivationRequestById(
  id: string
): Promise<SkillActivationRequest | null> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT * FROM skill_activation_requests WHERE id = $1",
    [id]
  );
  return result.rows.length > 0
    ? rowToSkillActivationRequest(result.rows[0])
    : null;
}

/**
 * All pending requests across all tenants — feeds the admin queue
 * page. Capped to 500 rows for safety; unlikely to ever be hit but
 * keeps the page bounded.
*/
export async function listPendingSkillActivationRequests(): Promise<
  SkillActivationRequest[]
> {
  await ensureSchema();
  // Oldest first.
  const result = await getPool().query(
    `SELECT * FROM skill_activation_requests
      WHERE status = 'pending'
      ORDER BY requested_at ASC
      LIMIT 500`
  );
  return result.rows.map(rowToSkillActivationRequest);
}

/**
 * Count all pending activation requests across tenants.
 *
 * @returns the count, or 0 if the query yields no row.
 */
export async function countPendingSkillActivationRequests(): Promise<number> {
  await ensureSchema();
  const result = await getPool().query(
    "SELECT COUNT(*)::int AS c FROM skill_activation_requests WHERE status = 'pending'"
  );
  return result.rows[0]?.c ?? 0;
}

/**
 * Requests visible to a customer for one tenant. Returns:
 *   - pending: rows the user might want to withdraw
 *   - rejected: the most recent rejection per skill, so the user
 *     sees why and can retry
 * Approved and withdrawn rows are excluded (terminal states with
 * no user-visible UI effect after the fact).
 *
 * The query keeps only the newest row per (skill_id, status) pair
 * and returns them newest-first.
 */
export async function listSkillActivationRequestsForTenant(
  tenantName: string
): Promise<SkillActivationRequest[]> {
  await ensureSchema();
  const result = await getPool().query(
    `WITH ranked AS (
       SELECT *,
              ROW_NUMBER() OVER (
                PARTITION BY skill_id, status
                ORDER BY requested_at DESC
              ) AS rn
         FROM skill_activation_requests
        WHERE tenant_name = $1
          AND status IN ('pending', 'rejected')
     )
     SELECT * FROM ranked WHERE rn = 1
     ORDER BY requested_at DESC`,
    [tenantName]
  );
  return result.rows.map(rowToSkillActivationRequest);
}

/**
 * Transition a request's status. The caller is responsible for the
 * side effects (K8s patch on approve, email send) — this function
 * only touches the row.
 *
 * Returns null when the row doesn't exist or isn't in 'pending'
 * status. That null is meaningful: it tells the caller the
 * transition didn't happen (already approved/rejected by another
 * admin tab, etc.) and downstream actions should be skipped.
*/
export async function updateSkillActivationRequestStatus(
  id: string,
  // NOTE(review): the excluded member is inferred from the
  // `status = 'pending'` guard below — confirm against the original
  // signature.
  newStatus: Exclude<SkillActivationStatus, "pending">,
  opts: {
    reviewedBy: string;
    rejectionReason?: string | null;
    adminNotes?: string | null;
  }
): Promise<SkillActivationRequest | null> {
  await ensureSchema();
  // The `status = 'pending'` guard makes the transition first-writer-wins.
  // rejection_reason is always overwritten (null when omitted), while
  // admin_notes keeps its stored value when none is supplied.
  const result = await getPool().query(
    `UPDATE skill_activation_requests
        SET status = $2,
            reviewed_at = now(),
            reviewed_by = $3,
            rejection_reason = $4,
            admin_notes = COALESCE($5, admin_notes)
      WHERE id = $1
        AND status = 'pending'
      RETURNING *`,
    [
      id,
      newStatus,
      opts.reviewedBy,
      opts.rejectionReason ?? null,
      opts.adminNotes ?? null,
    ]
  );
  return result.rows.length > 0
    ? rowToSkillActivationRequest(result.rows[0])
    : null;
}