Verteilung von Executoren, Kernen und Speicher für eine Spark-Anwendung

Die Ressourcenzuweisung ist ein wichtiger Aspekt bei der Ausführung eines Spark-Jobs. Wenn ein Spark-Job nicht richtig konfiguriert ist, kann er ganze Cluster-Ressourcen verbrauchen und andere Anwendungen nach Ressourcen hungern lassen

Beginnen wir mit einigen grundlegenden Definitionen der Begriffe, die beim Umgang mit Spark-Anwendungen verwendet werden.

Partitionen : Eine Partition ist ein kleiner Teil eines großen verteilten Datensatzes. Spark verwaltet Daten mithilfe von Partitionen, die die Datenverarbeitung mit minimaler Datenmischung über die Executoren hinweg parallelisieren.

Aufgabe : Eine Aufgabe ist eine Arbeitseinheit, die auf einer Partition eines verteilten Datasets ausgeführt werden kann und auf einem einzelnen Executor ausgeführt wird. Die Einheit der parallelen Ausführung befindet sich auf Task-Ebene.Alle Aufgaben mit -in einer einzigen Stufe können parallel ausgeführt werden

Executor : Ein Executor ist ein einzelner JVM-Prozess, der für eine Anwendung auf einem Worker-Knoten gestartet wird. Executor führt Aufgaben aus und speichert Daten im Arbeitsspeicher oder auf der Festplatte. Jede Anwendung hat ihre eigenen Executoren. Ein einzelner Knoten kann mehrere Executoren ausführen, und Executoren für eine Anwendung können mehrere Worker-Knoten umfassen. Ein Executor bleibt für die
Dauer der Spark-Anwendung aktiv und führt die Aufgaben in mehreren Threads aus. Die Anzahl der Executoren für eine Spark-Anwendung kann in der SparkConf oder über das Flag –num-executors in der Befehlszeile angegeben werden.

Cluster Manager : Ein externer Dienst zum Abrufen von Ressourcen auf dem Cluster (z. B. Standalone Manager, Mesos, YARN). Spark ist für einen Cluster-Manager unabhängig, solange er Executor-Prozesse erfassen kann und diese miteinander kommunizieren können.Wir sind in erster Linie an Yarn als Clustermanager interessiert. Ein Spark-Cluster kann entweder im Yarn-Cluster- oder im Yarn–Client-Modus ausgeführt werden:

Yarn-Client-Modus – Ein Treiber wird im Client-Prozess ausgeführt, der Anwendungsmaster wird nur zum Anfordern von Ressourcen von YARN verwendet.

yarn-Cluster–Modus – Ein Treiber läuft innerhalb des Application Master-Prozesses, der Client verschwindet, sobald die Anwendung initialisiert ist

Kerne: Ein Kern ist eine grundlegende Recheneinheit der CPU und eine CPU kann einen oder mehrere Kerne haben, um Aufgaben zu einem bestimmten Zeitpunkt auszuführen. Je mehr Kerne wir haben, desto mehr Arbeit können wir leisten. In Spark steuert dies die Anzahl der parallelen Tasks, die ein Executor ausführen kann.

Verteilung von Executoren, Kernen und Speicher für eine Spark-Anwendung, die in Yarn ausgeführt wird:

spark–submit –class -num-executors ? -Executor-Kerne ? -executor-Speicher ? ….

Haben Sie sich jemals gefragt, wie Sie die Konfigurationsparameter –num-executors, –executor-memory und –execuor-cores für Ihren Cluster konfigurieren können?

Die folgende Liste enthält einige Empfehlungen, die Sie bei der Konfiguration beachten sollten:

  • Hadoop/Yarn/OS Deamons: Wenn wir eine Spark-Anwendung mit einem Cluster-Manager wie Yarn ausführen, gibt es mehrere Daemons, die im Hintergrund ausgeführt werden, wie NameNode, Secondary NameNode, DataNode, JobTracker und TaskTracker. Während wir also num-Executors angeben, müssen wir sicherstellen, dass wir genügend Kerne (~ 1 Kern pro Knoten) beiseite lassen, damit diese Daemons reibungslos laufen.
  • Garn ApplicationMaster (AM): ApplicationMaster ist dafür verantwortlich, Ressourcen vom ResourceManager auszuhandeln und mit den NodeManagern zusammenzuarbeiten, um die Container und ihren Ressourcenverbrauch auszuführen und zu überwachen. Wenn wir Spark auf Yarn ausführen, müssen wir die Ressourcen einplanen, die AM benötigen würde (~ 1024 MB und 1 Executor).
  • HDFS-Durchsatz: Der HDFS-Client hat Probleme mit Tonnen gleichzeitiger Threads. Es wurde beobachtet, dass HDFS mit ~ 5 Tasks pro Executor den vollen Schreibdurchsatz erreicht. Es ist also gut, die Anzahl der Kerne pro Executor unter dieser Zahl zu halten.
  • MemoryOverhead: Das folgende Bild zeigt Spark-Yarn-memory-usage .
