distributie van uitvoerders, kernen en geheugen voor een Spark-toepassing

brontoewijzing is een belangrijk aspect tijdens het uitvoeren van een spark-taak. Als het niet correct is geconfigureerd, kan een spark-taak hele clusterbronnen verbruiken en andere toepassingen laten verhongeren voor bronnen

laten we beginnen met enkele basisdefinities van de termen die worden gebruikt bij het behandelen van Spark-toepassingen.

partities: een partitie is een klein deel van een grote gedistribueerde dataset. Spark beheert gegevens met behulp van partities die helpt parallelize gegevensverwerking met minimale gegevens shuffle over de uitvoerders.

taak: een taak is een werkeenheid die kan worden uitgevoerd op een partitie van een gedistribueerde dataset en wordt uitgevoerd op een enkele uitvoerder. De eenheid van parallelle uitvoering is op taakniveau.Alle taken met-in een enkele fase kunnen parallel worden uitgevoerd

uitvoerder: een uitvoerder is een enkel JVM-proces dat wordt gestart voor een toepassing op een werkknooppunt. Uitvoerder voert taken uit en bewaart gegevens in het geheugen of schijfopslag over hen heen. Elke toepassing heeft zijn eigen uitvoerders. Een enkel knooppunt kan meerdere uitvoerders uitvoeren en uitvoerders voor een toepassing kan meerdere werkknooppunten omvatten. Een uitvoerder blijft gedurende de
duur van de Spark toepassing en voert de taken uit in meerdere threads. Het aantal uitvoerders voor een spark-toepassing kan worden opgegeven in de SparkConf of via de flag –num-uitvoerders vanaf de opdrachtregel.

Clusterbeheer : een externe service voor het verwerven van resources op het cluster (bijvoorbeeld standalone manager, Mesos, YARN). Spark is agnostisch voor een cluster manager, zolang het executeur processen kan verwerven en die kunnen communiceren met elkaar.We zijn vooral geïnteresseerd in garen als Clustermanager. Een spark cluster kan draaien in garen cluster of garen-client mode:

garen-client mode – een driver draait op client proces, Application Master wordt alleen gebruikt voor het aanvragen van bronnen van garen.

garen-clustermodus – een driver draait in het hoofdproces van de toepassing, client verdwijnt zodra de toepassing is geïnitialiseerd

kernen : een kern is een basisberekeningseenheid van CPU en een CPU kan een of meer kernen hebben om taken op een bepaald moment uit te voeren. Hoe meer kernen we hebben, hoe meer werk we kunnen doen. In spark bepaalt dit het aantal parallelle taken dat een uitvoerder kan uitvoeren.

distributie van uitvoerders, kernen en geheugen voor een Spark-toepassing die draait in garen:

spark-submit-class –num-uitvoerders ? executeur-kernen ? executeur-geheugen ? ….

ooit afgevraagd hoe –num-executors, –executor-memory en –execuor-cores spark config params voor uw cluster te configureren?

de volgende lijst bevat enkele aanbevelingen om in gedachten te houden tijdens het configureren:

  • Hadoop/Yarn / OS Deamons: wanneer we spark applicatie draaien met behulp van een cluster manager zoals Yarn, zullen er verschillende daemons zijn die op de achtergrond draaien zoals NameNode, Secondary NameNode, DataNode, JobTracker en TaskTracker. Dus, terwijl we num-executors specificeren, moeten we er zeker van zijn dat we genoeg kernen (~1 core per node) terzijde laten zodat deze daemons soepel kunnen werken.
  • garen ApplicationMaster (AM): ApplicationMaster is verantwoordelijk voor het onderhandelen over resources vanuit de ResourceManager en het werken met de NodeManagers om de containers en hun resource verbruik uit te voeren en te monitoren. Als we draaien vonk op garen, dan moeten we budget in de middelen die AM nodig zou hebben (~1024MB en 1 uitvoerder).
  • HDFS-doorvoer: HDFS-client heeft problemen met tonnen gelijktijdige threads. Er werd opgemerkt dat HDFS volledige schrijfdoorvoer bereikt met ~5 taken per uitvoerder . Het is dus goed om het aantal kernen per uitvoerder onder dat aantal te houden.
  • MemoryOverhead: Volgende afbeelding toont spark-garen-geheugen-gebruik.
