Если вы когда-нибудь psql для выборок из PostgreSQL, то едва ли вы были удовлетворены результатом, т.к. он не адаптирован для чтения. Выводится по-умолчанию результат с использованием стандартной утилиты more. Если вы любите консоль, то обращу ваше внимание на пагинатор для вывода результатов запросов pspg. Продукт с открытым исходным кодом, активно поддерживается, написан на C. Выглядит как mc (Midnight Commander).
Ключевой функционал:
Для приверженцев использования только стандартной библиотеки для тестирования Go-кода, рекомендую статью. Я опробовал на себе два подхода: только SDK и доп. библиотеки (testify, gjosn). Большую часть времени коммерческие продукты я покрываю автотестами с использованием дополнительных инструментов.
Позволю себе не согласится с автором публикации, что читаемость выше, если вы используете только SDK. Вместе с тем полезно себе напомнить, что библиотеки абстрагируют детали, которые могут оказаться важными для каждого конкретного теста.
Задачи можно пушить не только в Redis/Kafka/RabbitMQ/etc. Пожалуй, самое очевидное решение - сохранение задачи прямо в СУБД в рамках транзакции. Часто можно встретить в проектах два хранилища: реляционная СУБД и очередь (Redis/RabbitMQ и т.д.). Такой подход предполагает, что нужно записать сначала в СУБД, а потом в очередь. Внутри транзакции писать в очередь плохо, т.к. это увеличивает время выполнения транзакции, что ведет к ошибкам сериализации и замедленному автовакуму. Писать отдельно в очередь тоже опасно: может случиться сбой (очередь недоступна, сеть и т.д.). Писать в очередь до транзакции, во время транзакции, после транзакции? Все варианты имеют свои недостатки. Есть ли надежное решение?
Готовое решение для этой задачи предлагает riverqueue.
Предполагается, что вы пишете проект на Go, ваша СУБД - это PostgreSQL и pgx (5 версия) в качестве драйвера.
Можно пушить задачи из Python и Ruby, но предполагается прямое подключение к СУБД.
Но если придерживаться структуры задачи очереди, то пушить можно откуда угодно, что имеет доступ на вставку в
соответсвующие таблицы. Также, в riverqueue есть готовый UI для обзора состояния задач,
повтора задачи и т.д. Также поддерживаются запланированные на будущее и периодические задачи,
включая задачи по формату crond
,
для чего используется https://github.com/robfig/cron.
Есть также механизм "засыпания" задач: можно задачу отложить и дать её поскать определенный Duration.
Также в riverqueue включены важные задачи поддержи очереди в здоровом состоянии: очистка (по-умолчанию, заверешенные задачи хранятся не менее суток), запуск периодических задач (просыпается, запускает, калькулирует сколько спать, спит - и так по циклу), переиндексация (REINDEX INDEX CONCURRENTLY, отключена по-умолчанию), спаситель (чинит задачи в очереди, которые сменили своё состояние на промежуточное).
Riverqueue предлагает готовый механизм хранения задач в PostgreSQL (плюс механизм миграций), ретраи (экспоненциальные, линейные и кастомные), таймауты. Это решение создает 3 таблички в PostgreSQL, о которых чуть ниже.
Как пользоваться? Прямо в коде нужно объявить структуры задач и их воркеры, т.е. функции, которые нужно вызывать для выполнения этих задач. При объявлении воркера его можно как раз кастомизировать: задать таймауты выполнения задач, подправить, если нужно, ретраи. Прямо в воркере доступна транзакция, которая используется для макрировки задачи в БД как выполненной. Здесь же в воркере можно отменить задачу, если по ряду условий стало ясно, что дальнейшее её выполнение не требуется. Или добавить еще одну задачу. Далее, здесь же создаем так называемый клиент (неудачное название, т.к. он же занимается запуском воркеров) и, используя его, добавляем задачи в очередь.
По-умолчанию, повторы/ретраи работают по экспоненциальному увеличению задержки:
attempts ^ 4 + jitter
.
Всего 25 повторов, последний из которых случится через 3 недели. После этого задача будет считаться завершенной
с ошибкой.
Таймауты сработают только если вы учитываете, что контекст может быть завершен, т.е.
сверяетесь периодически с <-ctx.Done()
. Напомню, других механизмов завершить работу go-рутины
в Go нет.
Несколько очередей может понадобится, если вы понимаете, что часть задач выполняется быстро (отправка email), а другая часть медленно (кодирование аудио/видео, обращение к LLM и т.д.).
Быстрее всего понять суть проекта можно глянув на файлы миграций: ищите CREATE TABLE
по коду.
Из любопытного, выравнивание колонок по 8 байт (64 бита), т.е. под размер регистра подогнали.
Здесь же в миграциях создается триггер на вставку записи в таблицу river_job
и JSON-сериализованные
данные отправляются в канал нотификаций PostgreSQL под названием river_insert
.
В документации об этом не сказано, но riverqueue предлагает две опции для обработки очередей: LISTEN/NOTIFY и полинг (по умолчанию 1 секунда). С дефолтным LISTEN/NOTIFY 8-ядерный лептоп выполняет 46К+ задач в секунду.
Как устроены независимые очереди? Как работает приоритезация?