a mai bejegyzésben egy naiv háttérfeldolgozó rendszert fogunk megvalósítani szórakozásból! Lehet, hogy megtanulunk néhány dolgot az út mentén, amikor bepillantunk a népszerű háttérfeldolgozó rendszerek, például a Sidekiq belső részeibe. Ennek a szórakozásnak a terméke semmiképpen sem termelési felhasználásra szolgál.
képzeljük el, hogy van egy feladatunk az alkalmazásunkban, amely betölti egy vagy több weboldalt, és kibontja azok címét. Mivel nincs befolyásunk ezeknek a weboldalaknak a teljesítményére, szeretnénk a feladatot a fő szálunkon kívül (vagy az aktuális kérésen kívül—ha webes alkalmazást építünk), de a háttérben végrehajtani.
egy feladat beágyazása
mielőtt háttérfeldolgozásba kezdenénk, építsünk egy szolgáltatási objektumot a feladat végrehajtásához. Az OpenURI és a Nokogiri segítségével kinyerjük a címcímke tartalmát.
1 2 3 4 5 6 7 8 9 10 11 12 |
require 'open-uri' require 'nokogiri' class TitleExtractorService def call(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end |
Calling the service prints the title of the given URL.
1 2 |
TitleExtractorService.new.call('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir |
Ez a várt módon működik, de nézzük meg, hogy javíthatunk-e egy kicsit a szintaxison, hogy kicsit jobban hasonlítson más háttérfeldolgozó rendszerekre. A Magique::Worker
modul létrehozásával hozzáadhatunk néhány szintaktikai cukrot a szolgáltatási objektumhoz.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
module Magique module Worker def self.included(base) base.extend(ClassMethods) end module ClassMethods def perform_now(*args) new.perform(*args) end end def perform(*) raise NotImplementedError end end end |
a modul hozzáad egy perform
metódust a munkapéldányhoz és egy perform_now
metódust a munkapéldányhoz munkásosztály, hogy a meghívás egy kicsit jobb legyen.
vegyük bele a modult a szolgáltatási objektumunkba. While we’re at it, let’s also rename it to TitleExtractorWorker
and change the call
method to perform
.
1 2 3 4 5 6 7 8 9 10 11 |
class TitleExtractorWorker include Magique::Worker def perform(url) document = Nokogiri::HTML(open(url)) title = document.css('html > head > title').first.content puts title.gsub(/]+/, ' ').strip rescue puts "Unable to find a title for #{url}" end end |
The invocation still has the same result, but it’s a bit clearer what’s going on.
1 2 |
TitleExtractorWorker.perform_now('https://appsignal.com') # AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir |
aszinkron feldolgozás végrehajtása
most, hogy a címkivonás működik, megragadhatjuk az összes címet a korábbi Ruby Magic cikkekből. Ehhez tegyük fel, hogy van egy RUBYMAGIC
állandó a korábbi cikkek összes URL-jének listájával.
1 2 3 4 5 6 7 8 9 |
RUBYMAGIC.each do |url| TitleExtractorWorker.perform_now(url) end # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... |
megkapjuk a korábbi cikkek címét, de eltart egy ideig, amíg mindet kibontjuk. Ez azért van, mert megvárjuk, amíg minden kérés befejeződik, mielőtt továbblépnénk a következőre.
javítsuk ezt egy perform_async
módszer bevezetésével a munkás modulunkba. A dolgok felgyorsítása érdekében minden URL-hez új szálat hoz létre.
1 2 3 4 5 6 7 8 9 |
module Magique module Worker module ClassMethods def perform_async(*args) Thread.new { new.perform(*args) } end end end end |
a meghívás TitleExtractorWorker.perform_async(url)
megváltoztatása után az összes címet szinte egyszerre kapjuk meg. Ez azonban azt is jelenti, hogy egyszerre több mint 20 kapcsolatot nyitunk meg a Ruby Magic bloghoz. (Elnézést a blogodért, emberek!
ha a saját implementációját követi és teszteli ezt egy hosszú ideje futó folyamaton kívül (például egy webszerveren), ne felejtsen el hozzáadni valami hasonlót loop { sleep 1 }
A szkript végéhez, hogy megbizonyosodjon arról, hogy a folyamat nem fejeződik be azonnal.
A feladatok sorba állítása
azzal a megközelítéssel, hogy minden meghíváshoz új szálat hozzunk létre, végül elérjük az erőforrás korlátait (mind a mi oldalunkon, mind az általunk elérni kívánt webhelyeken). Mivel kedves állampolgárok szeretnénk lenni, változtassuk meg a megvalósítást aszinkronra, de nem érezzük magunkat szolgáltatásmegtagadási támadásnak.
a probléma megoldásának általános módja a gyártó/fogyasztó minta használata. Egy vagy több gyártó áthelyezi a feladatokat egy sorba, míg egy vagy több fogyasztó átveszi a feladatokat a sorból, és feldolgozza azokat.
a sor alapvetően elemek listája. Elméletileg egy egyszerű tömb elvégezné a munkát. Mivel azonban párhuzamosságról van szó, meg kell győződnünk arról, hogy egyszerre csak egy termelő vagy fogyasztó férhet hozzá a sorhoz. Ha nem vigyázunk erre, a dolgok káoszba torkollnak—akárcsak két ember, aki egyszerre próbál betörni egy ajtón.
ezt a problémát gyártó-fogyasztó problémának nevezik, és számos megoldás létezik rá. Szerencsére ez egy nagyon gyakori probléma, és a Ruby megfelelő Queue
implementációval szállít, amelyet anélkül használhatunk, hogy aggódnunk kellene a szálszinkronizálás miatt.
a használathoz győződjünk meg róla, hogy mind a gyártók, mind a fogyasztók hozzáférhetnek a sorhoz. Ezt úgy végezzük, hogy hozzáadunk egy osztály metódust a Magique
modulunkhoz, és hozzárendelünk egy példányt a Queue
hozzá.
1 2 3 4 5 6 7 8 9 10 11 |
module Magique def self.backend @backend end def self.backend=(backend) @backend = backend end end Magique.backend = Queue.new |
ezután megváltoztatjuk a perform_async
implementációnkat, hogy egy feladatot a várólistára toljunk ahelyett, hogy saját új szálat hoznánk létre. A feladat hash-ként jelenik meg, beleértve a worker osztályra való hivatkozást, valamint a perform_async
metódusnak átadott argumentumokat.
1 2 3 4 5 6 7 8 9 |
module Magique module Worker module ClassMethods def perform_async(*args) Magique.backend.push(worker: self, args: args) end end end end |
ezzel végeztünk a dolgok termelői oldalával. Ezután vessünk egy pillantást a fogyasztói oldalra.
minden fogyasztó egy külön szál, amely a sorból veszi a feladatokat, és végrehajtja azokat. Ahelyett, hogy egy feladat után megállna, mint például a szál, a fogyasztó egy másik feladatot vesz a sorból, és végrehajtja, stb. Itt van egy alapvető végrehajtása a fogyasztó úgynevezett Magique::Processor
. Minden processzor létrehoz egy új szálat, amely végtelenül hurkol. Minden iterációhoz megpróbál új feladatot megragadni a sorból, létrehoz egy új példányt a worker osztályból, és meghívja a perform
metódust a megadott argumentumokkal.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
module Magique class Processor def self.start(concurrency = 1) concurrency.times { |n| new("Processor #{n}") } end def initialize(name) thread = Thread.new do loop do payload = Magique.backend.pop worker_class = payload worker_class.new.perform(*payload) end end thread.name = name end end end |
a feldolgozási hurok mellett hozzáadunk egy kényelmi módszert, amelyet Magique::Processor.start
. Ez lehetővé teszi számunkra, hogy egyszerre több processzort forgassunk fel. Bár a szál megnevezése nem igazán szükséges, lehetővé teszi számunkra, hogy lássuk, a dolgok valóban a várt módon működnek-e.
állítsuk be a TitleExtractorWorker
kimenetét, hogy tartalmazza az aktuális szál nevét.
1 |
puts " #{title.gsub(/]+/, ' ').strip}" |
a háttérfeldolgozási beállításunk teszteléséhez először fel kell pörgetnünk egy sor processzort, mielőtt a feladatainkat elvégeznénk.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Magique.backend = Queue.new Magique::Processor.start(5) RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end # Bindings and Lexical Scope in Ruby | AppSignal Blog # Building a Ruby C Extension From Scratch | AppSignal Blog # Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog # Ruby's Hidden Gems, StringScanner | AppSignal Blog # Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog # Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog # ... |
amikor ez fut, még mindig megkapjuk az összes cikk címét. Bár ez nem olyan gyors, mint csak egy külön szál használata minden feladathoz, még mindig gyorsabb, mint a kezdeti megvalósítás, amelynek nem volt háttérfeldolgozása. A hozzáadott processzorneveknek köszönhetően azt is megerősíthetjük, hogy az összes processzor a sorban dolgozik. Az egyidejű processzorok számának módosításával meg lehet találni az egyensúlyt a feldolgozási sebesség és a meglévő erőforrás-korlátozások között.
több folyamatra és gépre kiterjesztve
eddig a háttérfeldolgozó rendszerünk jelenlegi megvalósítása elég jól működik. Ez azonban továbbra is ugyanarra a folyamatra korlátozódik. Az erőforrás-éhes feladatok továbbra is befolyásolják a teljes folyamat teljesítményét. Utolsó lépésként nézzük meg a munkaterhelés elosztását több folyamat, esetleg több gép között.
a sor az egyetlen kapcsolat a gyártók és a fogyasztók között. Jelenleg egy memóriában lévő implementációt használ. Vegyünk több inspirációt a Sidekiq – tól, és hajtsunk végre egy sort a Redis használatával.
a Redis támogatja azokat a listákat, amelyek lehetővé teszik a feladatok lekérését. Ezenkívül a Redis Ruby gem szálbiztonságos, a Redis parancsok pedig a listák módosítására atomikusak. Ezek a tulajdonságok lehetővé teszik az aszinkron háttérfeldolgozó rendszerünk használatához szinkronizálási problémák nélkül.
hozzunk létre egy Redis által támogatott várólistát, amely végrehajtja apush
ésshift
metódusokat, csakúgy, mint a korábban használtQueue
módszereket.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
require 'json' require 'redis' module Magique module Backend class Redis def initialize(connection = ::Redis.new) @connection = connection end def push(job) @connection.lpush('magique:queue', JSON.dump(job)) end def shift _queue, job = @connection.brpop('magique:queue') payload = JSON.parse(job, symbolize_names: true) payload = Object.const_get(payload) payload end end end end |
mivel a Redis nem tud semmit a Ruby objektumokról, a feladatainkat JSON-ba kell sorosítani, mielőtt azokat az adatbázisban tárolnánk a lpush
paranccsal, amely hozzáad egy elemet az elülső részhez a listáról.
feladat lekéréséhez a brpop
parancsot használjuk, amely a lista utolsó elemét kapja meg. Ha a lista üres, akkor blokkolja, amíg egy új elem áll rendelkezésre. Ez egy jó módszer a processzorok szüneteltetésére, ha nem állnak rendelkezésre feladatok. Végül, miután kivettünk egy feladatot a Redisből, fel kell keresnünk a valódi Ruby osztályt a munkavállaló neve alapján a Object.const_get
használatával.
utolsó lépésként osszuk fel a dolgokat több folyamatra. A dolgok termelői oldalán, az egyetlen dolog, amit meg kell tennünk, az, hogy megváltoztatjuk a hátteret az újonnan megvalósított Redis várólistánkra.
1 2 3 4 5 6 7 |
# ... Magique.backend = Magique::Backend::Redis.new RUBYMAGIC.each do |url| TitleExtractorWorker.perform_async(url) end |
On the consumer side of things, we can get away with a few lines like this:
1 2 3 4 5 6 |
# ... Magique.backend = Magique::Backend::Redis.new Magique::Processor.start(5) loop { sleep 1 } |
végrehajtáskor a fogyasztói folyamat megvárja, amíg az új munka megérkezik a sorba. Miután elindítottuk a gyártási folyamatot, amely a feladatokat a sorba tolja, láthatjuk, hogy azonnal feldolgozásra kerülnek.
élvezze felelősségteljesen, és ne használja ezt a termelés
miközben tartottuk távol a valós világ beállítás használná a termelés (így nem!), néhány lépést tettünk egy háttérprocesszor felépítésében. Azzal kezdtük, hogy egy folyamat fut, mint egy háttér szolgáltatás. Ezután aszinkronizálást végeztünk, és a Queue
-t használtuk a gyártó-fogyasztó probléma megoldására. Ezután kibővítettük a folyamatot több folyamatra vagy gépre, amelyek inkább a Redis-t, mint a memóriában történő megvalósítást használják.
mint korábban említettük, ez egy háttérfeldolgozó rendszer egyszerűsített megvalósítása. Sok dolog hiányzik, és nem foglalkoznak vele kifejezetten. Ezek közé tartozik (de nem kizárólagosan) a hibakezelés, a többszörös sorok, az ütemezés, a kapcsolat összevonása és a jelkezelés.
ennek ellenére jól éreztük magunkat, és reméljük, hogy élvezted a háttérfeldolgozó rendszer motorháztetője alatt. Talán még egy-két dolgot is elvettél.
Vendég író Benedikt Deicke egy szoftver mérnök és CTO Userlist.io. oldalán könyvet ír a SaaS alkalmazások építéséről Ruby on Rails – ben. A Benedikt-et a Twitteren keresztül érheti el.