@happ-integ/call-queue
Очередь исходящих звонков с concurrency-контролем, retry-окнами и управлением жизненным циклом звонка.
Когда использовать
Если интеграции нужно делать исходящие звонки через Voice Assistant — используй этот пакет. Он решает:
- Ограничение одновременных звонков (concurrency limit)
- Очередь звонков (FIFO) при достижении лимита
- Дедупликация по номеру телефона (при enqueue и processQueue)
- Автоматический retry при неуспешных звонках (duration < threshold)
- Расписание retry по временным окнам (утро/день/вечер)
- Управление статусами звонка (queued → pending → in_call → waiting_postcall / pending_retry / skipped_duplicate)
- Генерация SQL-миграций для таблицы звонков
Установка
pnpm add @happ-integ/call-queueИспользование
1. Миграция
import { generateCallsMigration } from "@happ-integ/call-queue";
import type { IMigration } from "@happ-integ/core";
export const MIGRATIONS: IMigration[] = [generateCallsMigration("my-crm")];
// Создаст таблицу my-crm_calls с 6 индексами2. Создание CallQueue
import { CallQueue, DEFAULT_RETRY, DEFAULT_QUEUE_CONFIG } from "@happ-integ/call-queue";
import { SaasApiClient } from "@happ-integ/saas-api";
const saasClient = new SaasApiClient(env.SAAS_API_URL, env.SAAS_ACCESS_TOKEN);
const queue = new CallQueue({
db,
tableName: "calls",
integrationPrefix: "my-crm",
queueConfig: {
concurrency_limit: 3,
failed_call_duration_threshold: 5,
},
retryConfig: DEFAULT_RETRY,
originate: async (phone) => {
const result = await saasClient.originateCall(assistantId, phone);
return { vaCallId: result.callId! };
},
log,
});3. В handlers
// call-originate handler
const result = await queue.enqueue(phone_number);
// { queued: true/false, deduplicated?: true, callId: "...", vaCallId?: "..." }
// call-events handler (media_start)
await queue.handleCallStarted(callId, { assistantId, assistantPhone });
// call-events handler (media_end)
const result = await queue.handleCallEnded(callId, duration, clientPhone);
// { status: "pending_retry" | "waiting_postcall", scheduledRetry, processQueueResult }
// retry handler (cron)
const result = await queue.processQueue();
// { processed: N, skipped: N, failed_max_attempts: N, skipped_duplicate: N }4. Factory-паттерн (рекомендуется)
Создайте async-фабрику в utils/create-call-queue.util.ts:
import { CallQueue, DEFAULT_RETRY } from "@happ-integ/call-queue";
import type { IRetryConfig } from "@happ-integ/call-queue";
import { SaasApiClient } from "@happ-integ/saas-api";
export async function createMyCallQueue(deps: { db, config, env, creds, log, integrationName }) {
const { db, config, env, creds, log, integrationName } = deps;
const retryConfig = (await config.get<IRetryConfig>("retry")) ?? DEFAULT_RETRY;
const sofaCreds = await creds.get<ICredentials>(integrationName);
const saasClient = new SaasApiClient(env.SAAS_API_URL, env.SAAS_ACCESS_TOKEN);
return new CallQueue({
db,
tableName: "calls",
integrationPrefix: integrationName,
queueConfig: { concurrency_limit: 3, failed_call_duration_threshold: 5 },
retryConfig,
originate: async (phone) => {
const result = await saasClient.originateCall(sofaCreds.SAAS_ASSISTANT_ID, phone);
return { vaCallId: result.callId! };
},
log,
});
}API
CallQueue
new CallQueue(options: ICallQueueOptions)Options:
| Параметр | Тип | Описание |
|---|---|---|
db | ICallQueueDB | DB-клиент (query/insert/update) |
tableName | string | Имя таблицы без префикса ("calls") |
integrationPrefix | string | Префикс для raw SQL ("sofa" → sofa_calls) |
queueConfig | IQueueConfig | Лимиты concurrency и threshold |
retryConfig | IRetryConfig | Кол-во попыток и временные окна |
originate | OriginateFn | Callback для инициации звонка |
log | ICallQueueLogger | Логгер (info/error) |
Методы
enqueue(phone): Promise<IEnqueueResult>
Поставить звонок в очередь или инициировать сразу (если есть свободный слот). Если для данного номера уже есть активная запись (queued/pending/in_call/pending_retry), возвращает { queued: false, deduplicated: true, callId: "" } без создания дубликата.
handleCallStarted(vaCallId, metadata?): Promise<ICallStartedResult>
Обработать событие начала звонка (media_start).
handleCallEnded(vaCallId, duration, clientPhone): Promise<ICallEndedResult>
Обработать событие окончания звонка (media_end). Автоматически планирует retry или ставит waiting_postcall.
processQueue(): Promise<IProcessQueueResult>
Обработать очередь: запустить queued-звонки и pending_retry (если подошло время). Дубликаты (звонки на номер, для которого уже есть active call) помечаются skipped_duplicate и не инициируются.
Утилиты
getNextRetryWindow(windows?, fromDate?): string
Вычислить следующее временное окно для retry. Возвращает ISO-строку.
generateCallsMigration(prefix, tableName?): IMigration
Сгенерировать SQL-миграцию для таблицы звонков.
Defaults
import { DEFAULT_RETRY, DEFAULT_QUEUE_CONFIG } from "@happ-integ/call-queue";
DEFAULT_RETRY = {
max_attempts: 3,
windows: [
{ hour: 10, minute: 40 },
{ hour: 13, minute: 40 },
{ hour: 16, minute: 40 },
],
};
DEFAULT_QUEUE_CONFIG = {
concurrency_limit: 3,
failed_call_duration_threshold: 5,
};Жизненный цикл звонка
enqueue() ──► queued ──► pending ──► in_call ──► waiting_postcall ──► completed
│ │ │
│ │ (duration < threshold)
│ │ │
│ │ ▼
│ └──── processQueue() ◄── pending_retry ──► failed (max attempts)
│ │
│ (duplicate phone in queue)
│ │
│ ▼
│ skipped_duplicate
│
(duplicate phone active)
│
▼
deduplicated (not created)