Skalierungsbarrieren abbauen: Wie wir die Nutzung von Elasticsearch bei Intercom optimiert haben
Veröffentlicht: 2022-09-22Elasticsearch ist ein unverzichtbarer Bestandteil von Intercom.
Es unterstützt die wichtigsten Intercom-Funktionen wie Posteingang, Posteingangsansichten, API, Artikel, die Benutzerliste, Berichterstattung, Resolution Bot und unsere internen Protokollierungssysteme. Unsere Elasticsearch-Cluster enthalten mehr als 350 TB an Kundendaten, speichern mehr als 300 Milliarden Dokumente und verarbeiten zu Spitzenzeiten mehr als 60.000 Anfragen pro Sekunde.
Da die Elasticsearch-Nutzung von Intercom zunimmt, müssen wir sicherstellen, dass unsere Systeme skalieren, um unser kontinuierliches Wachstum zu unterstützen. Mit der kürzlichen Einführung unseres Posteingangs der nächsten Generation ist die Zuverlässigkeit von Elasticsearch wichtiger denn je.
Wir haben uns entschieden, ein Problem mit unserem Elasticsearch-Setup anzugehen, das ein Verfügbarkeitsrisiko darstellte und zukünftige Ausfallzeiten drohte: ungleichmäßige Verteilung von Datenverkehr/Arbeit zwischen den Knoten in unseren Elasticsearch-Clustern.
Frühe Anzeichen von Ineffizienz: Lastungleichgewicht
Mit Elasticsearch können Sie horizontal skalieren, indem Sie die Anzahl der Knoten erhöhen, die Daten speichern (Datenknoten). Wir bemerkten allmählich ein Lastungleichgewicht zwischen diesen Datenknoten: Einige von ihnen standen aufgrund einer höheren Festplatten- oder CPU-Auslastung unter mehr Druck (oder „wärmer“) als andere.
(Abb. 1) Ungleichgewicht in der CPU-Auslastung: Zwei heiße Knoten mit ~20 % höherer CPU-Auslastung als der Durchschnitt.
Die integrierte Shards-Platzierungslogik von Elasticsearch trifft Entscheidungen auf der Grundlage einer Berechnung, die den verfügbaren Speicherplatz in jedem Knoten und die Anzahl der Shards eines Index pro Knoten grob schätzt. Die Ressourcennutzung durch Shard wird in dieser Berechnung nicht berücksichtigt. Infolgedessen könnten einige Knoten mehr ressourcenhungrige Shards erhalten und „heiß“ werden. Jede Suchanfrage wird von mehreren Datenknoten verarbeitet. Ein heißer Knoten, der während des Spitzenverkehrs über seine Grenzen hinausgedrängt wird, kann zu Leistungseinbußen für den gesamten Cluster führen.
Ein häufiger Grund für Hot Nodes ist die Shard-Platzierungslogik, die Clustern große Shards (basierend auf der Festplattenauslastung) zuweist, wodurch eine ausgewogene Zuordnung weniger wahrscheinlich wird. Typischerweise kann einem Knoten ein großer Shard mehr als den anderen zugewiesen werden, wodurch er in der Festplattenauslastung heißer wird. Das Vorhandensein großer Shards behindert auch unsere Fähigkeit, den Cluster inkrementell zu skalieren, da das Hinzufügen eines Datenknotens keine Lastreduzierung von allen heißen Knoten garantiert (Abb. 2).
(Abb. 2) Das Hinzufügen eines Datenknotens hat nicht zu einer Lastreduzierung auf Host A geführt. Das Hinzufügen eines weiteren Knotens würde die Last auf Host A verringern, aber der Cluster weist immer noch eine ungleichmäßige Lastverteilung auf.
Im Gegensatz dazu tragen kleinere Shards dazu bei, die Last auf allen Datenknoten zu reduzieren, wenn der Cluster skaliert – einschließlich der „heißen“ (Abb. 3).
(Abb. 3) Viele kleinere Shards tragen dazu bei, die Last auf allen Datenknoten zu reduzieren.
Hinweis: Das Problem ist nicht auf Cluster mit großen Shards beschränkt. Wir würden ein ähnliches Verhalten beobachten, wenn wir „Größe“ durch „CPU-Auslastung“ oder „Suchverkehr“ ersetzen, aber der Größenvergleich erleichtert die Visualisierung.
Lastungleichgewichte wirken sich nicht nur auf die Cluster-Stabilität aus, sondern auch auf unsere Fähigkeit, kosteneffektiv zu skalieren. Wir müssen immer mehr Kapazität als nötig hinzufügen, um die heißeren Knoten unter gefährlichen Werten zu halten. Die Behebung dieses Problems würde eine bessere Verfügbarkeit und erhebliche Kosteneinsparungen durch eine effizientere Nutzung unserer Infrastruktur bedeuten.
Unser tiefes Verständnis des Problems half uns zu erkennen, dass die Last gleichmäßiger verteilt werden könnte, wenn wir Folgendes hätten:
- Mehr Shards relativ zur Anzahl der Datenknoten. Dies würde sicherstellen, dass die meisten Knoten die gleiche Anzahl von Shards erhalten.
- Kleinere Shards im Verhältnis zur Größe der Datenknoten. Wenn einige Knoten einige zusätzliche Shards erhalten würden, würde dies zu keiner nennenswerten Erhöhung der Last für diese Knoten führen.
Cupcake-Lösung: Weniger größere Knoten
Dieses Verhältnis der Anzahl der Shards zur Anzahl der Datenknoten und der Größe der Shards zur Größe der Datenknoten kann durch eine größere Anzahl kleinerer Shards optimiert werden. Es kann jedoch einfacher optimiert werden, indem zu weniger, aber größeren Datenknoten übergegangen wird.
Wir entschieden uns, mit einem Cupcake zu beginnen, um diese Hypothese zu überprüfen. Wir haben einige unserer Cluster auf größere, leistungsfähigere Instanzen mit weniger Knoten migriert – und dabei die gleiche Gesamtkapazität beibehalten. Beispielsweise haben wir einen Cluster von 40 4xlarge-Instanzen auf 10 16xlarge-Instanzen verschoben, wodurch das Lastungleichgewicht verringert wurde, indem die Shards gleichmäßiger verteilt wurden.
(Abb. 4) Bessere Lastverteilung auf Festplatte und CPU durch Verschieben auf weniger größere Knoten.
Die Verringerung der Anzahl größerer Knoten bestätigte unsere Annahmen, dass eine Optimierung der Anzahl und Größe der Datenknoten die Lastverteilung verbessern kann. Wir hätten dort anhalten können, aber der Ansatz hatte einige Nachteile:
- Wir wussten, dass das Lastungleichgewicht erneut auftreten würde, wenn die Shards im Laufe der Zeit größer werden oder wenn mehr Knoten zum Cluster hinzugefügt werden, um den erhöhten Datenverkehr zu berücksichtigen.
- Größere Knoten machen die inkrementelle Skalierung teurer. Das Hinzufügen eines einzelnen Knotens würde jetzt mehr kosten, selbst wenn wir nur ein wenig zusätzliche Kapazität benötigen würden.
Herausforderung: Überschreiten der Schwelle für komprimierte gewöhnliche Objektzeiger (OOPs).
Der Wechsel zu weniger größeren Knoten war nicht so einfach wie das Ändern der Instanzgröße. Ein Engpass, mit dem wir konfrontiert waren, war die Beibehaltung der verfügbaren gesamten Heap-Größe (Heap-Größe auf einem Knoten x Gesamtzahl der Knoten) während der Migration.
Wir haben die Heap-Größe in unseren Datenknoten auf ~30,5 GB begrenzt, wie von Elastic vorgeschlagen, um sicherzustellen, dass wir unter der Grenze bleiben, damit die JVM komprimierte OOPs verwenden kann. Wenn wir die Heap-Größe nach dem Wechsel zu weniger, größeren Nodes auf ~30,5 GB begrenzen würden, würden wir unsere Heap-Kapazität insgesamt reduzieren, da wir mit weniger Nodes arbeiten würden.
„Die Instanzen, zu denen wir migrierten, waren riesig und wir wollten dem Heap einen großen Teil des RAM zuweisen, damit wir Platz für die Zeiger hatten und genug Platz für den Dateisystem-Cache übrig blieb.“
Wir konnten nicht viele Ratschläge zu den Auswirkungen des Überschreitens dieser Schwelle finden. Die Instanzen, zu denen wir migrierten, waren riesig und wir wollten dem Heap einen großen Teil des Arbeitsspeichers zuweisen, damit wir Platz für die Zeiger hatten und genug Platz für den Dateisystem-Cache übrig blieb. Wir haben mit einigen Schwellenwerten experimentiert, indem wir unseren Produktionsdatenverkehr auf Testcluster repliziert haben, und haben uns auf ca. 33 % bis ca. 42 % des RAM als Heap-Größe für Computer mit mehr als 200 GB RAM festgelegt.
Die Änderung der Heap-Größe wirkte sich unterschiedlich auf verschiedene Cluster aus. Während einige Cluster keine Änderung bei Metriken wie „JVM % verwendeter Heap“ oder „Young GC Collection Time“ zeigten, war der allgemeine Trend ein Anstieg. Trotzdem war es insgesamt eine positive Erfahrung, und unsere Cluster laufen seit mehr als 9 Monaten mit dieser Konfiguration – ohne Probleme.

