8 Leistungsoptimierungstechniken mit Spark

Angenommen, Sie haben eine Situation, in der ein Datensatz sehr klein und ein anderer Datensatz ziemlich groß ist, und Sie möchten den Join-Vorgang zwischen diesen beiden ausführen. In diesem Fall sollten wir uns für den Broadcast-Join entscheiden, damit der kleine Datensatz in Ihre Broadcast-Variable passt. Die Syntax zur Verwendung der Broadcast-Variablen lautet df1.beitreten(Sendung(df2)). Hier haben wir einen zweiten Datenrahmen, der sehr klein ist, und wir behalten diesen Datenrahmen als Broadcast-Variable.

Code:

val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar.wert

res0: Array = Array(1, 2, 3)

in:val accum = sc.longAccumulator(„Mein Akkumulator“)

sc.parallelisieren (Array(1, 2, 3, 4)).foreach(x => accum.hinzufügen(x))

accum.value

res2: Long = 10

Cache und Persist

  • Spark bietet eigene Caching-Mechanismen wie persist() und cache() .
  • cache() und persist() speichern den Datensatz im Speicher.
  • Wenn Sie einen kleinen Datensatz haben, der mehrmals in Ihrem Programm verwendet werden muss, wird dieser Datensatz zwischengespeichert.
  • Cache() – Immer im Speicher
  • Persist() – Speicher und Festplatten

Spark bietet einen eigenen Caching-Mechanismus wie Persist und Caching. Persist- und Cache-Mechanismen speichern den Datensatz bei Bedarf im Speicher, wenn Sie über einen kleinen Datensatz verfügen und dieser Datensatz in Ihrem Programm mehrmals verwendet wird. Wenn wir RDD anwenden.Cache() es wird immer die Daten im Speicher speichern, und wenn wir RDD anwenden.Persist() dann kann ein Teil der Daten im Speicher gespeichert werden, einige können auf der Festplatte gespeichert werden.

5. ByKey-Operation

  • Shuffles sind schwere Operationen, die viel Speicher verbrauchen.
  • Während der Codierung in Spark sollte der Benutzer immer versuchen, den Shuffle-Vorgang zu vermeiden.
  • High Shuffling kann zu einem OutOfMemory-Fehler führen; Um einen solchen Fehler zu vermeiden, kann der Benutzer den Parallelitätsgrad erhöhen.
  • Verwenden Sie reduceByKey anstelle von groupByKey.
  • Partitionieren Sie die Daten korrekt.

Wie wir während unserer Transformation von Spark wissen, haben wir viele ByKey-Operationen. ByKey-Operationen erzeugen viel Shuffle. Shuffles sind schwere Operation, weil sie viel Speicher verbrauchen. Beim Codieren in Spark sollte ein Benutzer immer versuchen, einen Shuffle-Vorgang zu vermeiden, da der Shuffle-Vorgang die Leistung beeinträchtigt. Wenn ein hohes Shuffling vorliegt, kann ein Benutzer den Fehler aus dem Speicher holen. In diesem Fall sollte ein Benutzer den Parallelitätsgrad erhöhen, um diesen Fehler zu vermeiden. Anstelle von groupBy sollte ein Benutzer den reduceByKey verwenden, da groupByKey viel Shuffling erzeugt, was die Leistung beeinträchtigt, während reduceByKey die Daten nicht so stark mischt. Daher ist reduceByKey im Vergleich zu groupByKey schneller. Wenn eine ByKey-Operation verwendet wird, sollte der Benutzer die Daten korrekt partitionieren.

6. Dateiformatauswahl

  • Spark unterstützt viele Formate wie CSV, JSON, XML, PARQUET, ORC, AVRO usw.
  • Spark-Jobs können optimiert werden, indem die richtige Datei mit schneller Komprimierung ausgewählt wird, die die hohe Leistung und beste Analyse bietet.
  • Die Parquet-Datei ist nativ für Spark, das die Metadaten zusammen mit der Fußzeile enthält.

Spark kommt mit vielen Dateiformaten wie CSV, JSON, XML, PARQUET, ORC, AVRO und mehr. Ein Spark-Job kann optimiert werden, indem die Parkettdatei mit bissiger Komprimierung ausgewählt wird. Die Parkettdatei ist nativ für Spark, die die Metadaten zusammen mit der Fußzeile enthält Wie wir wissen, ist die Parkettdatei nativ für Spark, das sich im Binärformat befindet, und zusammen mit den Daten, die auch die Fußzeile enthalten, werden auch die Metadaten und die Fußzeile übertragen Sie werden sehen, wann immer Sie eine Parkettdatei erstellen .Metadatendatei im selben Verzeichnis zusammen mit der Datendatei.

Code:

val peopleDF = Funke.lesen.json („Beispiele / src / main / Ressourcen / Personen.json“)

peopleDF.schreiben.parkett(„Menschen.parkett“)

val parquetFileDF = funke.lesen.parkett(„Menschen.parkett“)

val usersDF = Funke.lesen.formatieren(„avro“).load(„Beispiele / src / main/ Ressourcen / Benutzer.avro“)

usersDF.wählen Sie („Name“, „favorite_color“).schreiben.formatieren(„avro“).speichern(„namesAndFavColors.avro“)

Garbage Collection Tuning

  • JVM Garbage Collection kann ein Problem sein, wenn Sie eine große Sammlung nicht verwendeter Objekte haben.
  • Der erste Schritt beim GC-Tuning besteht darin, Statistiken zu sammeln, indem Sie beim Senden von Spark–Jobs – verbose auswählen.
  • In einer idealen Situation versuchen wir, GC-Overheads < 10% des Heap-Speichers zu halten.

Wie wir wissen, wird unser Spark-Job auf der JVM-Plattform ausgeführt, sodass die JVM-Speicherbereinigung problematisch sein kann, wenn Sie eine große Sammlung eines nicht verwendeten Objekts haben. Im Allgemeinen sollten wir in einer idealen Situation unseren Speicher für die Garbage Collection auf weniger als 10% des Heap-Speichers belassen.

8. Parallelitätsgrad

  • Parallelität spielt eine sehr wichtige Rolle bei der Abstimmung von Spark-Jobs.
  • Jede Partition ~ Task benötigt einen einzelnen Kern für die Verarbeitung.
  • Es gibt zwei Möglichkeiten, die Parallelität beizubehalten:
    • Repartition: Gibt die gleiche Anzahl von Partitionen mit hohem Shuffling
    • Coalesce: Reduziert im Allgemeinen die Anzahl von Partitionen mit weniger Shuffling.

In jeder verteilten Umgebung spielt die Parallelität eine sehr wichtige Rolle bei der Optimierung Ihres Spark-Jobs. Jedes Mal, wenn ein Spark-Job übermittelt wird, wird der Schreibtisch erstellt, der Phasen enthält, und die Aufgaben hängen von der Partition ab, sodass für jede Partition oder Aufgabe ein einzelner Kern des Systems zur Verarbeitung erforderlich ist. Es gibt zwei Möglichkeiten, die Parallelität beizubehalten – Repartition und Coalesce. Wenn Sie die Partitionierungsmethode anwenden, erhalten Sie die gleiche Anzahl von Partitionen, aber es wird viel gemischt, sodass es nicht ratsam ist, eine Partitionierung durchzuführen, wenn Sie alle Daten wiederherstellen möchten. Coalesce wird in der Regel die Anzahl der Partitionen reduzieren und schafft weniger Schlurfen von Daten.

Diese Faktoren für die Funkenoptimierung können, wenn sie richtig verwendet werden, –



+