الأخبار التكنولوجية والاستعراضات والنصائح!

إرشادات إنتاج تدفق الهواء – اللحاق بالمهمة المناسبة (وليس DAG) – نحو الذكاء الاصطناعي

ستساعدك المقالة التالية: إرشادات إنتاج تدفق الهواء – اللحاق بالمهمة المناسبة (وليس DAG) – نحو الذكاء الاصطناعي

نُشر في الأصل على نحو AI ، الشركة الرائدة في العالم في مجال الذكاء الاصطناعي والأخبار التقنية والإعلام. إذا كنت تقوم ببناء منتج أو خدمة متعلقة بالذكاء الاصطناعي ، فنحن ندعوك للتفكير في أن تصبح راعيًا للذكاء الاصطناعي. في نحو الذكاء الاصطناعي ، نساعد في توسيع نطاق الشركات الناشئة في مجال الذكاء الاصطناعي والتكنولوجيا. دعنا نساعدك على إطلاق التكنولوجيا الخاصة بك للجماهير.

نصائح حول إنتاج تدفق الهواء – اللحاق بالمهمة المناسبة (وليس DAG)

أصبح Apache Airflow هو المعيار القياسي لتنظيم البيانات. ومع ذلك ، على مر السنين والإصدارات ، تراكمت لديها مجموعة من الفروق الدقيقة والأخطاء التي يمكن أن تعيق استخدام الإنتاج.

تهدف هذه السلسلة من المقالات إلى توجيه مستخدمي Apache Airflow خلال عملية التغلب على هذه المشكلات ، وهي نفس المشكلات التي واجهتها.

Note: كالعادة ، كل الكود متاح في مستودع GitHub الخاص بي ،.

المهمة المناسبة (وليس DAG) اللحاق بالركب

TLDR: قدرة Airflow على الحصول على تاريخ TaskInstance السابق الناجح لا يعمل على النحو المنشود ، ويعيد تاريخ تشغيل DAG الناجح السابق بدلاً من ذلك ، مما يمنعك من التقاط المعلومات المفقودة / الفاشلة بدقة وبشكل صحيح. في هذا المنشور ، سوف تتعرف على كيفية تجاوزه. انظر تقرير الخطأ الأصلي.

TLDR # 2: انقر هنا للانتقال مباشرة إلى الحل

عرض المشكلة

تتمثل إحدى الميزات الأكثر إثارة للاهتمام والمفيدة في Apache Airflow في قدرته على اللحاق بالماضي في حالة فشل المهمة. لا ، أنا لا أشير إلى المعلمة التي يمكنك تحديدها في DAG ، ولكن بدلاً من ذلك ، أعطي للمهام القدرة على تضمين TaskInstances التي فشلت سابقًا (وتاريخ تنفيذها) باستخدام إما

نموذج JINJA ، أو الوصول مباشرة إلى TaskInstance السابقة التي كانت حالتها ناجحة:

يكون هذا مفيدًا بشكل خاص عندما تريد التأكد من أن بياناتك تبقى محدثة في الابتكارات أو تستهلك ETL الخاص بك جميع المعلومات لكل من TaskInstances السابقة الحالية والفاشلة ، على سبيل المثال:

في مثال أكثر عملية ، بالنظر إلى الصورة أدناه ، تتوقع أنه في حالة فشل التشغيل الأول للمهمة أثناء استخدام التكوين أعلاه ، سيتم التقاط البيانات الفاشلة / المفقودة في التشغيل التالي ، في الساعة التالية ، هل صحيح؟

تكرار الحدث المتوقع:

  • الحلقة 1
    – تشغيل مهمة التقاط واحد ساعة البيانات ،: نجاح
    – تشغيل مهمة التحويل واحد ساعة البيانات ،: يفشل
    – تشغيل تحميل المهام واحد ساعة البيانات ،: يفشل
  • الحلقة 2
    – تشغيل مهمة التقاط واحد ساعة البيانات ،: نجاح
    – تشغيل مهمة التحويل اثنين ساعات من البيانات (الحالية والفشل سابقًا) ،: نجاح
    – تشغيل تحميل المهام اثنين ساعات من البيانات (الحالية والفشل سابقًا) ،: نجاح

لسوء الحظ ، هذا غير صحيح. في حين أن هذا هو السلوك الواضح والمتوقع ، فإن رمز Airflow القديم يمنع حدوث ذلك. انظر تقرير الخطأ الأصلي.

يوجد أدناه القالب المطابق للتشغيل الأول والثاني للمهمة ، على التوالي:

