Resque: manipulação em massa de jobs e workers
Série Resque — parte 3 de 3
- Infra: iniciando, parando, matando
- Diagnóstico pelo console
- Você está aqui — Manipulação em massa
Correlação com a série Sidekiq — esse post é o espelho da parte 3 da série Sidekiq. A grande diferença é que o Resque não tem
RetrySet/DeadSet— tudo o que falha vai proResque::Failure, indexado por posição (não por ID). Isso muda o jeito de fazer “delete em massa”: como remover um item desloca os índices, é preciso iterar de trás pra frente. Esse cuidado vai aparecer em quase todo snippet abaixo.
Selecionando failures por classe
1
2
3
4
class_name = 'MyApp::ImportWorker'
total = Resque::Failure.count
failures = Resque::Failure.all(0, total).each_with_index.select { |f, _| f['payload']['class'] == class_name }
failures.size
Diferente do Sidekiq (RetrySet#select { |j| j.klass == ... }), aqui a coleção do Resque é uma lista linear indexada. Carrego com each_with_index pra preservar a posição original — vou precisar dela pra retry/delete.
Selecionando failures por mensagem de erro
1
2
3
4
5
needle = 'Net::OpenTimeout'
failures = Resque::Failure.all(0, Resque::Failure.count).each_with_index.select { |f, _|
f['error'].to_s.include?(needle)
}
failures.size
Combina bem com a contagem por erro do post anterior: você descobre que 90% das failures são de um único OpenTimeout, isola só elas e dá retry depois que o serviço externo voltou.
Reenfileirando uma failure individual
1
Resque::Failure.requeue(0) # índice 0 é a falha mais antiga
requeue recoloca o job na fila original sem remover do failure set — então você ainda vê o histórico depois.
Removendo uma failure individual
1
Resque::Failure.remove(0)
Remove só essa entrada do failure set. Importante: todas as failures depois dela deslocam o índice em 1.
Retry em massa de uma classe — iterando de trás pra frente
1
2
3
4
5
6
7
8
9
10
class_name = 'MyApp::ImportWorker'
indexes = Resque::Failure.all(0, Resque::Failure.count)
.each_with_index
.select { |f, _| f['payload']['class'] == class_name }
.map { |_, i| i }
indexes.reverse_each do |i|
Resque::Failure.requeue(i)
Resque::Failure.remove(i)
end
Esse padrão é específico do Resque: como remove desloca índices, tem que iterar do maior pro menor pra cada operação não invalidar as anteriores. Se você fizer indexes.each (ordem crescente), vai apagar/reagendar o job errado a partir da segunda iteração.
Existe
Resque::Failure.requeue_alleResque::Failure.clearpra agir em todas, sem filtro. Use quando o filtro é “tudo”.
Apagar todas as failures
1
Resque::Failure.clear
Equivalente ao DeadSet#clear. Sem volta — só faça quando já confirmou via diagnóstico (parte 2) que não tem nada útil ali.
Movendo jobs de uma classe pra fila dedicada
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
queue_name = 'default'
new_queue_name = 'isolated_import'
class_name = 'MyApp::ImportWorker'
snapshot = Resque.peek(queue_name, 0, Resque.size(queue_name))
moved = 0
snapshot.each_with_index do |payload, _|
next unless payload['class'] == class_name
# remove a primeira ocorrência exata desse payload da fila
removed = Resque.redis.lrem("queue:#{queue_name}", 1, Resque.encode(payload))
if removed > 0
Resque.push(new_queue_name, payload)
moved += 1
end
end
moved
Equivalente direto do snippet “Movendo 1000 jobs de uma classe pra outra fila” da série Sidekiq. A mecânica é diferente porque o Resque guarda os jobs como uma LIST no Redis (queue:<nome>), então usar LREM direto é a forma confiável de remover por payload exato sem mexer nos vizinhos.
Por que isolar: se uma classe está derrubando uma fila compartilhada, em vez de pausar tudo, eu crio uma fila dedicada (isolated_import), movo os jobs problemáticos pra ela e subo um worker separado consumindo só essa fila. O resto da operação não sente.
1
QUEUE=isolated_import COUNT=1 bundle exec rake resque:workers
Apagar uma fila inteira
1
Resque.remove_queue('isolated_import')
Apaga tanto o conteúdo (queue:<nome>) quanto o registro da fila no índice de filas. Útil pra fila criada temporariamente como acima — sem isso, ela continua aparecendo no Resque.queues mesmo vazia.
Pausar todos os workers (USR2)
1
2
3
4
5
Resque.workers.each do |w|
hostname, pid, _ = w.id.split(':')
next unless hostname == Socket.gethostname # só workers locais
Process.kill('USR2', pid.to_i) rescue nil
end
Como Resque não tem API “via Redis” pra pausar um worker (diferente do Sidekiq::Process#quiet!), a única forma é mandar sinal pro processo. Por isso o filtro de hostname: só dá pra sinalizar workers que rodam na mesma máquina onde você está executando o console.
Pra resumir:
1
2
3
4
5
Resque.workers.each do |w|
hostname, pid, _ = w.id.split(':')
next unless hostname == Socket.gethostname
Process.kill('CONT', pid.to_i) rescue nil
end
Em cluster multi-host, a alternativa é rodar esses snippets via Capistrano/Ansible em cada nó, ou orquestrar com systemd. Pausa “pelo Redis” não existe.
Prunar workers fantasma
1
Resque.workers.each(&:prune_dead_workers)
Workers que morreram sem chamar unregister_worker (ex: kill -9 ou crash) ficam listados no Redis mas sem processo correspondente. prune_dead_workers checa cada worker no host atual e desregistra os que não têm processo vivo. Roda em cada host periodicamente — em produção, eu deixo num cron ou no health check da unit do systemd.
Limpar todo o Redis do Resque — APAGA TUDO
1
Resque.redis.flushdb
Bomba nuclear, equivalente exato do Sidekiq.redis { |conn| conn.flushdb } da série anterior: apaga tudo que o Resque tem no Redis (filas, workers registrados, failures, stats). Use só quando sabe que pode reprocessar com tranquilidade ou está em dev. Em produção, isso é incidente — só faça com plano de recuperação claro.
Se o Redis tem outros usos no mesmo db (cache, sessions, outros workers),
flushdbapaga tudo isso também. VerifiqueResque.redis.client.dbantes — separar Sidekiq/Resque/cache em DBs diferentes (/0,/1,/2) é o padrão exatamente pra esse cenário.
Final da série
Essa foi a parte 3 e última. Voltando ao começo: Infra: iniciando, parando, matando.
E se você administra os dois ao mesmo tempo (caso clássico: serviço legado em Resque + serviços novos em Sidekiq), vale ter as duas séries lado a lado. A primeira parte da série Sidekiq é o ponto de entrada equivalente.