Langfristige Lösung: Viele kleinere Shards
Eine längerfristige Lösung wäre eine größere Anzahl kleinerer Shards im Verhältnis zur Anzahl und Größe der Datenknoten. Wir können auf zwei Arten zu kleineren Shards gelangen:
- Migrieren des Index, um mehr primäre Shards zu haben: Dadurch werden die Daten im Index auf mehr Shards verteilt.
- Zerlegen des Index in kleinere Indizes (Partitionen): Dadurch werden die Daten im Index auf mehrere Indizes verteilt.
Es ist wichtig zu beachten, dass wir nicht eine Million winziger Shards erstellen oder Hunderte von Partitionen haben möchten. Jeder Index und jeder Shard erfordert einige Arbeitsspeicher- und CPU-Ressourcen.
„Wir haben uns darauf konzentriert, das Experimentieren und Beheben suboptimaler Konfigurationen innerhalb unseres Systems zu vereinfachen, anstatt uns auf die ‚perfekte‘ Konfiguration zu fixieren.“
In den meisten Fällen verbraucht ein kleiner Satz großer Shards weniger Ressourcen als viele kleine Shards. Aber es gibt noch andere Optionen – Experimentieren sollte Ihnen helfen, eine für Ihren Anwendungsfall besser geeignete Konfiguration zu finden.
Um unsere Systeme widerstandsfähiger zu machen, haben wir uns darauf konzentriert, das Experimentieren und Beheben suboptimaler Konfigurationen innerhalb unseres Systems zu vereinfachen, anstatt uns auf die „perfekte“ Konfiguration zu fixieren.
Indizes partitionieren
Das Erhöhen der Anzahl der primären Shards kann manchmal die Leistung für Abfragen beeinträchtigen, die Daten aggregieren, was wir bei der Migration des Clusters erlebt haben, der für das Reporting-Produkt von Intercom verantwortlich ist. Im Gegensatz dazu verteilt die Partitionierung eines Indexes in mehrere Indizes die Last auf mehr Shards, ohne die Abfrageleistung zu beeinträchtigen.
Intercom ist nicht verpflichtet, Daten für mehrere Kunden zusammenzufassen, daher haben wir uns für eine Aufteilung basierend auf den eindeutigen IDs der Kunden entschieden. Dies half uns, schneller Mehrwert zu liefern, indem die Partitionierungslogik vereinfacht und die erforderliche Einrichtung reduziert wurde.
„Um die Daten so zu partitionieren, dass sie die bestehenden Gewohnheiten und Methoden unserer Ingenieure am wenigsten beeinträchtigen, haben wir zunächst viel Zeit investiert, um zu verstehen, wie unsere Ingenieure Elasticsearch verwenden.“
Um die Daten so zu partitionieren, dass sie die bestehenden Gewohnheiten und Methoden unserer Ingenieure am wenigsten beeinträchtigen, haben wir zunächst viel Zeit investiert, um zu verstehen, wie unsere Ingenieure Elasticsearch verwenden. Wir haben unser Observability-System tief in die Elasticsearch-Clientbibliothek integriert und unsere Codebasis durchsucht, um mehr über die verschiedenen Möglichkeiten zu erfahren, wie unser Team mit Elasticsearch-APIs interagiert.
Unser Fehlerwiederherstellungsmodus bestand darin, Anfragen zu wiederholen, also haben wir erforderliche Änderungen vorgenommen, wo wir nicht-idempotente Anfragen gestellt haben. Am Ende haben wir einige Linters hinzugefügt, um die Verwendung von APIs wie `update/delete_by_query` zu verhindern, da sie es einfach machten, nicht-idempotente Anfragen zu stellen.
Wir haben zwei Funktionen entwickelt, die zusammengearbeitet haben, um die volle Funktionalität bereitzustellen:
- Eine Möglichkeit, Anforderungen von einem Index zu einem anderen weiterzuleiten. Dieser andere Index kann eine Partition oder einfach ein nicht partitionierter Index sein.
- Eine Möglichkeit, Daten doppelt in mehrere Indizes zu schreiben. Dadurch konnten wir die Partitionen mit dem zu migrierenden Index synchron halten.
„Wir haben unsere Prozesse optimiert, um den Explosionsradius von Zwischenfällen zu minimieren, ohne Kompromisse bei der Geschwindigkeit einzugehen.“
Insgesamt sieht der Vorgang zum Migrieren eines Indexes auf Partitionen wie folgt aus:
- Wir erstellen die neuen Partitionen und aktivieren das duale Schreiben, damit unsere Partitionen mit dem ursprünglichen Index auf dem neuesten Stand bleiben.
- Wir lösen ein Backfill aller Daten aus. Diese Abgleichanforderungen werden doppelt in die neuen Partitionen geschrieben.
- Wenn der Abgleich abgeschlossen ist, validieren wir, dass sowohl der alte als auch der neue Index dieselben Daten enthalten. Wenn alles gut aussieht, verwenden wir Feature-Flags, um die Partitionen für einige Kunden zu verwenden und die Ergebnisse zu überwachen.
- Sobald wir uns sicher sind, verschieben wir alle unsere Kunden auf die Partitionen, während wir gleichzeitig sowohl auf den alten Index als auch auf die Partitionen schreiben.
- Wenn wir sicher sind, dass die Migration erfolgreich war, beenden wir das duale Schreiben und löschen den alten Index.
Diese scheinbar einfachen Schritte sind sehr komplex. Wir haben unsere Prozesse optimiert, um den Explosionsradius von Vorfällen zu minimieren, ohne Kompromisse bei der Geschwindigkeit einzugehen.
Profitieren Sie von den Vorteilen
Diese Arbeit hat uns geholfen, die Lastverteilung in unseren Elasticsearch-Clustern zu verbessern. Noch wichtiger ist, dass wir jetzt die Lastverteilung jedes Mal verbessern können, wenn sie inakzeptabel wird, indem wir Indizes auf Partitionen mit weniger primären Shards migrieren und so das Beste aus beiden Welten erreichen: weniger und kleinere Shards pro Index.
Durch die Anwendung dieser Erkenntnisse konnten wir wichtige Leistungssteigerungen und Einsparungen erzielen.
- Wir haben die Kosten von zwei unserer Cluster um 40 % bzw. 25 % gesenkt und konnten auch bei anderen Clustern erhebliche Kosteneinsparungen feststellen.
- Wir haben die durchschnittliche CPU-Auslastung für einen bestimmten Cluster um 25 % reduziert und die mittlere Anforderungslatenz um 100 % verbessert. Wir haben dies erreicht, indem wir einen Index mit hohem Datenverkehr auf Partitionen mit weniger primären Shards pro Partition im Vergleich zum Original migriert haben.
- Die allgemeine Möglichkeit, Indizes zu migrieren, ermöglicht es uns auch, das Schema eines Index zu ändern, sodass Produktingenieure bessere Erfahrungen für unsere Kunden erstellen oder die Daten mit einer neueren Lucene-Version neu indizieren können, die uns die Möglichkeit gibt, auf Elasticsearch 8 zu aktualisieren.
(Abb. 5) 50 % Verbesserung des Lastungleichgewichts und 25 % Verbesserung der CPU-Auslastung durch Migration eines Index mit hohem Datenverkehr auf Partitionen mit weniger primären Shards pro Partition.
(Abb. 6) Die mittlere Anforderungslatenz wurde um durchschnittlich 100 % verbessert, indem ein Index mit hohem Datenverkehr auf Partitionen mit weniger primären Shards pro Partition migriert wurde.
Was kommt als nächstes?
Die Einführung von Elasticsearch zur Unterstützung neuer Produkte und Funktionen sollte unkompliziert sein. Unsere Vision ist es, unseren Ingenieuren die Interaktion mit Elasticsearch so einfach zu machen, wie moderne Web-Frameworks die Interaktion mit relationalen Datenbanken ermöglichen. Es sollte für Teams einfach sein, einen Index zu erstellen, aus dem Index zu lesen oder zu schreiben, Änderungen an seinem Schema vorzunehmen und vieles mehr – ohne sich Gedanken darüber machen zu müssen, wie die Anforderungen bedient werden.
Interessieren Sie sich für die Arbeitsweise unseres Engineering-Teams bei Intercom? Erfahren Sie mehr und sehen Sie sich unsere offenen Stellen hier an.