Learning by building, un Sistema de Procesamiento en segundo plano en Ruby

En la publicación de hoy, vamos a implementar un sistema de procesamiento en segundo plano ingenuo por diversión. Podríamos aprender algunas cosas en el camino como un vistazo a los aspectos internos de los sistemas de procesamiento en segundo plano populares como Sidekiq. El producto de esta diversión de ninguna manera está destinado a uso de producción.

Imaginemos que tenemos una tarea en nuestra aplicación que carga uno o más sitios web y extrae sus títulos. Como no tenemos ninguna influencia en el rendimiento de estos sitios web, nos gustaría realizar la tarea fuera de nuestro hilo principal (o la solicitud actual, si estamos creando una aplicación web), pero en segundo plano.

Encapsular una tarea

Antes de entrar en el procesamiento en segundo plano, construyamos un objeto de servicio para realizar la tarea en cuestión. Usaremos OpenURI y Nokogiri para extraer el contenido de la etiqueta de título.

1 2 3 4 5 6 7 8 9 10 11 12
require 'open-uri' require 'nokogiri' class TitleExtractorService def call(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

Calling the service prints the title of the given URL.

1 2
TitleExtractorService.new.call('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

Esto funciona como se esperaba, pero vamos a ver si podemos mejorar la sintaxis un poco para que se vea y se sienta un poco más como otros antecedentes de los sistemas de procesamiento. Al crear un módulo Magique::Worker, podemos agregar algo de azúcar sintáctica al objeto de servicio.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
module Magique module Worker def self.included(base) base.extend(ClassMethods) end module ClassMethods def perform_now(*args) new.perform(*args) end end def perform(*) raise NotImplementedError end end end 

El módulo añade un perform método para el trabajador de la instancia y de un perform_now método a la clase trabajadora para hacer la invocación un poco mejor.

Vamos a incluir el módulo en nuestro objeto de servicio. While we’re at it, let’s also rename it to TitleExtractorWorker and change the call method to perform.

1 2 3 4 5 6 7 8 9 10 11
class TitleExtractorWorker include Magique::Worker def perform(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

The invocation still has the same result, but it’s a bit clearer what’s going on.

1 2
TitleExtractorWorker.perform_now('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

la Aplicación de Procesamiento Asincrónico

Ahora que tenemos el título de extracción de trabajo, se puede tomar todos los títulos del pasado Ruby Magia de los artículos. Para hacer esto, supongamos que tenemos una constante RUBYMAGIC con una lista de todas las URL de artículos anteriores.

1 2 3 4 5 6 7 8 9
RUBYMAGIC.each do |url| TitleExtractorWorker.perform_now(url) end # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

Tenemos los títulos de los artículos anteriores, pero lleva un tiempo para extraer de todos ellos. Esto se debe a que esperamos hasta que se complete cada solicitud antes de pasar a la siguiente.

Mejoremos eso introduciendo un método perform_async en nuestro módulo de trabajo. Para acelerar las cosas, crea un nuevo hilo para cada URL.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Thread.new { new.perform(*args) } end end end end 

Después de cambiar la invocación a TitleExtractorWorker.perform_async(url), tenemos todos los títulos casi a la vez. Sin embargo, esto también significa que estamos abriendo más de 20 conexiones al blog de Ruby Magic a la vez. (¡Perdón por jugar con su blog, amigos! 😅)

Si está siguiendo junto con su propia implementación y probando esto fuera de un proceso de larga duración (como un servidor web), no olvide agregar algo como loop { sleep 1 } al final de su script para asegurarse de que el proceso no termine inmediatamente.

Hacer cola de tareas

Con el enfoque de crear un nuevo hilo para cada invocación, eventualmente alcanzaremos los límites de recursos (tanto de nuestro lado como de los sitios web a los que accedemos). Como nos gustaría ser buenos ciudadanos, cambiemos la implementación a algo que sea asíncrono pero que no se sienta como un ataque de denegación de servicio.

Una forma común de resolver este problema es utilizar el patrón productor / consumidor. Uno o más productores colocan tareas en una cola, mientras que uno o más consumidores toman tareas de la cola y las procesan.

Una cola es básicamente una lista de elementos. En teoría, una matriz simple haría el trabajo. Sin embargo, como estamos tratando con la concurrencia, necesitamos asegurarnos de que solo un productor o consumidor pueda acceder a la cola a la vez. Si no tenemos cuidado con esto, las cosas terminarán en caos, al igual que dos personas que intentan atravesar una puerta a la vez.

Este problema se conoce como el problema productor-consumidor y hay múltiples soluciones para él. Afortunadamente, es un problema muy común y Ruby se envía con una implementación Queue adecuada que podemos usar sin tener que preocuparnos por la sincronización de subprocesos.

Para usarlo, asegurémonos de que tanto los productores como los consumidores puedan acceder a la cola. Hacemos esto agregando un método de clase a nuestro módulo Magique y asignándole una instancia de Queue.

1 2 3 4 5 6 7 8 9 10 11
module Magique def self.backend @backend end def self.backend=(backend) @backend = backend end end Magique.backend = Queue.new 

a continuación, vamos a cambiar nuestro perform_async aplicación para empujar una tarea a la cola en lugar de crear su propio nuevo hilo. Una tarea se representa como un hash que incluye una referencia a la clase worker, así como los argumentos pasados al método perform_async.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Magique.backend.push(worker: self, args: args) end end end end 

Con esto, hemos terminado con el productor lado de las cosas. A continuación, echemos un vistazo al lado del consumidor.

Cada consumidor es un hilo separado que toma tareas de la cola y las realiza. En lugar de detenerse después de una tarea, como el hilo, el consumidor luego toma otra tarea de la cola y la realiza, y así sucesivamente. Esta es una implementación básica de un consumidor llamado Magique::Processor. Cada procesador crea un nuevo hilo que se repite infinitamente. Para cada iteración, intenta tomar una nueva tarea de la cola, crea una nueva instancia de la clase worker y llama a su método perform con los argumentos dados.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
module Magique class Processor def self.start(concurrency = 1) concurrency.times { |n| new("Processor #{n}") } end def initialize(name) thread = Thread.new do loop do payload = Magique.backend.pop worker_class = payload worker_class.new.perform(*payload) end end thread.name = name end end end 

además el procesamiento de bucle, añadimos una comodidad método llamado Magique::Processor.start. Esto nos permite activar varios procesadores a la vez. Si bien nombrar el hilo no es realmente necesario, nos permitirá ver si las cosas realmente funcionan como se esperaba.

Ajustemos la salida de nuestro TitleExtractorWorker para incluir el nombre del hilo actual.

1
puts " #{title.gsub(/]+/, ' ').strip}" 

A prueba nuestro fondo de procesamiento de la instalación, lo primero que necesitamos para hacer girar un conjunto de procesadores antes de enqueueing nuestras tareas.

1 2 3 4 5 6 7 8 9 10 11 12 13 14
Magique.backend = Queue.new Magique::Processor.start(5) RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Ruby's Hidden Gems, StringScanner | AppSignal Blog # Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

Cuando se ejecuta, todavía podemos obtener los títulos de todos los artículos. Si bien no es tan rápido como usar un subproceso separado para cada tarea, sigue siendo más rápido que la implementación inicial que no tenía procesamiento en segundo plano. Gracias a los nombres de procesador añadidos, también podemos confirmar que todos los procesadores están trabajando en la cola. Al ajustar el número de procesadores simultáneos, es posible encontrar un equilibrio entre la velocidad de procesamiento y las limitaciones de recursos existentes.

Expandiéndose a Múltiples Procesos y Máquinas

Hasta ahora, la implementación actual de nuestro sistema de procesamiento en segundo plano funciona lo suficientemente bien. Sin embargo, todavía está limitado al mismo proceso. Las tareas que requieren muchos recursos seguirán afectando el rendimiento de todo el proceso. Como paso final, veamos la distribución de la carga de trabajo en varios procesos y tal vez incluso en varias máquinas.

La cola es la única conexión entre productores y consumidores. En este momento, está usando una implementación en memoria. Tomemos más inspiración de Sidekiq e implementemos una cola usando Redis.

Redis tiene soporte para listas que nos permiten enviar y recuperar tareas. Además, la gema rubí de Redis es segura para subprocesos y los comandos de Redis para modificar listas son atómicos. Estas propiedades permiten usarlo para nuestro sistema de procesamiento en segundo plano asíncrono sin tener problemas de sincronización.

Vamos a crear una cola con respaldo de Redis que implemente los métodos push y shift al igual que los métodos Queue que usamos anteriormente.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
require 'json' require 'redis' module Magique module Backend class Redis def initialize(connection = ::Redis.new) @connection = connection end def push(job) @connection.lpush('magique:queue', JSON.dump(job)) end def shift _queue, job = @connection.brpop('magique:queue') payload = JSON.parse(job, symbolize_names: true) payload = Object.const_get(payload) payload end end end end 

Como Redis no sabe nada acerca de Ruby objetos, tenemos que serializar nuestras tareas en JSON antes de guardarlos en la base de datos utilizando la etiqueta lpush comando que añade un elemento al frente de la lista.

Para obtener una tarea de la cola, estamos usando el comando brpop, que obtiene el último elemento de una lista. Si la lista está vacía, se bloqueará hasta que haya un nuevo elemento disponible. Esta es una buena manera de pausar nuestros procesadores cuando no hay tareas disponibles. Finalmente, después de obtener una tarea de Redis, tenemos que buscar la clase Ruby real basada en el nombre del trabajador usando Object.const_get.

Como paso final, dividamos las cosas en varios procesos. En el lado del productor, lo único que tenemos que hacer es cambiar el backend a nuestra cola Redis recién implementada.

1 2 3 4 5 6 7
# ... Magique.backend = Magique::Backend::Redis.new RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end 

On the consumer side of things, we can get away with a few lines like this:

1 2 3 4 5 6
# ... Magique.backend = Magique::Backend::Redis.new Magique::Processor.start(5) loop { sleep 1 } 

Cuando se ejecuta, el consumidor proceso de espera para un nuevo trabajo para llegar en la cola. Una vez que iniciamos el proceso de producción que empuja las tareas a la cola, podemos ver que se procesan de inmediato.

Disfrute de manera responsable y No Lo Use en Producción

Mientras lo mantuvimos lejos de una configuración del mundo real que usaría en producción (¡así que no lo haga!), tomamos algunos pasos para construir un procesador en segundo plano. Comenzamos haciendo que un proceso se ejecutara como un servicio en segundo plano. Luego lo hicimos asincrónico y usamos Queue para resolver el problema productor-consumidor. Luego expandimos el proceso a varios procesos o máquinas utilizando Redis en lugar de una implementación en memoria.

Como se mencionó anteriormente, esta es una implementación simplificada de un sistema de procesamiento en segundo plano. Faltan muchas cosas que no se tratan explícitamente. Estos incluyen (pero no se limitan a) el manejo de errores, múltiples colas, programación, agrupación de conexiones y manejo de señales.

Sin embargo, nos divertimos escribiendo esto y esperamos que hayan disfrutado de un vistazo bajo el capó de un sistema de procesamiento de fondo. Tal vez incluso te llevaste una o dos cosas.

El escritor invitado Benedikt Deicke es ingeniero de software y CTO de Userlist.io Por otro lado, está escribiendo un libro sobre la creación de aplicaciones SaaS en Ruby on Rails. Puedes contactar a Benedikt a través de Twitter.

Deja una respuesta

Tu dirección de correo electrónico no será publicada.