Apprendre en construisant, un Système de Traitement d’arrière-plan en Ruby

Dans le post d’aujourd’hui, nous allons implémenter un système de traitement d’arrière-plan naïf pour le plaisir! Nous pourrions apprendre certaines choses en cours de route en jetant un coup d’œil aux composants internes des systèmes de traitement d’arrière-plan populaires comme Sidekiq. Le produit de ce plaisir n’est en aucun cas destiné à une utilisation en production.

Imaginons que nous ayons une tâche dans notre application qui charge un ou plusieurs sites Web et extrait leurs titres. Comme nous n’avons aucune influence sur les performances de ces sites Web, nous aimerions effectuer la tâche en dehors de notre thread principal (ou de la demande actuelle — si nous construisons une application Web), mais en arrière-plan.

Encapsulation d’une tâche

Avant d’entrer dans le traitement en arrière-plan, construisons un objet de service pour effectuer la tâche à accomplir. Nous utiliserons OpenURI et Nokogiri pour extraire le contenu de la balise de titre.

1 2 3 4 5 6 7 8 9 10 11 12
require 'open-uri' require 'nokogiri' class TitleExtractorService def call(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

Calling the service prints the title of the given URL.

1 2
TitleExtractorService.new.call('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

Cela fonctionne comme prévu, mais voyons si nous pouvons améliorer un peu la syntaxe pour la faire ressembler un peu aux autres systèmes de traitement d’arrière-plan. En créant un module Magique::Worker, nous pouvons ajouter du sucre syntaxique à l’objet service.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
module Magique module Worker def self.included(base) base.extend(ClassMethods) end module ClassMethods def perform_now(*args) new.perform(*args) end end def perform(*) raise NotImplementedError end end end 

Le module ajoute une méthode perform à l’instance de travail et une méthode perform_now à la classe de travail pour rendre l’invocation un peu meilleure.

Incluons le module dans notre objet de service. While we’re at it, let’s also rename it to TitleExtractorWorker and change the call method to perform.

1 2 3 4 5 6 7 8 9 10 11
class TitleExtractorWorker include Magique::Worker def perform(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

The invocation still has the same result, but it’s a bit clearer what’s going on.

1 2
TitleExtractorWorker.perform_now('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

Implémentation du traitement asynchrone

Maintenant que l’extraction du titre fonctionne, nous pouvons récupérer tous les titres des articles passés de Ruby Magic. Pour ce faire, supposons que nous ayons une constante RUBYMAGIC avec une liste de toutes les URL des articles précédents.

1 2 3 4 5 6 7 8 9
RUBYMAGIC.each do |url| TitleExtractorWorker.perform_now(url) end # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

Nous obtenons les titres des articles précédents, mais il faut un certain temps pour les extraire tous. En effet, nous attendons que chaque demande soit terminée avant de passer à la suivante.

Améliorons cela en introduisant une méthode perform_async dans notre module de travail. Pour accélérer les choses, il crée un nouveau thread pour chaque URL.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Thread.new { new.perform(*args) } end end end end 

Après avoir changé l’invocation en TitleExtractorWorker.perform_async(url), nous obtenons tous les titres presque à la fois. Cependant, cela signifie également que nous ouvrons plus de 20 connexions au blog Ruby Magic à la fois. (Désolé de jouer avec votre blog, les amis! 😅)

Si vous suivez votre propre implémentation et que vous testez cela en dehors d’un processus de longue durée (comme un serveur Web), n’oubliez pas d’ajouter quelque chose comme loop { sleep 1 } à la fin de votre script pour vous assurer que le processus ne se termine pas immédiatement.

Mise en file d’attente des tâches

Avec l’approche de la création d’un nouveau thread pour chaque invocation, nous finirons par atteindre les limites de ressources (à la fois de notre côté et sur les sites Web auxquels nous accédons). Comme nous aimerions être de gentils citoyens, changeons l’implémentation en quelque chose qui est asynchrone mais qui ne ressemble pas à une attaque par déni de service.

Une façon courante de résoudre ce problème consiste à utiliser le modèle producteur / consommateur. Un ou plusieurs producteurs envoient des tâches dans une file d’attente tandis qu’un ou plusieurs consommateurs prennent des tâches de la file d’attente et les traitent.

Une file d’attente est essentiellement une liste d’éléments. En théorie, un tableau simple ferait le travail. Cependant, comme nous traitons de la concurrence, nous devons nous assurer qu’un seul producteur ou consommateur peut accéder à la file d’attente à la fois. Si nous ne faisons pas attention à cela, les choses se termineront dans le chaos — comme deux personnes essayant de franchir une porte à la fois.

Ce problème est connu sous le nom de problème producteur-consommateur et il existe de multiples solutions. Heureusement, c’est un problème très courant et Ruby est livré avec une implémentation Queue appropriée que nous pouvons utiliser sans avoir à nous soucier de la synchronisation des threads.

Pour l’utiliser, nous allons nous assurer que les producteurs et les consommateurs peuvent accéder à la file d’attente. Pour ce faire, nous ajoutons une méthode de classe à notre module Magique et lui attribuons une instance de Queue.

1 2 3 4 5 6 7 8 9 10 11
module Magique def self.backend @backend end def self.backend=(backend) @backend = backend end end Magique.backend = Queue.new 

Ensuite, nous changeons notre implémentation perform_async pour pousser une tâche dans la file d’attente au lieu de créer son propre nouveau thread. Une tâche est représentée sous la forme d’un hachage comprenant une référence à la classe de travail ainsi que les arguments transmis à la méthode perform_async.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Magique.backend.push(worker: self, args: args) end end end end 

Avec cela, nous en avons fini avec le côté producteur des choses. Ensuite, jetons un coup d’œil au côté des consommateurs.

Chaque consommateur est un thread distinct qui prend les tâches de la file d’attente et les exécute. Au lieu de s’arrêter après une tâche, comme le thread, le consommateur prend ensuite une autre tâche de la file d’attente et l’exécute, et ainsi de suite. Voici une implémentation de base d’un consommateur appelé Magique::Processor. Chaque processeur crée un nouveau thread qui boucle à l’infini. Pour chaque itération, il essaie de récupérer une nouvelle tâche de la file d’attente, crée une nouvelle instance de la classe de travail et appelle sa méthode perform avec les arguments donnés.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
module Magique class Processor def self.start(concurrency = 1) concurrency.times { |n| new("Processor #{n}") } end def initialize(name) thread = Thread.new do loop do payload = Magique.backend.pop worker_class = payload worker_class.new.perform(*payload) end end thread.name = name end end end 

En plus de la boucle de traitement, nous ajoutons une méthode pratique appelée Magique::Processor.start. Cela nous permet de faire tourner plusieurs processeurs à la fois. Bien que nommer le thread ne soit pas vraiment nécessaire, cela nous permettra de voir si les choses fonctionnent réellement comme prévu.

Ajustons la sortie de notre TitleExtractorWorker pour inclure le nom du thread actuel.

1
puts " #{title.gsub(/]+/, ' ').strip}" 

Pour tester notre configuration de traitement en arrière-plan, nous devons d’abord faire tourner un ensemble de processeurs avant de mettre nos tâches en file d’attente.

1 2 3 4 5 6 7 8 9 10 11 12 13 14
Magique.backend = Queue.new Magique::Processor.start(5) RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Ruby's Hidden Gems, StringScanner | AppSignal Blog # Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

Lorsque cela est exécuté, nous obtenons toujours les titres de tous les articles. Bien que ce ne soit pas aussi rapide que d’utiliser un thread séparé pour chaque tâche, il est toujours plus rapide que l’implémentation initiale qui n’avait pas de traitement en arrière-plan. Grâce aux noms de processeurs ajoutés, nous pouvons également confirmer que tous les processeurs fonctionnent dans la file d’attente. En ajustant le nombre de processeurs concurrents, il est possible de trouver un équilibre entre la vitesse de traitement et les limitations de ressources existantes.

S’étendant à plusieurs processus et Machines

Jusqu’à présent, la mise en œuvre actuelle de notre système de traitement d’arrière-plan fonctionne assez bien. C’est toujours limité au même processus, cependant. Les tâches gourmandes en ressources affecteront toujours les performances de l’ensemble du processus. En dernière étape, examinons la répartition de la charge de travail sur plusieurs processus et peut-être même sur plusieurs machines.

La file d’attente est le seul lien entre les producteurs et les consommateurs. En ce moment, il utilise une implémentation en mémoire. Inspirons-nous davantage de Sidekiq et implémentons une file d’attente à l’aide de Redis.

Redis prend en charge les listes qui nous permettent de pousser et d’extraire des tâches. De plus, la gemme Ruby Redis est thread-safe et les commandes Redis pour modifier les listes sont atomiques. Ces propriétés permettent de l’utiliser pour notre système de traitement d’arrière-plan asynchrone sans rencontrer de problèmes de synchronisation.

Créons une file d’attente Redis qui implémente les méthodes push et shift tout comme les méthodes Queue que nous avons utilisées précédemment.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
require 'json' require 'redis' module Magique module Backend class Redis def initialize(connection = ::Redis.new) @connection = connection end def push(job) @connection.lpush('magique:queue', JSON.dump(job)) end def shift _queue, job = @connection.brpop('magique:queue') payload = JSON.parse(job, symbolize_names: true) payload = Object.const_get(payload) payload end end end end 

Comme Redis ne connaît rien aux objets Ruby, nous devons sérialiser nos tâches en JSON avant de les stocker dans la base de données à l’aide de la commande lpush qui ajoute un élément à l’avant de la base de données liste.

Pour extraire une tâche de la file d’attente, nous utilisons la commande brpop, qui obtient le dernier élément d’une liste. Si la liste est vide, elle se bloquera jusqu’à ce qu’un nouvel élément soit disponible. C’est un bon moyen de mettre en pause nos processeurs lorsqu’aucune tâche n’est disponible. Enfin, après avoir obtenu une tâche de Redis, nous devons rechercher la vraie classe Ruby en fonction du nom du travailleur en utilisant Object.const_get.

En dernière étape, divisons les choses en plusieurs processus. Du côté des producteurs, la seule chose que nous devons faire est de changer le backend en notre file d’attente Redis nouvellement implémentée.

1 2 3 4 5 6 7
# ... Magique.backend = Magique::Backend::Redis.new RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end 

On the consumer side of things, we can get away with a few lines like this:

1 2 3 4 5 6
# ... Magique.backend = Magique::Backend::Redis.new Magique::Processor.start(5) loop { sleep 1 } 

Une fois exécuté, le processus consommateur attendra que le nouveau travail arrive dans la file d’attente. Une fois que nous avons démarré le processus de production qui pousse les tâches dans la file d’attente, nous pouvons voir qu’elles sont traitées immédiatement.

Profitez de manière responsable et ne l’utilisez pas en Production

Alors que nous l’avons gardé loin d’une configuration du monde réel que vous utiliseriez en production (alors ne le faites pas!), nous avons pris quelques mesures dans la construction d’un processeur d’arrière-plan. Nous avons commencé par exécuter un processus en tant que service d’arrière-plan. Ensuite, nous l’avons rendu asynchrone et avons utilisé Queue pour résoudre le problème producteur-consommateur. Ensuite, nous avons étendu le processus à plusieurs processus ou machines utilisant Redis plutôt qu’une implémentation en mémoire.

Comme mentionné précédemment, il s’agit d’une implémentation simplifiée d’un système de traitement en arrière-plan. Il y a beaucoup de choses qui manquent et qui ne sont pas traitées explicitement. Ceux-ci incluent (sans s’y limiter) la gestion des erreurs, les files d’attente multiples, la planification, la mise en commun des connexions et la gestion des signaux.

Néanmoins, nous nous sommes amusés à écrire ceci et espérons que vous avez apprécié un coup d’œil sous le capot d’un système de traitement d’arrière-plan. Peut-être avez-vous même emporté une chose ou deux.

Auteur invité Benedikt Deicke est ingénieur logiciel et directeur technique de Userlist.io . Sur le côté, il écrit un livre sur la construction d’applications SaaS dans Ruby on Rails. Vous pouvez contacter Benedikt via Twitter.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.