Skip to content

@happ-integ/call-queue

Очередь исходящих звонков с concurrency-контролем, retry-окнами и управлением жизненным циклом звонка.

Когда использовать

Если интеграции нужно делать исходящие звонки через Voice Assistant — используй этот пакет. Он решает:

  • Ограничение одновременных звонков (concurrency limit)
  • Очередь звонков (FIFO) при достижении лимита
  • Дедупликация по номеру телефона (при enqueue и processQueue)
  • Автоматический retry при неуспешных звонках (duration < threshold)
  • Расписание retry по временным окнам (утро/день/вечер)
  • Управление статусами звонка (queued → pending → in_call → waiting_postcall / pending_retry / skipped_duplicate)
  • Генерация SQL-миграций для таблицы звонков

Установка

bash
pnpm add @happ-integ/call-queue

Использование

1. Миграция

typescript
import { generateCallsMigration } from "@happ-integ/call-queue";
import type { IMigration } from "@happ-integ/core";

export const MIGRATIONS: IMigration[] = [generateCallsMigration("my-crm")];
// Создаст таблицу my-crm_calls с 6 индексами

2. Создание CallQueue

typescript
import { CallQueue, DEFAULT_RETRY, DEFAULT_QUEUE_CONFIG } from "@happ-integ/call-queue";
import { SaasApiClient } from "@happ-integ/saas-api";

const saasClient = new SaasApiClient(env.SAAS_API_URL, env.SAAS_ACCESS_TOKEN);

const queue = new CallQueue({
  db,
  tableName: "calls",
  integrationPrefix: "my-crm",
  queueConfig: {
    concurrency_limit: 3,
    failed_call_duration_threshold: 5,
  },
  retryConfig: DEFAULT_RETRY,
  originate: async (phone) => {
    const result = await saasClient.originateCall(assistantId, phone);
    return { vaCallId: result.callId! };
  },
  log,
});

3. В handlers

typescript
// call-originate handler
const result = await queue.enqueue(phone_number);
// { queued: true/false, deduplicated?: true, callId: "...", vaCallId?: "..." }

// call-events handler (media_start)
await queue.handleCallStarted(callId, { assistantId, assistantPhone });

// call-events handler (media_end)
const result = await queue.handleCallEnded(callId, duration, clientPhone);
// { status: "pending_retry" | "waiting_postcall", scheduledRetry, processQueueResult }

// retry handler (cron)
const result = await queue.processQueue();
// { processed: N, skipped: N, failed_max_attempts: N, skipped_duplicate: N }

4. Factory-паттерн (рекомендуется)

Создайте async-фабрику в utils/create-call-queue.util.ts:

typescript
import { CallQueue, DEFAULT_RETRY } from "@happ-integ/call-queue";
import type { IRetryConfig } from "@happ-integ/call-queue";
import { SaasApiClient } from "@happ-integ/saas-api";

export async function createMyCallQueue(deps: { db, config, env, creds, log, integrationName }) {
  const { db, config, env, creds, log, integrationName } = deps;

  const retryConfig = (await config.get<IRetryConfig>("retry")) ?? DEFAULT_RETRY;
  const sofaCreds = await creds.get<ICredentials>(integrationName);
  const saasClient = new SaasApiClient(env.SAAS_API_URL, env.SAAS_ACCESS_TOKEN);

  return new CallQueue({
    db,
    tableName: "calls",
    integrationPrefix: integrationName,
    queueConfig: { concurrency_limit: 3, failed_call_duration_threshold: 5 },
    retryConfig,
    originate: async (phone) => {
      const result = await saasClient.originateCall(sofaCreds.SAAS_ASSISTANT_ID, phone);
      return { vaCallId: result.callId! };
    },
    log,
  });
}

API

CallQueue

typescript
new CallQueue(options: ICallQueueOptions)

Options:

ПараметрТипОписание
dbICallQueueDBDB-клиент (query/insert/update)
tableNamestringИмя таблицы без префикса ("calls")
integrationPrefixstringПрефикс для raw SQL ("sofa"sofa_calls)
queueConfigIQueueConfigЛимиты concurrency и threshold
retryConfigIRetryConfigКол-во попыток и временные окна
originateOriginateFnCallback для инициации звонка
logICallQueueLoggerЛоггер (info/error)

Методы

enqueue(phone): Promise<IEnqueueResult>

Поставить звонок в очередь или инициировать сразу (если есть свободный слот). Если для данного номера уже есть активная запись (queued/pending/in_call/pending_retry), возвращает { queued: false, deduplicated: true, callId: "" } без создания дубликата.

handleCallStarted(vaCallId, metadata?): Promise<ICallStartedResult>

Обработать событие начала звонка (media_start).

handleCallEnded(vaCallId, duration, clientPhone): Promise<ICallEndedResult>

Обработать событие окончания звонка (media_end). Автоматически планирует retry или ставит waiting_postcall.

processQueue(): Promise<IProcessQueueResult>

Обработать очередь: запустить queued-звонки и pending_retry (если подошло время). Дубликаты (звонки на номер, для которого уже есть active call) помечаются skipped_duplicate и не инициируются.

Утилиты

getNextRetryWindow(windows?, fromDate?): string

Вычислить следующее временное окно для retry. Возвращает ISO-строку.

generateCallsMigration(prefix, tableName?): IMigration

Сгенерировать SQL-миграцию для таблицы звонков.

Defaults

typescript
import { DEFAULT_RETRY, DEFAULT_QUEUE_CONFIG } from "@happ-integ/call-queue";

DEFAULT_RETRY = {
  max_attempts: 3,
  windows: [
    { hour: 10, minute: 40 },
    { hour: 13, minute: 40 },
    { hour: 16, minute: 40 },
  ],
};

DEFAULT_QUEUE_CONFIG = {
  concurrency_limit: 3,
  failed_call_duration_threshold: 5,
};

Жизненный цикл звонка

enqueue() ──► queued ──► pending ──► in_call ──► waiting_postcall ──► completed
   │            │                      │
   │            │         (duration < threshold)
   │            │                      │
   │            │                      ▼
   │            └──── processQueue() ◄── pending_retry ──► failed (max attempts)
   │            │
   │   (duplicate phone in queue)
   │            │
   │            ▼
   │     skipped_duplicate

(duplicate phone active)


 deduplicated (not created)