Note أن المهمة الثانية لا تعوض عن الفشل السابق. بدلاً من ذلك ، يمرر فقط معلمة تاريخ التنفيذ السابقة الخاصة به ، بدلاً من تضمين الجولة السابقة الناجحة ، اللحاق الكامل بالركب .

لماذا يحدث هذا؟

السلوك الافتراضي لـ Apache Airflow ، عندما كان تاريخ التنفيذ السابق ناجحًا ، هو النظر إلى تاريخ التنفيذ الناجح لـ DAG السابق ، وليس تاريخ التنفيذ TaskInstance ، مما يجعل DAGs الخاصة بك غير قادرة على اللحاق بالركب تلقائيًا ما لم يفشل DAG بالكامل.

فهل توقعنا أن يحدث بدلاً من ذلك؟ حسنًا ، نتوقع أن يكون النموذج المعروض في الجولة الثانية:

يمكنك الاطلاع على تقرير الخطأ الأصلي حول المشكلة ، والذي يعود تاريخه إلى عام 2021 وحتى لفترة أطول على StackOverflow.

الحل

مثل العديد من أطر عمل Python الأخرى ، يستخدم Apache Airflow ORM (مخطط علاقة الكائنات) لتلخيص الوصول إلى قاعدة البيانات الخلفية الخاصة به. على وجه التحديد ، تم الاستفادة من مشروع SQL Alchemy المعتاد لتسريع تدفق الهواء في Apache. يتيح لنا ذلك الوصول إلى قاعدة البيانات الوصفية مباشرة ومعالجتها وفقًا لمتطلباتنا.

ثم يتم تقسيم حل مشكلتنا إلى أربع خطوات بسيطة:

  1. استرداد الكائنات على أساس دولة محددة
  2. استرجع آخر مثيل ناجح لملف
  3. استرجع المرتبط بآخر مثيل ناجح للمهمة المتوفرة
  4. تسليحنا بالقدرة على استخدام هذه المعلومات!

أخيرًا ، تمت إضافة مثال قسم عملي لتوضيح الهدف!

الخطوة 1: استرجع كائنات TaskInstance بناءً على حالة محددة

تتوافق الخطوة الأولى مع القدرة على الاستعلام عن ORM لاسترداد قاعدة البيانات لاسترداد آخر تشغيل مطابق لحالة معينة:

Note: الوظيفة عبارة عن تعميم يستخدم للسماح لك بالتقاط أي حالة محددة قد تحتاجها

الوظيفة تشرح نفسها بنفسها ، حيث تقوم بالاستعلام عن كائن ORM وتصفيته عبر معلمتين: حالة البحث المطلوبة والمثيل.

الخطوة 2: استرجع آخر مثيل TaskInstance ناجح للمهمة المتوفرة

بالاستفادة من التعميم المحدد مسبقًا ، يمكننا الآن الاستعلام عن ORM لاسترداد آخر تشغيل فقط كانت حالته نجاح.

الخطوة 3: استرجع DAGRun المرتبط بآخر مثيل TaskInstance ناجح للمهمة المتوفرة.

بعد تعقب آخر مثيل ناجح ، يمكننا الآن استرداد الكائن المرتبط بالمثيل المذكور. يحتوي النموذج على معلومات تتعلق بمثيل run إلى جانب مجموعة كبيرة من المعلومات المفيدة:

مسلحين بهذه المعرفة ، يمكننا الآن استرداد المثيل المرتبط بآخر مثال ناجح تم استرداده في عملية مماثلة كما كان من قبل:

الخطوة 4: تسليح DAG لدينا بالقدرة على استخدام هذه المعلومات!

أخيرًا ، يتعين علينا التأكد من أن Apache Airflow على دراية بوظائفنا الجديدة ويمكنه حقنها في محرك JINJA.

يمكننا القيام بذلك عن طريق استيراد الوظيفة وتمرير الوظيفة المعرفة من قبل المستخدم إلى DAG مباشرة من مُنشئها ، في هذه الحالة ، عبر مدير السياق:

يمكنك الآن استدعاء الوظيفة بحرية مباشرة في JINJA وتمريرها كوسيطة للاستخراج أو ETL أو أي عمليات أخرى قد تكون لديك!

اسمحوا لي أن أعرف في التعليقات إذا وجدت هذه الموارد مفيدة وكالعادة ، يمكنك العثور على هذا الرمز في مستودع GitHub الخاص بي!


تم نشر إرشادات إنتاج تدفق الهواء – المهمة المناسبة (وليس DAG) في الأصل في نحو AI on Medium ، حيث يواصل الأشخاص المحادثة من خلال تسليط الضوء على هذه القصة والرد عليها.

تم النشر عبر نحو الذكاء الاصطناعي