Database Operations (@happ-integ/db)
Обзор
Вместо TypeORM используется пакет @happ-integ/db - универсальная абстракция для работы с базами данных.
Поддерживаемые провайдеры:
- Cloudflare D1 (default)
- Neon PostgreSQL
Инициализация
Lazy Init Pattern
import { DB } from "@happ-integ/db";
let _db: DB | null = null;
export const getDb = (env: CloudflareBindings) =>
(_db ??= new DB({
provider: "d1",
d1: env.DB,
project: "sofa",
}));
// Для тестов - сброс клиента
export const __resetDb = () => {
_db = null;
};Использование в Handlers
export async function handleInit(payload: IInitPayload, env: CloudflareBindings) {
const db = getDb(env);
// CRUD операции
const record = await db.select("calls", { status: "pending" });
return { success: true, data: record };
}Важно: Автоматический prefix таблиц
Методы db.insert(), db.update(), db.select(), db.delete() автоматически добавляют prefix проекта к имени таблицы!
// Если project = "sofa", то:
await db.insert("calls", data); // SQL: INSERT INTO sofa_calls ...
await db.update("calls", data, w); // SQL: UPDATE sofa_calls ...
await db.select("calls", where); // SQL: SELECT * FROM sofa_calls ...
await db.delete("calls", where); // SQL: DELETE FROM sofa_calls ...❌ Частая ошибка - двойной prefix
// НЕПРАВИЛЬНО - получится sofa_sofa_calls!
await db.insert("sofa_calls", data);
await db.insert(`${integrationName}_calls`, data);
// ПРАВИЛЬНО - используй короткое имя
await db.insert("calls", data);Raw queries (db.query)
Для db.query() нужно использовать полное имя таблицы с prefix:
// db.query() НЕ добавляет prefix автоматически
const result = await db.query<{ count: number }>(
"SELECT COUNT(*) as count FROM sofa_calls WHERE status = 'pending'"
);
// Или динамически
const result = await db.query<{ count: number }>(
`SELECT COUNT(*) as count FROM ${integrationName}_calls WHERE status = 'pending'`
);Сводка
| Метод | Prefix добавляется? | Пример |
|---|---|---|
db.insert("calls", ...) | ✅ Да | → sofa_calls |
db.update("calls", ...) | ✅ Да | → sofa_calls |
db.select("calls", ...) | ✅ Да | → sofa_calls |
db.delete("calls", ...) | ✅ Да | → sofa_calls |
db.query("SELECT ... FROM sofa_calls") | ❌ Нет | Используй полное имя |
CRUD операции
SELECT - Получить данные
Базовый SELECT:
const db = getDb(env);
// Все записи
const allCalls = await db.select("calls");
// С условиями
const pendingCalls = await db.select("calls", { status: "pending" });
// Несколько условий (AND)
const filteredCalls = await db.select("calls", {
status: "pending",
assignee_id: "user123",
});С опциями (limit, offset, sort):
const calls = await db.select(
"calls",
{ status: "pending" },
{
limit: 10,
offset: 20,
sort: { created_at: "DESC" },
}
);COUNT - подсчёт записей:
const count = await db.query<{ count: number }>("SELECT COUNT(*) as count FROM calls WHERE status = 'pending'");
console.log(count.data?.[0]?.count);INSERT - Добавить данные
Одна запись:
const db = getDb(env);
await db.insert("calls", {
id: "call_123",
phone: "+79999999999",
status: "pending",
created_at: new Date().toISOString(),
});Несколько записей:
await db.insert("calls", [
{ id: "call_1", phone: "+79991111111", status: "pending" },
{ id: "call_2", phone: "+79992222222", status: "pending" },
{ id: "call_3", phone: "+79993333333", status: "pending" },
]);UPDATE - Изменить данные
Обновить запись:
await db.update(
"calls",
{ status: "completed", outcome: "successful" },
{ id: "call_123" } // WHERE условие
);Обновить несколько записей:
await db.update(
"calls",
{ status: "completed" },
{ assignee_id: "user_123" } // Обновит ВСЕ записи с этим assignee
);DELETE - Удалить данные
Удалить одну запись:
await db.delete("calls", { id: "call_123" });Удалить несколько записей:
await db.delete("calls", { status: "archived" });Примеры Use Cases
Пример 1: Создать и получить запись
export async function handleCreateCall(payload: ICreateCallPayload, env: CloudflareBindings) {
const db = getDb(env);
const callData = {
id: payload.id,
phone: payload.phone,
status: "initiated",
created_at: new Date().toISOString(),
};
try {
await db.insert("calls", callData);
const created = await db.select("calls", { id: payload.id });
return { success: true, data: created?.[0] };
} catch (error) {
logger.error("Failed to create call", error);
return { success: false, error: error.message };
}
}Пример 2: Обновить статус и получить обновлённую запись
export async function handleCompleteCall(payload: ICompleteCallPayload, env: CloudflareBindings) {
const db = getDb(env);
try {
await db.update(
"calls",
{
status: "completed",
outcome: payload.outcome,
transcript: payload.transcript,
updated_at: new Date().toISOString(),
},
{ id: payload.callId }
);
const updated = await db.select("calls", { id: payload.callId });
return { success: true, data: updated?.[0] };
} catch (error) {
return { success: false, error: error.message };
}
}Пример 3: Pagination
export async function handleListCalls(payload: IListCallsPayload, env: CloudflareBindings) {
const db = getDb(env);
const page = payload.page || 1;
const limit = payload.limit || 20;
const offset = (page - 1) * limit;
try {
const calls = await db.select(
"calls",
{ status: "completed" },
{
limit,
offset,
sort: { created_at: "DESC" },
}
);
const countResult = await db.query<{ count: number }>(
"SELECT COUNT(*) as count FROM calls WHERE status = 'completed'"
);
return {
success: true,
data: {
calls,
pagination: {
page,
limit,
total: countResult.data?.[0]?.count || 0,
pages: Math.ceil((countResult.data?.[0]?.count || 0) / limit),
},
},
};
} catch (error) {
return { success: false, error: error.message };
}
}Пример 4: Поиск с фильтрацией
export async function handleSearchCalls(payload: ISearchPayload, env: CloudflareBindings) {
const db = getDb(env);
try {
const filters: Record<string, unknown> = {};
if (payload.phone) {
filters.phone = payload.phone;
}
if (payload.status) {
filters.status = payload.status;
}
if (payload.assignee_id) {
filters.assignee_id = payload.assignee_id;
}
const results = await db.select("calls", filters);
return {
success: true,
data: {
found: results.length,
calls: results,
},
};
} catch (error) {
return { success: false, error: error.message };
}
}Миграции D1
Все миграции — code-based. Применяются через POST /setup на каждой интеграции. Единый механизм для local, dev и prod.
Как это работает
При вызове POST /{integration}/setup:
- Глобальные миграции (
GLOBAL_MIGRATIONSиз@happ-integ/core) —credsтаблица и др. - Миграции интеграции (массив
IMigration[]изcreateIntegration({ migrations }))
Все миграции идемпотентны (CREATE TABLE IF NOT EXISTS) и трекаются в таблице _migrations.
Определение миграций в коде
// integrations/sofa/src/migrations/index.ts
import type { IMigration } from "@happ-integ/core";
import { generateCallsMigration } from "@happ-integ/call-queue";
export const SOFA_MIGRATIONS: IMigration[] = [
generateCallsMigration("sofa"),
// Добавляй новые миграции сюда
];// integrations/sofa/src/index.ts
export default createIntegration({
name: "sofa",
migrations: SOFA_MIGRATIONS,
// ...
});Формат миграции
interface IMigration {
id?: string; // Уникальный ID (по умолчанию = name)
name: string; // Имя миграции
sql?: string; // SQL для применения
up?: string; // Алиас для sql
}Пошаговая инструкция: миграции для новой интеграции
Допустим, ты создаёшь интеграцию rozetka.
Шаг 1: Создать файл миграций
// integrations/rozetka/src/migrations/index.ts
import type { IMigration } from "@happ-integ/core";
export const ROZETKA_MIGRATIONS: IMigration[] = [
{
name: "rozetka/0001_create_orders",
sql: `
CREATE TABLE IF NOT EXISTS rozetka_orders (
id TEXT PRIMARY KEY,
external_id TEXT,
phone TEXT,
status TEXT NOT NULL DEFAULT 'pending',
data TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_rozetka_orders_status ON rozetka_orders(status);
CREATE INDEX IF NOT EXISTS idx_rozetka_orders_phone ON rozetka_orders(phone)
`,
},
];Шаг 2: Подключить в createIntegration()
// integrations/rozetka/src/index.ts
import { createIntegration } from "@happ-integ/core";
import { ROZETKA_MIGRATIONS } from "./migrations";
export default createIntegration({
name: "rozetka",
migrations: ROZETKA_MIGRATIONS,
// ... handlers, secrets, defaultConfigs
});Шаг 3: Применить миграции
# Локально
pnpm start rozetka
# POST http://localhost:8787/rozetka/setup
# Dev/Prod — после деплоя
# POST https://integ.dev.happ.tools/rozetka/setupПри вызове /setup автоматически применятся:
GLOBAL_MIGRATIONS(таблицаcreds) — из@happ-integ/coreROZETKA_MIGRATIONS— твои миграции
Шаг 4: Добавление миграций в будущем
Просто добавляй новые объекты в массив. Уже применённые миграции пропускаются (трекинг по name в _migrations):
export const ROZETKA_MIGRATIONS: IMigration[] = [
{
name: "rozetka/0001_create_orders",
sql: `...`, // уже применена — пропустится
},
{
name: "rozetka/0002_add_orders_comment",
sql: `ALTER TABLE rozetka_orders ADD COLUMN comment TEXT`,
},
{
name: "rozetka/0003_create_contacts",
sql: `
CREATE TABLE IF NOT EXISTS rozetka_contacts (
id TEXT PRIMARY KEY,
phone TEXT NOT NULL UNIQUE,
name TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_rozetka_contacts_phone ON rozetka_contacts(phone)
`,
},
];Затем вызови POST /rozetka/setup — применятся только новые (0002, 0003).
С использованием @happ-integ/call-queue
Если интеграция делает исходящие звонки, используй готовую миграцию:
import type { IMigration } from "@happ-integ/core";
import { generateCallsMigration } from "@happ-integ/call-queue";
export const ROZETKA_MIGRATIONS: IMigration[] = [
generateCallsMigration("rozetka"), // создаст rozetka_calls + 6 индексов
{
name: "rozetka/0002_create_contacts",
sql: `...`,
},
];Правила именования миграций
| Что | Формат | Пример |
|---|---|---|
| Имя миграции | {integration}/{номер}_{описание} | rozetka/0001_create_orders |
| Имя таблицы | {integration}_{таблица} | rozetka_orders |
| Имя индекса | idx_{integration}_{таблица}_{колонка} | idx_rozetka_orders_status |
Создание новой миграции
Добавь объект в массив миграций интеграции:
// integrations/sofa/src/migrations/index.ts
export const SOFA_MIGRATIONS: IMigration[] = [
generateCallsMigration("sofa"),
{
name: "sofa/0002_create_users",
sql: `
CREATE TABLE IF NOT EXISTS sofa_users (
id TEXT PRIMARY KEY,
external_id TEXT NOT NULL UNIQUE,
name TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_sofa_users_external ON sofa_users(external_id)
`,
},
];Затем вызови POST /{integration}/setup — новая миграция применится автоматически.
Именование таблиц
Таблицы интеграций обязательно с префиксом интеграции:
-- Правильно: sofa_calls, sofa_users
CREATE TABLE IF NOT EXISTS sofa_calls (...);
-- Неправильно: calls (без префикса — конфликт между интеграциями)Отслеживание миграций
Таблица _migrations:
SELECT * FROM _migrations;
-- id | name | applied_at
-- ----------------------------|-------------------------|--------------------
-- global/0001_create_creds | global/0001_create_creds| 2024-01-01 12:00:00
-- sofa/0001_create_calls | sofa/0001_create_calls | 2024-01-01 12:00:01Команды
# Проверить статус миграций
pnpm migrate:status # Локально
pnpm migrate:status:dev # Remote dev
pnpm migrate:status:prod # Remote prod
# Сброс локальной БД
pnpm setup:reset # Удалить data/miniflare и пересоздатьПрименение миграций по окружениям
| Окружение | Как применять |
|---|---|
| Local | pnpm start sofa → POST /sofa/setup |
| Dev | Deploy → POST /sofa/setup (или автоматически в CI/CD) |
| Prod | Deploy → POST /sofa/setup |
Бекапы и восстановление
# Экспорт всей базы
wrangler d1 export integ-db --remote --output=backup.sql
wrangler d1 export integ-db --remote --env production --output=backup-prod.sql
# Восстановление из бекапа
wrangler d1 execute integ-db --remote --file=backup.sqlTime Travel (Point-in-Time Recovery)
D1 хранит историю изменений 30 дней:
# Посмотреть доступные точки восстановления
wrangler d1 time-travel info integ-db --remote
# Восстановить на timestamp
wrangler d1 time-travel restore integ-db --remote --timestamp=2024-01-15T10:30:00Z
# Восстановить на bookmark
wrangler d1 time-travel restore integ-db --remote --bookmark=<BOOKMARK_ID>Рекомендации
- Бекап перед миграциями — особенно для production
- Идемпотентные миграции — используй
CREATE TABLE IF NOT EXISTS - Time Travel как страховка — для критичных случаев быстрее восстановить на timestamp
- Держи down-файлы — дисциплинирует писать обратимые миграции
API Reference
db.select(table, where?, options?)
interface SelectOptions {
limit?: number;
offset?: number;
sort?: Record<string, "ASC" | "DESC">;
}
// Возвращает массив записей
const rows = await db.select("calls", { status: "pending" }, { limit: 10 });db.insert(table, data)
// Одна запись
await db.insert("calls", { id: "1", status: "pending" });
// Несколько записей
await db.insert("calls", [
{ id: "1", status: "pending" },
{ id: "2", status: "pending" },
]);db.update(table, data, where)
// Обновить записи WHERE условие
await db.update("calls", { status: "completed" }, { id: "123" });db.delete(table, where)
// Удалить записи WHERE условие
await db.delete("calls", { id: "123" });db.query(sql)
// Выполнить сырой SQL запрос
const result = await db.query<{ count: number }>("SELECT COUNT(*) as count FROM calls");
const count = result.data?.[0]?.count;db.healthCheck()
// Проверить доступность БД
const isHealthy = await db.healthCheck();Типизация
Типы результатов
// Select возвращает массив или пусто
const records = await db.select("calls");
// records: any[] | null
// Insert не возвращает значение
await db.insert("calls", data);
// void
// Update не возвращает значение
await db.update("calls", data, where);
// void
// Query возвращает типизированный результат
interface ICount {
count: number;
}
const result = await db.query<ICount>("SELECT COUNT(*) as count FROM calls");
// result: { data: ICount[] | null, error?: string }Обработка ошибок
export async function handleSafeOperation(payload, env: CloudflareBindings) {
const db = getDb(env);
try {
const record = await db.select("calls", { id: payload.id });
if (!record) {
return { success: false, error: "Record not found" };
}
await db.update("calls", { status: "updated" }, { id: payload.id });
return { success: true, data: record };
} catch (error) {
logger.error("DB operation failed", error);
return {
success: false,
error: error instanceof Error ? error.message : "Unknown error",
};
}
}Связанные документы
- CODE_RULES.md - правила написания кода
- DEVELOPMENT.md - разработка
- packages/db - детали пакета DB