Bild

Zwei Dinge, die Sie bei diesem Bild beachten sollten:

1
2
3
4
5

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Wenn wir also 20 GB pro Executor anfordern, erhalten wir tatsächlich 20 GB + memoryOverhead = 20 + 7% von 20 GB = ~ 23 GB Speicher für uns.

  • Das Ausführen von Executoren mit zu viel Speicher führt häufig zu übermäßigen Verzögerungen bei der Garbage Collection.
  • Das Ausführen winziger Executoren (mit einem einzigen Kern und gerade genug Speicher, um beispielsweise eine einzelne Aufgabe auszuführen) wirft die Vorteile weg, die sich aus dem Ausführen mehrerer Aufgaben in einer einzigen JVM ergeben.

Es gibt zwei Möglichkeiten, wie wir die Executor- und Core-Details für den Spark-Job konfigurieren. Sie sind:

  1. Statische Zuweisung – Die Werte werden als Teil von spark–submit
  2. Dynamische Zuweisung – Die Werte werden basierend auf der Anforderung (Datengröße, Anzahl der erforderlichen Berechnungen) abgerufen und nach Gebrauch freigegeben. Dadurch können die Ressourcen für andere Anwendungen wiederverwendet werden.

Statische Zuweisung:

Betrachten wir nun einen 10-Knoten-Cluster mit folgender Konfiguration und analysieren verschiedene Möglichkeiten der Executors-Core-Memory-Verteilung:

1
2
3
4

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

First Approach: Tiny executors :

Tiny executors essentially means one executor per core. Die folgende Tabelle zeigt die Werte unserer Spar-config-Parameter mit diesem Ansatz:

1
2
3
4
5
6
7
8
9

–num-executors = In this approach, we’ll assign one executor per core
= total-cores-in-cluster
= num-cores-per-node * total-nodes-in-cluster
= 16 x 10 = 160
–executor-cores = 1 (ein Executor pro Kern)
–executor-memory = Speichermenge pro Executor
= mem-per-node/num-executors-per-node
= 64GB/16 = 4GB

Analyse: Mit nur ein Executor pro Kern, wie wir oben besprochen haben, werden wir nicht in der Lage sein, mehrere Aufgaben in derselben JVM auszuführen. Außerdem werden gemeinsam genutzte / zwischengespeicherte Variablen wie Broadcast-Variablen und Akkumulatoren in jedem Kern der Knoten repliziert, was 16-mal der Fall ist. Außerdem lassen wir nicht genügend Speicher für Hadoop / Yarn-Daemon-Prozesse übrig und zählen nicht in ApplicationManager. NICHT GUT!

Zweiter Ansatz: Fat executors (Ein Executor pro Knoten):

Fat executors bedeutet im Wesentlichen einen Executor pro Knoten. Die folgende Tabelle zeigt die Werte unserer Spark-Config-Parameter mit diesem Ansatz:

1
2
3
4
5
6
7
8
9
10

–num-executors = In this approach, we’ll assign one executor per node
= total-nodes-in-cluster
= 10
–executor-cores = one executor per node means all die Kerne des Knotens werden einem Executor zugewiesen
= total-cores-in-a-node
= 16
–executor-memory = Speichermenge pro Executor
= mem-per-node/num-executors-per-node
= 64GB/1 = 64GB

Analyse: Mit allen 16 Kernen pro Executor, abgesehen von ApplicationManager und Daemon-Prozesse werden nicht gezählt, HDFS Durchsatz verletzt und es wird in übermäßigen Müll Ergebnisse führen. Ebenfalls,NICHT GUT!

Dritter Ansatz: Balance zwischen Fett (vs) Fett

