Skip to content

Database Operations (@happ-integ/db)

Обзор

Вместо TypeORM используется пакет @happ-integ/db - универсальная абстракция для работы с базами данных.

Поддерживаемые провайдеры:

  • Cloudflare D1 (default)
  • Neon PostgreSQL

Инициализация

Lazy Init Pattern

typescript
import { DB } from "@happ-integ/db";

let _db: DB | null = null;

export const getDb = (env: CloudflareBindings) =>
	(_db ??= new DB({
		provider: "d1",
		d1: env.DB,
		project: "sofa",
	}));

// Для тестов - сброс клиента
export const __resetDb = () => {
	_db = null;
};

Использование в Handlers

typescript
export async function handleInit(payload: IInitPayload, env: CloudflareBindings) {
	const db = getDb(env);

	// CRUD операции
	const record = await db.select("calls", { status: "pending" });
	return { success: true, data: record };
}

Важно: Автоматический prefix таблиц

Методы db.insert(), db.update(), db.select(), db.delete() автоматически добавляют prefix проекта к имени таблицы!

typescript
// Если project = "sofa", то:
await db.insert("calls", data);     // SQL: INSERT INTO sofa_calls ...
await db.update("calls", data, w);  // SQL: UPDATE sofa_calls ...
await db.select("calls", where);    // SQL: SELECT * FROM sofa_calls ...
await db.delete("calls", where);    // SQL: DELETE FROM sofa_calls ...

❌ Частая ошибка - двойной prefix

typescript
// НЕПРАВИЛЬНО - получится sofa_sofa_calls!
await db.insert("sofa_calls", data);
await db.insert(`${integrationName}_calls`, data);

// ПРАВИЛЬНО - используй короткое имя
await db.insert("calls", data);

Raw queries (db.query)

Для db.query() нужно использовать полное имя таблицы с prefix:

typescript
// db.query() НЕ добавляет prefix автоматически
const result = await db.query<{ count: number }>(
  "SELECT COUNT(*) as count FROM sofa_calls WHERE status = 'pending'"
);

// Или динамически
const result = await db.query<{ count: number }>(
  `SELECT COUNT(*) as count FROM ${integrationName}_calls WHERE status = 'pending'`
);

Сводка

МетодPrefix добавляется?Пример
db.insert("calls", ...)✅ Даsofa_calls
db.update("calls", ...)✅ Даsofa_calls
db.select("calls", ...)✅ Даsofa_calls
db.delete("calls", ...)✅ Даsofa_calls
db.query("SELECT ... FROM sofa_calls")❌ НетИспользуй полное имя

CRUD операции

SELECT - Получить данные

Базовый SELECT:

typescript
const db = getDb(env);

// Все записи
const allCalls = await db.select("calls");

// С условиями
const pendingCalls = await db.select("calls", { status: "pending" });

// Несколько условий (AND)
const filteredCalls = await db.select("calls", {
	status: "pending",
	assignee_id: "user123",
});

С опциями (limit, offset, sort):

typescript
const calls = await db.select(
	"calls",
	{ status: "pending" },
	{
		limit: 10,
		offset: 20,
		sort: { created_at: "DESC" },
	}
);

COUNT - подсчёт записей:

typescript
const count = await db.query<{ count: number }>("SELECT COUNT(*) as count FROM calls WHERE status = 'pending'");
console.log(count.data?.[0]?.count);

INSERT - Добавить данные

Одна запись:

typescript
const db = getDb(env);

await db.insert("calls", {
	id: "call_123",
	phone: "+79999999999",
	status: "pending",
	created_at: new Date().toISOString(),
});

Несколько записей:

typescript
await db.insert("calls", [
	{ id: "call_1", phone: "+79991111111", status: "pending" },
	{ id: "call_2", phone: "+79992222222", status: "pending" },
	{ id: "call_3", phone: "+79993333333", status: "pending" },
]);

UPDATE - Изменить данные

Обновить запись:

typescript
await db.update(
	"calls",
	{ status: "completed", outcome: "successful" },
	{ id: "call_123" } // WHERE условие
);

Обновить несколько записей:

typescript
await db.update(
	"calls",
	{ status: "completed" },
	{ assignee_id: "user_123" } // Обновит ВСЕ записи с этим assignee
);

DELETE - Удалить данные

Удалить одну запись:

typescript
await db.delete("calls", { id: "call_123" });

Удалить несколько записей:

typescript
await db.delete("calls", { status: "archived" });

Примеры Use Cases

Пример 1: Создать и получить запись

typescript
export async function handleCreateCall(payload: ICreateCallPayload, env: CloudflareBindings) {
	const db = getDb(env);

	const callData = {
		id: payload.id,
		phone: payload.phone,
		status: "initiated",
		created_at: new Date().toISOString(),
	};

	try {
		await db.insert("calls", callData);

		const created = await db.select("calls", { id: payload.id });
		return { success: true, data: created?.[0] };
	} catch (error) {
		logger.error("Failed to create call", error);
		return { success: false, error: error.message };
	}
}

Пример 2: Обновить статус и получить обновлённую запись

typescript
export async function handleCompleteCall(payload: ICompleteCallPayload, env: CloudflareBindings) {
	const db = getDb(env);

	try {
		await db.update(
			"calls",
			{
				status: "completed",
				outcome: payload.outcome,
				transcript: payload.transcript,
				updated_at: new Date().toISOString(),
			},
			{ id: payload.callId }
		);

		const updated = await db.select("calls", { id: payload.callId });
		return { success: true, data: updated?.[0] };
	} catch (error) {
		return { success: false, error: error.message };
	}
}

Пример 3: Pagination

typescript
export async function handleListCalls(payload: IListCallsPayload, env: CloudflareBindings) {
	const db = getDb(env);

	const page = payload.page || 1;
	const limit = payload.limit || 20;
	const offset = (page - 1) * limit;

	try {
		const calls = await db.select(
			"calls",
			{ status: "completed" },
			{
				limit,
				offset,
				sort: { created_at: "DESC" },
			}
		);

		const countResult = await db.query<{ count: number }>(
			"SELECT COUNT(*) as count FROM calls WHERE status = 'completed'"
		);

		return {
			success: true,
			data: {
				calls,
				pagination: {
					page,
					limit,
					total: countResult.data?.[0]?.count || 0,
					pages: Math.ceil((countResult.data?.[0]?.count || 0) / limit),
				},
			},
		};
	} catch (error) {
		return { success: false, error: error.message };
	}
}

Пример 4: Поиск с фильтрацией

typescript
export async function handleSearchCalls(payload: ISearchPayload, env: CloudflareBindings) {
	const db = getDb(env);

	try {
		const filters: Record<string, unknown> = {};

		if (payload.phone) {
			filters.phone = payload.phone;
		}
		if (payload.status) {
			filters.status = payload.status;
		}
		if (payload.assignee_id) {
			filters.assignee_id = payload.assignee_id;
		}

		const results = await db.select("calls", filters);

		return {
			success: true,
			data: {
				found: results.length,
				calls: results,
			},
		};
	} catch (error) {
		return { success: false, error: error.message };
	}
}

Миграции D1

Все миграции — code-based. Применяются через POST /setup на каждой интеграции. Единый механизм для local, dev и prod.

Как это работает

При вызове POST /{integration}/setup:

  1. Глобальные миграции (GLOBAL_MIGRATIONS из @happ-integ/core) — creds таблица и др.
  2. Миграции интеграции (массив IMigration[] из createIntegration({ migrations }))

Все миграции идемпотентны (CREATE TABLE IF NOT EXISTS) и трекаются в таблице _migrations.

Определение миграций в коде

typescript
// integrations/sofa/src/migrations/index.ts
import type { IMigration } from "@happ-integ/core";
import { generateCallsMigration } from "@happ-integ/call-queue";

export const SOFA_MIGRATIONS: IMigration[] = [
  generateCallsMigration("sofa"),
  // Добавляй новые миграции сюда
];
typescript
// integrations/sofa/src/index.ts
export default createIntegration({
  name: "sofa",
  migrations: SOFA_MIGRATIONS,
  // ...
});

Формат миграции

typescript
interface IMigration {
  id?: string;    // Уникальный ID (по умолчанию = name)
  name: string;   // Имя миграции
  sql?: string;   // SQL для применения
  up?: string;    // Алиас для sql
}

Пошаговая инструкция: миграции для новой интеграции

Допустим, ты создаёшь интеграцию rozetka.

Шаг 1: Создать файл миграций

typescript
// integrations/rozetka/src/migrations/index.ts
import type { IMigration } from "@happ-integ/core";

export const ROZETKA_MIGRATIONS: IMigration[] = [
  {
    name: "rozetka/0001_create_orders",
    sql: `
      CREATE TABLE IF NOT EXISTS rozetka_orders (
        id TEXT PRIMARY KEY,
        external_id TEXT,
        phone TEXT,
        status TEXT NOT NULL DEFAULT 'pending',
        data TEXT,
        created_at TEXT DEFAULT (datetime('now')),
        updated_at TEXT DEFAULT (datetime('now'))
      );
      CREATE INDEX IF NOT EXISTS idx_rozetka_orders_status ON rozetka_orders(status);
      CREATE INDEX IF NOT EXISTS idx_rozetka_orders_phone ON rozetka_orders(phone)
    `,
  },
];

Шаг 2: Подключить в createIntegration()

typescript
// integrations/rozetka/src/index.ts
import { createIntegration } from "@happ-integ/core";
import { ROZETKA_MIGRATIONS } from "./migrations";

export default createIntegration({
  name: "rozetka",
  migrations: ROZETKA_MIGRATIONS,
  // ... handlers, secrets, defaultConfigs
});

Шаг 3: Применить миграции

bash
# Локально
pnpm start rozetka
# POST http://localhost:8787/rozetka/setup

# Dev/Prod — после деплоя
# POST https://integ.dev.happ.tools/rozetka/setup

При вызове /setup автоматически применятся:

  1. GLOBAL_MIGRATIONS (таблица creds) — из @happ-integ/core
  2. ROZETKA_MIGRATIONS — твои миграции

Шаг 4: Добавление миграций в будущем

Просто добавляй новые объекты в массив. Уже применённые миграции пропускаются (трекинг по name в _migrations):

typescript
export const ROZETKA_MIGRATIONS: IMigration[] = [
  {
    name: "rozetka/0001_create_orders",
    sql: `...`,  // уже применена — пропустится
  },
  {
    name: "rozetka/0002_add_orders_comment",
    sql: `ALTER TABLE rozetka_orders ADD COLUMN comment TEXT`,
  },
  {
    name: "rozetka/0003_create_contacts",
    sql: `
      CREATE TABLE IF NOT EXISTS rozetka_contacts (
        id TEXT PRIMARY KEY,
        phone TEXT NOT NULL UNIQUE,
        name TEXT,
        created_at TEXT DEFAULT (datetime('now'))
      );
      CREATE INDEX IF NOT EXISTS idx_rozetka_contacts_phone ON rozetka_contacts(phone)
    `,
  },
];

Затем вызови POST /rozetka/setup — применятся только новые (0002, 0003).

С использованием @happ-integ/call-queue

Если интеграция делает исходящие звонки, используй готовую миграцию:

typescript
import type { IMigration } from "@happ-integ/core";
import { generateCallsMigration } from "@happ-integ/call-queue";

export const ROZETKA_MIGRATIONS: IMigration[] = [
  generateCallsMigration("rozetka"),  // создаст rozetka_calls + 6 индексов
  {
    name: "rozetka/0002_create_contacts",
    sql: `...`,
  },
];

Правила именования миграций

ЧтоФорматПример
Имя миграции{integration}/{номер}_{описание}rozetka/0001_create_orders
Имя таблицы{integration}_{таблица}rozetka_orders
Имя индексаidx_{integration}_{таблица}_{колонка}idx_rozetka_orders_status

Создание новой миграции

Добавь объект в массив миграций интеграции:

typescript
// integrations/sofa/src/migrations/index.ts
export const SOFA_MIGRATIONS: IMigration[] = [
  generateCallsMigration("sofa"),
  {
    name: "sofa/0002_create_users",
    sql: `
      CREATE TABLE IF NOT EXISTS sofa_users (
        id TEXT PRIMARY KEY,
        external_id TEXT NOT NULL UNIQUE,
        name TEXT,
        created_at TEXT DEFAULT (datetime('now'))
      );
      CREATE INDEX IF NOT EXISTS idx_sofa_users_external ON sofa_users(external_id)
    `,
  },
];

Затем вызови POST /{integration}/setup — новая миграция применится автоматически.

Именование таблиц

Таблицы интеграций обязательно с префиксом интеграции:

sql
-- Правильно: sofa_calls, sofa_users
CREATE TABLE IF NOT EXISTS sofa_calls (...);

-- Неправильно: calls (без префикса — конфликт между интеграциями)

Отслеживание миграций

Таблица _migrations:

sql
SELECT * FROM _migrations;

-- id                          | name                    | applied_at
-- ----------------------------|-------------------------|--------------------
-- global/0001_create_creds    | global/0001_create_creds| 2024-01-01 12:00:00
-- sofa/0001_create_calls      | sofa/0001_create_calls  | 2024-01-01 12:00:01

Команды

bash
# Проверить статус миграций
pnpm migrate:status              # Локально
pnpm migrate:status:dev          # Remote dev
pnpm migrate:status:prod         # Remote prod

# Сброс локальной БД
pnpm setup:reset                 # Удалить data/miniflare и пересоздать

Применение миграций по окружениям

ОкружениеКак применять
Localpnpm start sofaPOST /sofa/setup
DevDeploy → POST /sofa/setup (или автоматически в CI/CD)
ProdDeploy → POST /sofa/setup

Бекапы и восстановление

bash
# Экспорт всей базы
wrangler d1 export integ-db --remote --output=backup.sql
wrangler d1 export integ-db --remote --env production --output=backup-prod.sql

# Восстановление из бекапа
wrangler d1 execute integ-db --remote --file=backup.sql

Time Travel (Point-in-Time Recovery)

D1 хранит историю изменений 30 дней:

bash
# Посмотреть доступные точки восстановления
wrangler d1 time-travel info integ-db --remote

# Восстановить на timestamp
wrangler d1 time-travel restore integ-db --remote --timestamp=2024-01-15T10:30:00Z

# Восстановить на bookmark
wrangler d1 time-travel restore integ-db --remote --bookmark=<BOOKMARK_ID>

Рекомендации

  1. Бекап перед миграциями — особенно для production
  2. Идемпотентные миграции — используй CREATE TABLE IF NOT EXISTS
  3. Time Travel как страховка — для критичных случаев быстрее восстановить на timestamp
  4. Держи down-файлы — дисциплинирует писать обратимые миграции

API Reference

db.select(table, where?, options?)

typescript
interface SelectOptions {
	limit?: number;
	offset?: number;
	sort?: Record<string, "ASC" | "DESC">;
}

// Возвращает массив записей
const rows = await db.select("calls", { status: "pending" }, { limit: 10 });

db.insert(table, data)

typescript
// Одна запись
await db.insert("calls", { id: "1", status: "pending" });

// Несколько записей
await db.insert("calls", [
	{ id: "1", status: "pending" },
	{ id: "2", status: "pending" },
]);

db.update(table, data, where)

typescript
// Обновить записи WHERE условие
await db.update("calls", { status: "completed" }, { id: "123" });

db.delete(table, where)

typescript
// Удалить записи WHERE условие
await db.delete("calls", { id: "123" });

db.query(sql)

typescript
// Выполнить сырой SQL запрос
const result = await db.query<{ count: number }>("SELECT COUNT(*) as count FROM calls");
const count = result.data?.[0]?.count;

db.healthCheck()

typescript
// Проверить доступность БД
const isHealthy = await db.healthCheck();

Типизация

Типы результатов

typescript
// Select возвращает массив или пусто
const records = await db.select("calls");
// records: any[] | null

// Insert не возвращает значение
await db.insert("calls", data);
// void

// Update не возвращает значение
await db.update("calls", data, where);
// void

// Query возвращает типизированный результат
interface ICount {
	count: number;
}
const result = await db.query<ICount>("SELECT COUNT(*) as count FROM calls");
// result: { data: ICount[] | null, error?: string }

Обработка ошибок

typescript
export async function handleSafeOperation(payload, env: CloudflareBindings) {
	const db = getDb(env);

	try {
		const record = await db.select("calls", { id: payload.id });

		if (!record) {
			return { success: false, error: "Record not found" };
		}

		await db.update("calls", { status: "updated" }, { id: payload.id });

		return { success: true, data: record };
	} catch (error) {
		logger.error("DB operation failed", error);
		return {
			success: false,
			error: error instanceof Error ? error.message : "Unknown error",
		};
	}
}

Связанные документы