image

twee dingen om op te merken van deze afbeelding:

1
2
3
4
5

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

dus als we 20 GB per uitvoerder aanvragen, krijgt AM 20 GB + memoryOverhead = 20 + 7% van 20 GB = ~23 GB geheugen voor ons.

  • het uitvoeren van uitvoerders met te veel geheugen resulteert vaak in overmatige vertragingen bij het ophalen van vuilnis.
  • het uitvoeren van kleine uitvoerders (met een enkele kern en net genoeg geheugen nodig om een enkele taak uit te voeren, bijvoorbeeld) gooit de voordelen weg die voortkomen uit het uitvoeren van meerdere taken in een enkele JVM.

er zijn twee manieren waarop we de uitvoerder en kerndetails configureren voor de Spark-taak. Ze zijn:

  1. statische allocatie – de waarden worden gegeven als onderdeel van spark-submit
  2. dynamische allocatie – de waarden worden opgepikt op basis van de eis (grootte van de gegevens, hoeveelheid benodigde berekeningen) en vrijgegeven na gebruik. Dit helpt de middelen te hergebruiken voor andere toepassingen.

statische allocatie:

laten we nu een 10 knooppuntcluster met de volgende configuratie bekijken en de verschillende mogelijkheden van de distributie van uitvoerders-core-memory analyseren:

1
2
3
4

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

First Approach: Tiny executors :

Tiny executors essentially means one executor per core. Volgende tabel toont de waarden van onze spar-config params met deze aanpak:

1
2
3
4
5
6
7
8
9

–num-executors = In this approach, we’ll assign one executor per core
= total-cores-in-cluster
= num-cores-per-node * total-nodes-in-cluster
= 16 x 10 = 160
–executor-cores = 1 (één uitvoerder per kern)
–executor-memory = hoeveelheid geheugen per uitvoerder
= mem-per-node/num-executors-per-node
= 64GB/16 = 4GB

/div>

analyse: met slechts één uitvoerder per kern, zoals we hierboven hebben besproken, kunnen we niet profiteren van het uitvoeren van meerdere taken in dezelfde JVM. Ook gedeelde / cache variabelen zoals broadcast variabelen en accu ‘ s zullen worden gerepliceerd in elke kern van de knooppunten die 16 keer. Ook laten we niet genoeg geheugen overhead voor Hadoop/garen daemon processen en we tellen niet mee in ApplicationManager. NIET GOED!

tweede benadering: fat-uitvoerders (één uitvoerder per knooppunt):

Fat-uitvoerders betekent in wezen één uitvoerder per knooppunt. Volgende tabel toont de waarden van onze spark-config params met deze aanpak:

1
2
3
4
5
6
7
8
9
10

–num-executors = In this approach, we’ll assign one executor per node
= total-nodes-in-cluster
= 10
–executor-cores = one executor per node means all de kernen van het knooppunt worden toegewezen aan een executeur
= totaal-kernen-in-een-punt
= 16
–executeur-geheugen = de hoeveelheid geheugen per uitvoerder
= mem-per-node/num-executeurs-per-node
= 64GB/1 = 64

Analyse: Met alle 16 kernen per persoon, afgezien van ApplicationManager en daemon-processen zijn niet meegeteld, HDFS doorvoer zal pijn doen en het zal resulteren in een overmatige vuilnis resultaten. Ook niet goed!

derde benadering: Evenwicht tussen vet (vs) minuscuul