Gemäß den Empfehlungen, die wir oben besprochen haben:

  • Based on the recommendations mentioned above, Let’s assign 5 core per executors =>--executor-cores = 5 (for good HDFS throughput)
  • Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
  • So, Total available of cores in cluster = 15 x 10 = 150
  • Number of available executors = (total cores/num-cores-per-executor) = 150/5 = 30
  • Leaving 1 executor for ApplicationManager =>--num-executors = 29
  • Number of executors per node = 30/10 = 3
  • Memory per executor = 64 GB / 3 = 21 GB
  • Heap-Overhead abzählen = 7% von 21 GB = 3 GB. Also, tatsächlich --executor-memory = 21 – 3 = 18GB

Die empfohlene Konfiguration lautet also: 29 Executoren, jeweils 18 GB Speicher und jeweils 5 Kerne!!

Analyse: Es ist offensichtlich, wie dieser dritte Ansatz das richtige Gleichgewicht zwischen fetten und winzigen Ansätzen gefunden hat. Unnötig zu erwähnen, dass es die Parallelität eines fetten Executors und die besten Durchsätze eines winzigen Executors erreicht hat!!

Dynamische Zuweisung

Hinweis: Die Obergrenze für die Anzahl der Executoren bei aktivierter dynamischer Zuweisung ist unendlich. Dies besagt also, dass die Spark-Anwendung bei Bedarf alle Ressourcen auffressen kann. In einem Cluster, in dem andere Anwendungen ausgeführt werden und diese auch Kerne zum Ausführen der Aufgaben benötigen, müssen wir sicherstellen, dass wir die Kerne auf Clusterebene zuweisen.

Dies bedeutet, dass wir eine bestimmte Anzahl von Kernen für garnbasierte Anwendungen basierend auf dem Benutzerzugriff zuweisen können. So können wir einen spark_user erstellen und dann Kerne (min / max) für diesen Benutzer angeben. Diese Grenzwerte gelten für die gemeinsame Nutzung zwischen Spark und anderen Anwendungen, die auf YARN ausgeführt werden.

Um die dynamische Zuweisung zu verstehen, müssen wir die folgenden Eigenschaften kennen:

spark.dynamicAllocation.aktiviert – Wenn dies auf true gesetzt ist, müssen wir Executoren nicht erwähnen. Der Grund ist unten:

Die statischen Parameternummern, die wir bei spark-submit angeben, gelten für die gesamte Jobdauer. Wenn jedoch eine dynamische Zuweisung ins Spiel kommt, gibt es verschiedene Stufen wie die folgenden:

Mit welcher Nummer beginnen die Executoren:

Anfängliche Anzahl der Executoren (spark.dynamicAllocation.initialExecutors), um mit

Die Anzahl der Executoren dynamisch zu steuern:

Dann basierend auf der Last (ausstehende Aufgaben), wie viele Executoren angefordert werden sollen. Dies wäre schließlich die Nummer, die wir bei spark-submit auf statische Weise angeben. Sobald die anfänglichen Executor-Nummern festgelegt sind, gehen wir zu min ().dynamicAllocation.minExecutors) und max (spark.dynamicAllocation.maxExecutors) Zahlen.

Wann man neue Executoren fragt oder aktuelle Executoren verschenkt:

Wann fordern wir neue Executoren an (spark.dynamicAllocation.schedulerBacklogTimeout) – Dies bedeutet, dass für diese Dauer ausstehende Aufgaben vorhanden waren. Die Anforderung für die Anzahl der in jeder Runde angeforderten Executoren steigt also exponentiell gegenüber der vorherigen Runde. Beispielsweise fügt eine Anwendung in der ersten Runde 1 Executor und in den folgenden Runden 2, 4, 8 usw. Executoren hinzu. An einem bestimmten Punkt kommt die obige Eigenschaft max ins Bild.

Wann geben wir weg ein Executor wird mit Spark gesetzt.dynamicAllocation.executorIdleTimeout.

Abschließend, wenn wir mehr Kontrolle über die Ausführungszeit des Jobs benötigen, überwachen Sie den Job auf unerwartetes Datenvolumen Die statischen Zahlen würden helfen. Wenn Sie zu dynamic wechseln, werden die Ressourcen im Hintergrund verwendet, und die Jobs mit unerwarteten Volumes können sich auf andere Anwendungen auswirken.

https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
http://spark.apache.org/docs/latest/job-scheduling.html#resource-allocation-policy
https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
http://spark.apache.org/docs/latest/cluster-overview.html

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht.