Tanulás építéssel, Háttérfeldolgozó rendszer Ruby

a mai bejegyzésben egy naiv háttérfeldolgozó rendszert fogunk megvalósítani szórakozásból! Lehet, hogy megtanulunk néhány dolgot az út mentén, amikor bepillantunk a népszerű háttérfeldolgozó rendszerek, például a Sidekiq belső részeibe. Ennek a szórakozásnak a terméke semmiképpen sem termelési felhasználásra szolgál.

képzeljük el, hogy van egy feladatunk az alkalmazásunkban, amely betölti egy vagy több weboldalt, és kibontja azok címét. Mivel nincs befolyásunk ezeknek a weboldalaknak a teljesítményére, szeretnénk a feladatot a fő szálunkon kívül (vagy az aktuális kérésen kívül—ha webes alkalmazást építünk), de a háttérben végrehajtani.

egy feladat beágyazása

mielőtt háttérfeldolgozásba kezdenénk, építsünk egy szolgáltatási objektumot a feladat végrehajtásához. Az OpenURI és a Nokogiri segítségével kinyerjük a címcímke tartalmát.

1 2 3 4 5 6 7 8 9 10 11 12
require 'open-uri' require 'nokogiri' class TitleExtractorService def call(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

Calling the service prints the title of the given URL.

1 2
TitleExtractorService.new.call('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

Ez a várt módon működik, de nézzük meg, hogy javíthatunk-e egy kicsit a szintaxison, hogy kicsit jobban hasonlítson más háttérfeldolgozó rendszerekre. A Magique::Worker modul létrehozásával hozzáadhatunk néhány szintaktikai cukrot a szolgáltatási objektumhoz.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
module Magique module Worker def self.included(base) base.extend(ClassMethods) end module ClassMethods def perform_now(*args) new.perform(*args) end end def perform(*) raise NotImplementedError end end end 

a modul hozzáad egy perform metódust a munkapéldányhoz és egy perform_now metódust a munkapéldányhoz munkásosztály, hogy a meghívás egy kicsit jobb legyen.

vegyük bele a modult a szolgáltatási objektumunkba. While we’re at it, let’s also rename it to TitleExtractorWorker and change the call method to perform.

1 2 3 4 5 6 7 8 9 10 11
class TitleExtractorWorker include Magique::Worker def perform(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end 

The invocation still has the same result, but it’s a bit clearer what’s going on.

1 2
TitleExtractorWorker.perform_now('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir 

aszinkron feldolgozás végrehajtása

most, hogy a címkivonás működik, megragadhatjuk az összes címet a korábbi Ruby Magic cikkekből. Ehhez tegyük fel, hogy van egy RUBYMAGIC állandó a korábbi cikkek összes URL-jének listájával.

1 2 3 4 5 6 7 8 9
RUBYMAGIC.each do |url| TitleExtractorWorker.perform_now(url) end # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

megkapjuk a korábbi cikkek címét, de eltart egy ideig, amíg mindet kibontjuk. Ez azért van, mert megvárjuk, amíg minden kérés befejeződik, mielőtt továbblépnénk a következőre.

javítsuk ezt egy perform_async módszer bevezetésével a munkás modulunkba. A dolgok felgyorsítása érdekében minden URL-hez új szálat hoz létre.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Thread.new { new.perform(*args) } end end end end 

a meghívás TitleExtractorWorker.perform_async(url) megváltoztatása után az összes címet szinte egyszerre kapjuk meg. Ez azonban azt is jelenti, hogy egyszerre több mint 20 kapcsolatot nyitunk meg a Ruby Magic bloghoz. (Elnézést a blogodért, emberek!

ha a saját implementációját követi és teszteli ezt egy hosszú ideje futó folyamaton kívül (például egy webszerveren), ne felejtsen el hozzáadni valami hasonlót loop { sleep 1 } A szkript végéhez, hogy megbizonyosodjon arról, hogy a folyamat nem fejeződik be azonnal.

A feladatok sorba állítása

azzal a megközelítéssel, hogy minden meghíváshoz új szálat hozzunk létre, végül elérjük az erőforrás korlátait (mind a mi oldalunkon, mind az általunk elérni kívánt webhelyeken). Mivel kedves állampolgárok szeretnénk lenni, változtassuk meg a megvalósítást aszinkronra, de nem érezzük magunkat szolgáltatásmegtagadási támadásnak.

a probléma megoldásának általános módja a gyártó/fogyasztó minta használata. Egy vagy több gyártó áthelyezi a feladatokat egy sorba, míg egy vagy több fogyasztó átveszi a feladatokat a sorból, és feldolgozza azokat.

a sor alapvetően elemek listája. Elméletileg egy egyszerű tömb elvégezné a munkát. Mivel azonban párhuzamosságról van szó, meg kell győződnünk arról, hogy egyszerre csak egy termelő vagy fogyasztó férhet hozzá a sorhoz. Ha nem vigyázunk erre, a dolgok káoszba torkollnak—akárcsak két ember, aki egyszerre próbál betörni egy ajtón.

ezt a problémát gyártó-fogyasztó problémának nevezik, és számos megoldás létezik rá. Szerencsére ez egy nagyon gyakori probléma, és a Ruby megfelelő Queue implementációval szállít, amelyet anélkül használhatunk, hogy aggódnunk kellene a szálszinkronizálás miatt.

a használathoz győződjünk meg róla, hogy mind a gyártók, mind a fogyasztók hozzáférhetnek a sorhoz. Ezt úgy végezzük, hogy hozzáadunk egy osztály metódust a Magique modulunkhoz, és hozzárendelünk egy példányt a Queue hozzá.

1 2 3 4 5 6 7 8 9 10 11
module Magique def self.backend @backend end def self.backend=(backend) @backend = backend end end Magique.backend = Queue.new 

ezután megváltoztatjuk a perform_async implementációnkat, hogy egy feladatot a várólistára toljunk ahelyett, hogy saját új szálat hoznánk létre. A feladat hash-ként jelenik meg, beleértve a worker osztályra való hivatkozást, valamint a perform_async metódusnak átadott argumentumokat.

1 2 3 4 5 6 7 8 9
module Magique module Worker module ClassMethods def perform_async(*args) Magique.backend.push(worker: self, args: args) end end end end 

ezzel végeztünk a dolgok termelői oldalával. Ezután vessünk egy pillantást a fogyasztói oldalra.

minden fogyasztó egy külön szál, amely a sorból veszi a feladatokat, és végrehajtja azokat. Ahelyett, hogy egy feladat után megállna, mint például a szál, a fogyasztó egy másik feladatot vesz a sorból, és végrehajtja, stb. Itt van egy alapvető végrehajtása a fogyasztó úgynevezett Magique::Processor. Minden processzor létrehoz egy új szálat, amely végtelenül hurkol. Minden iterációhoz megpróbál új feladatot megragadni a sorból, létrehoz egy új példányt a worker osztályból, és meghívja a perform metódust a megadott argumentumokkal.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
module Magique class Processor def self.start(concurrency = 1) concurrency.times { |n| new("Processor #{n}") } end def initialize(name) thread = Thread.new do loop do payload = Magique.backend.pop worker_class = payload worker_class.new.perform(*payload) end end thread.name = name end end end 

a feldolgozási hurok mellett hozzáadunk egy kényelmi módszert, amelyet Magique::Processor.start. Ez lehetővé teszi számunkra, hogy egyszerre több processzort forgassunk fel. Bár a szál megnevezése nem igazán szükséges, lehetővé teszi számunkra, hogy lássuk, a dolgok valóban a várt módon működnek-e.

állítsuk be a TitleExtractorWorker kimenetét, hogy tartalmazza az aktuális szál nevét.

1
puts " #{title.gsub(/]+/, ' ').strip}" 

a háttérfeldolgozási beállításunk teszteléséhez először fel kell pörgetnünk egy sor processzort, mielőtt a feladatainkat elvégeznénk.

1 2 3 4 5 6 7 8 9 10 11 12 13 14
Magique.backend = Queue.new Magique::Processor.start(5) RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Ruby's Hidden Gems, StringScanner | AppSignal Blog # Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... 

amikor ez fut, még mindig megkapjuk az összes cikk címét. Bár ez nem olyan gyors, mint csak egy külön szál használata minden feladathoz, még mindig gyorsabb, mint a kezdeti megvalósítás, amelynek nem volt háttérfeldolgozása. A hozzáadott processzorneveknek köszönhetően azt is megerősíthetjük, hogy az összes processzor a sorban dolgozik. Az egyidejű processzorok számának módosításával meg lehet találni az egyensúlyt a feldolgozási sebesség és a meglévő erőforrás-korlátozások között.

több folyamatra és gépre kiterjesztve

eddig a háttérfeldolgozó rendszerünk jelenlegi megvalósítása elég jól működik. Ez azonban továbbra is ugyanarra a folyamatra korlátozódik. Az erőforrás-éhes feladatok továbbra is befolyásolják a teljes folyamat teljesítményét. Utolsó lépésként nézzük meg a munkaterhelés elosztását több folyamat, esetleg több gép között.

a sor az egyetlen kapcsolat a gyártók és a fogyasztók között. Jelenleg egy memóriában lévő implementációt használ. Vegyünk több inspirációt a Sidekiq – tól, és hajtsunk végre egy sort a Redis használatával.

a Redis támogatja azokat a listákat, amelyek lehetővé teszik a feladatok lekérését. Ezenkívül a Redis Ruby gem szálbiztonságos, a Redis parancsok pedig a listák módosítására atomikusak. Ezek a tulajdonságok lehetővé teszik az aszinkron háttérfeldolgozó rendszerünk használatához szinkronizálási problémák nélkül.

hozzunk létre egy Redis által támogatott várólistát, amely végrehajtja apush ésshift metódusokat, csakúgy, mint a korábban használtQueue módszereket.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
require 'json' require 'redis' module Magique module Backend class Redis def initialize(connection = ::Redis.new) @connection = connection end def push(job) @connection.lpush('magique:queue', JSON.dump(job)) end def shift _queue, job = @connection.brpop('magique:queue') payload = JSON.parse(job, symbolize_names: true) payload = Object.const_get(payload) payload end end end end 

mivel a Redis nem tud semmit a Ruby objektumokról, a feladatainkat JSON-ba kell sorosítani, mielőtt azokat az adatbázisban tárolnánk a lpush paranccsal, amely hozzáad egy elemet az elülső részhez a listáról.

feladat lekéréséhez a brpop parancsot használjuk, amely a lista utolsó elemét kapja meg. Ha a lista üres, akkor blokkolja, amíg egy új elem áll rendelkezésre. Ez egy jó módszer a processzorok szüneteltetésére, ha nem állnak rendelkezésre feladatok. Végül, miután kivettünk egy feladatot a Redisből, fel kell keresnünk a valódi Ruby osztályt a munkavállaló neve alapján a Object.const_gethasználatával.

utolsó lépésként osszuk fel a dolgokat több folyamatra. A dolgok termelői oldalán, az egyetlen dolog, amit meg kell tennünk, az, hogy megváltoztatjuk a hátteret az újonnan megvalósított Redis várólistánkra.

1 2 3 4 5 6 7
# ... Magique.backend = Magique::Backend::Redis.new RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end 

On the consumer side of things, we can get away with a few lines like this:

1 2 3 4 5 6
# ... Magique.backend = Magique::Backend::Redis.new Magique::Processor.start(5) loop { sleep 1 } 

végrehajtáskor a fogyasztói folyamat megvárja, amíg az új munka megérkezik a sorba. Miután elindítottuk a gyártási folyamatot, amely a feladatokat a sorba tolja, láthatjuk, hogy azonnal feldolgozásra kerülnek.

élvezze felelősségteljesen, és ne használja ezt a termelés

miközben tartottuk távol a valós világ beállítás használná a termelés (így nem!), néhány lépést tettünk egy háttérprocesszor felépítésében. Azzal kezdtük, hogy egy folyamat fut, mint egy háttér szolgáltatás. Ezután aszinkronizálást végeztünk, és a Queue-t használtuk a gyártó-fogyasztó probléma megoldására. Ezután kibővítettük a folyamatot több folyamatra vagy gépre, amelyek inkább a Redis-t, mint a memóriában történő megvalósítást használják.

mint korábban említettük, ez egy háttérfeldolgozó rendszer egyszerűsített megvalósítása. Sok dolog hiányzik, és nem foglalkoznak vele kifejezetten. Ezek közé tartozik (de nem kizárólagosan) a hibakezelés, a többszörös sorok, az ütemezés, a kapcsolat összevonása és a jelkezelés.

ennek ellenére jól éreztük magunkat, és reméljük, hogy élvezted a háttérfeldolgozó rendszer motorháztetője alatt. Talán még egy-két dolgot is elvettél.

Vendég író Benedikt Deicke egy szoftver mérnök és CTO Userlist.io. oldalán könyvet ír a SaaS alkalmazások építéséről Ruby on Rails – ben. A Benedikt-et a Twitteren keresztül érheti el.

Vélemény, hozzászólás?

Az e-mail-címet nem tesszük közzé.