volgens de hierboven besproken aanbevelingen:

  • Based on the recommendations mentioned above, Let’s assign 5 core per executors =>--executor-cores = 5 (for good HDFS throughput)
  • Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
  • So, Total available of cores in cluster = 15 x 10 = 150
  • Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30
  • Leaving 1 executor for ApplicationManager =>--num-executors = 29
  • Number of executors per node = 30/10 = 3
  • Memory per executor = 64GB / 3 = 21GB
  • aftellen van heap overhead = 7% van 21GB = 3GB. So, actual --executor-memory = 21-3 = 18GB

So, aanbevolen configuratie is: 29 uitvoerders, 18GB geheugen elk en 5 kernen elk!!

analyse: het is duidelijk hoe deze derde benadering de juiste balans heeft gevonden tussen vet versus klein benaderingen. Onnodig te zeggen, het bereikte parallellisme van een dikke uitvoerder en de beste doorvoer van een kleine uitvoerder!!

dynamische toewijzing

opmerking: de bovengrens voor het aantal uitvoerders als dynamische toewijzing is ingeschakeld is oneindig. Dus dit zegt dat vonk toepassing kan weg te eten alle middelen indien nodig. In een cluster waar andere toepassingen draaien en ze ook kernen nodig hebben om de taken uit te voeren, moeten we ervoor zorgen dat we de kernen toewijzen op clusterniveau.

Dit betekent dat we een specifiek aantal cores kunnen toewijzen voor op garen gebaseerde toepassingen op basis van gebruikerstoegang. Dus we kunnen een spark_user maken en dan cores (min/max) geven voor die gebruiker. Deze limieten zijn voor het delen tussen vonk en andere toepassingen die draaien op garen.

om dynamische allocatie te begrijpen, moeten we kennis hebben van de volgende eigenschappen:

spark.dynamische toewijzing.ingeschakeld – wanneer dit is ingesteld op true hoeven we geen uitvoerders te noemen. De reden is hieronder:

de statische parameternummers die we geven bij spark-submit zijn voor de gehele duur van de taak. Echter, als dynamische allocatie in beeld komt, zouden er verschillende stadia zijn, zoals de volgende:

Wat is het aantal uitvoerders om mee te beginnen:

initiële aantal uitvoerders (spark.dynamische toewijzing.initiale uitvoerders) om te beginnen met

het aantal uitvoerders dynamisch controleren:

vervolgens gebaseerd op laden (taken in afwachting) hoeveel uitvoerders moeten worden aangevraagd. Dit zou uiteindelijk het nummer zijn wat we geven bij spark-submit op statische manier. Dus zodra de eerste uitvoerder nummers zijn ingesteld, gaan we naar min (vonk.dynamische toewijzing.minExecutors) en max (vonk.dynamische toewijzing.maxExecutors) nummers.

Wanneer moeten we nieuwe uitvoerders vragen of huidige uitvoerders weggeven:

wanneer vragen we nieuwe uitvoerders aan (spark.dynamische toewijzing.schedulerBacklogTimeout) – dit betekent dat er taken in afwachting zijn geweest voor zo veel duur. Dus het verzoek om het aantal uitvoerders gevraagd in elke ronde neemt exponentieel ten opzichte van de vorige ronde. Bijvoorbeeld, een toepassing zal toevoegen 1 uitvoerder in de eerste ronde, en vervolgens 2, 4, 8 en ga zo maar door uitvoerders in de volgende rondes. Op een specifiek punt komt de bovenstaande eigenschap max in beeld.

wanneer geven we weg dat een uitvoerder is ingesteld met spark.dynamische toewijzing.executorIdleTimeout.

tot slot, als we meer controle nodig hebben over de uitvoertijd van de taak, controleer dan de taak op onverwacht datavolume dat de statische getallen zouden helpen. Door over te schakelen op dynamic, zouden de middelen op de achtergrond worden gebruikt en de banen met onverwachte volumes zouden andere toepassingen kunnen beïnvloeden.

https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
http://spark.apache.org/docs/latest/job-scheduling.html#resource-allocation-policy
https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
http://spark.apache.org/docs/latest/cluster-overview.html

Geef een antwoord

Het e-mailadres wordt niet gepubliceerd.