Õhuvool: kuidas kasutada tuhandete ülesannete ajastamist ühe võttega

Selles postituses hakkan arutama, kuidas saaksime ühe dagi sisse mahutada tuhandeid ülesandeid. Ma ei hakka keskenduma sellele, mis on Airflow ja kuidas saate seda installida, vaid selle asemel arutlen selle üle, kuidas saaksime ühe dagi sees planeerida arvukalt ülesandeid.

Põhimõtteliselt on Airflow ette nähtud mitme DAG-i jaoks ja DAG-i sees võib olla sadu või tuhat ülesannet. Mis saab siis, kui tahame planeerida suurt hulka ülesandeid, ütleme umbes 60000 või enam? Seda olen selles blogis selgitanud.

Töötan Airflow abil oma töövoo automatiseerimiseks. Kuid minu ettevõttes on meil tõesti palju andmeid ja proovisin Airflow erinevaid versioone kasutades ning tõesti tohutute andmete tõttu on mul ühe DAG-i sees peaaegu 70000 ülesannet. Olen proovinud Airflow erinevaid versioone ja uusim versioon suudab planeerida 5000 ülesannet, kuid kui tahame planeerida enamat, jääb planeerija töörežiimi ilma ülesandeid ajastamata. Leidsin iga probleemi ja kontrollisin, kuidas seda lahendada, mis on tegelik põhjus ja lõpuks selle blogi kirjutamise kohta.

See on üks Airflow kasutusjuhtudest, kui ühe DAG-i sees on tuhandeid ülesandeid. Alustuseks peame kasutama Airflow versiooni 1.10.3 versiooni, pärast seda ei tohiks keskenduda suurele hulgale ülesannetele, seega peame kasutama Airflow versiooni 1.10.3. Selle versiooni installimiseks toimige järgmiselt.

  • Kõigepealt peame looma uue keskkonna ja aktiveerima selle keskkonna, kasutades järgmist käsku:
conda looma -n õhuvoolu_3
conda aktiveeri õhuvool_3
  • Spetsiaalse 1.10.3 versiooniga Airflow installimiseks kasutage järgmist käsku:
conda install -c conda-forge õhuvool == 1.10.3
  • Peate kontrollima teatud konkreetsete nõuete täitmist, et see versioon ei töötaks kolbiga1.0.9, nii et kui teil on kolb suurem kui see versioon, kasutage järgmist käsku:
pipi paigaldamise kolb == 1.0.4
pip install funcsigs == 1.0.0 (see on veel üks nõue, mis tuleb installida)
  • Ja selle palju suure hulga ülesannetega töötamisel on soovitatav kasutada selleri täidesaatjat, kuna peame neid ülesandeid paralleelselt täitma ja seda saab saavutada selle abil. Selleri installimiseks kasutage järgmist käsku:
pip install selleri
  • Sellerihalduri kasutamiseks peate kasutama töötajaid ja määrama maakleri, mina kasutan maaklerina RabbitMQ. TO maakleri URL-i seadistamiseks võib kasutada järgmist struktuuri:
broker_url = amqp: // “kasutajanimi”: “parool” @ “hostinimi”: “port” /

näiteks

broker_url = amqp: // külaline: külaline @ localhost: 5672 /
  • Sellerijuhi kasutajaliidese nägemiseks saame kasutada rakendust Flower, et installida järgmine käsk:
conda install -c conda-forge lill
  • Pärast seda peame muutma mõnda konfiguratsiooni, et käivitada paralleelselt tuhandeid ülesandeid ja ajastada tuhandeid ülesandeid ühe võttega.
[tuum]
täitja = CeleryExecutor parallelism = 200000 non_pooled_task_slot_count = 100000 dag_concurrency = 100000 max_active_runs_per_dag = 2
[planeerija]
max_thread = 10 (saab kasutada lõime vastavalt oma programmile, suurendades või vähendades seda)

Need on peamised seaded, kui soovite ajastada tuhandeid ülesandeid ühe võttega. Te peate seda kohandama vastavalt sellele, mitu maksimaalset DAG-i soovite paralleelselt käivitada ja kui palju ülesandeid teil ühe DAG-i sees on.

Peamine parameeter on “Non_pooled_task_slot_count”, mis eemaldati Airflow versioonist 1.10.4, seega kasutan ma 1.10.3, kuna see parameeter mängib väga olulist rolli ülesannete ajastamisel.

Peamine erinevus pärast rakenduse “Non_pooled_task_slot_count” eemaldamist on see, et see kasutab vaikeseadet vaikimisi väärtuseks 128 (saab seda vastavalt nõudele suurendada). “Non_pooled_task_slot_count” peamine töö on ülesannete ajastamine ja see pole ühendatud default_pooliga ega mõne muu andmebaasi ühenduse arvuga, nii et me saame seda arvu suurendada nii palju kui soovite, kuid kui suurendate “default_pool” teenindusaegade arvu siis on see ühendatud ka teie andmebaasiühendustega ja paralleelselt töötava korraga ei saa teil olla 100000 andmebaasiühendust. Põhimõtteliselt eemaldati “Non_pooled_task_slot_count” kasuks “default_pool”.

See postitus sisaldab vastust küsimusele, et miks ajakava muutub kitsaskohaks, see takerdus, see ei ajasta suurt hulka ülesandeid või töötab terve päeva ilma midagi tegemata. Sellel kõigil vastustel on üks vastus Airflow versiooni 1.10.3 kasutamiseks.

Kui kasutate Airflow 1.10.3, peame täpsustama, millist basseini DAG peaks kasutama, kuna see ei kasuta vaikimisi “default_pool”, seega peame ülesannete loomisel läbima parameetri para mater pool = “defautl_pool”. 'Vaikimisi_pool' saab luua kasutajaliidese (administraator -> kogumid) abil või seda saab teha käsurealt:

airflow pool -s default_pool 128 'vaikimisi bassein'.

Siin on näide DAG-i näidisest:

import os datetime import datetime, timedelta import airflow from airflow import DAG from airflow.operators.dummy_operator import DummyOperator
default_args = {'omanik': 'Airflow', 'sõltub_on_past': Vale, 'start_date': airflow.utils.dates.days_ago (2), 'retries': 1, 'retry_delay': timedelta (minutes = 1),}
dag = DAG ('dummy_try1', vaikimisi_margid = vaikesarmid, ajakava_intervall = pole)
i jaoks vahemikus (50000): ülesanded = DummyOperator (task_id = '{}'. vorming (i), dag = dag, pool = 'vaikimisi_pool)

Kõigi versioonide erinevust saate vaadata alloleval lingil:

  • https://github.com/apache/airflow/blob/master/UPDATING.md#airflow-1104