Fernando Gandia - Airflow - PyData Mallorca 18-10-2016

Preview:

Citation preview

Fernando Gandia

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos

Fernando Gandia

¿Qué es Airflow?

Seleccionar datos Predecir Decidir

00:00 02:0022:00

Predecir Decidir

23:30 01:0022:00

Seleccionar datos

Predecir Decidir

23:30 01:0022:00

Seleccionar datos

CRON

• Especifica el inicio de cada tarea

• Cada tarea se dispara al llegar un instante. No por dependencia con otra tarea

• Manejo de errores es complejo

Describir

¿Por qué surge Airflow?

Plataforma que permite , Temporizar y Monitorizar flujos de trabajo.

Fernando Gandia

¿La esencia de Airflow?

DAG (Directed Acyclic Graphs):

Colección de tareas, organizadas de tal manera, que queden reflejadas sus relaciones y dependencias.

Grafo acíclico dirigido:

Las aristas tienen un sentido definido.

No tiene ciclos.

Se especifican en ficheros Python

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos

Fernando Gandia

Comandoairflow initdb

Comandoairflow scheduler

Comandoairflow webserver –p <puerto>

Web Server Scheduler DataBase

Fernando Gandia

Command Line Interface Web User Interface

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos

Fernando Gandia

Importar módulos.

Diccionario de argumentos

Instanciar el DAG

Crear cada tarea, asignándole un tipo de operador y sus correspondientes parámetros.

Especificar relaciones y dependencias.

1

2

3

4

5

Fernando Gandia

Copiar fichero .py en la carpeta dags

Dag Folder

Comando airflow list_dags

Si el Scheduler está en marcha el Dag se compila y se importa automáticamente en la UI

Scheduler

1

2

3

Fernando Gandia

Por defecto, al importar un DAG, inicialmente la temporización del mismo está encendida.

A menos que, en las opciones de configuración hayamos desactivado dicha característica.

Fernando Gandia

Si hemos especificado una fecha de inicio, se llevan a cabo ejecuciones de relleno hasta la fecha actual

Fernando Gandia

python dag_file.pyCompilar

airflow test dag_id task_id execution_dateEjecutar una

tarea específica

airflow backfill dag_id –s start_date –e end_dateEjecutar DAG

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos

Fernando Gandia

Llevan a cabo una

acción o le

indican a otro sistema que la

realice.

3 tipos de operadores

Transfierendatos de un

sistema a otro

Sensores que se

mantienen en ejecución hasta que

se lleva a cabo cierta condición

Tipo 1 Tipo 2 Tipo 3

Fernando Gandia

BashOperator

DummyOperator

BranchPythonOperator

HttpSensor

SimpleHttpOperator

PythonOperator

ShortCircuitOperator

DummySkipOperator

SubDagOperator

SubDagOperator

TriggerDagRunOperator

BaseOperator

Fernando Gandia

¿Qué necesidad debía cubrir?

Trata con datos de Hive o Impala a través de scripts de Python / PySpark desde un entorno de Airflow remoto.

SSHExecuteOperatorOpción

1

Entorno Airflow

Dag

Infraestructura de datos y procesamiento distribuido

Puerto 22

Python / PySpark

Hive

Bash

Impala

Librerías , Scripts

HDFS

Ssh

Si se interrumpe la tarea se mantiene la ejecución en la insfraestructura

¿Qué opciones me aportaba Airflow?

Fernando Gandia

¿Qué necesidad debía cubrir?

Trata con datos de Hive o Impala a través de scripts de Python / PySpark desde un entorno de Airflow remoto.

¿Qué opciones me aportaba Airflow?

SSHExecuteOperator

BashOperator

PythonLibrerías , Scripts

Impyla

Opción 2

Opción 1

Entorno Airflow

Dag

Infraestructura de datos y procesamiento distribuido

Puerto 22

Python / PySpark

Hive

Bash

Impala

Librerías , Scripts

HDFS

Ssh

HiveServerPuerto 1000

Los datos se tratan en el entorno Airflow y su ámbito está dentro del script Python.

No hay procesamiento distribuido.ODBC

Fernando Gandia

¿Qué necesidad debía cubrir?

Trata con datos de Hive o Impala a través de scripts de Python / PySpark desde un entorno de Airflow remoto.

¿Qué opciones me aportaba Airflow?

ODBC

SSHExecuteOperator

BashOperator

PythonLibrerías , Scripts

Impyla

Opción 2

Opción 3

HiveOperator

Hive - Beeline

Hadoop

ODBC

Opción 1

Entorno Airflow

Dag

Infraestructura de datos y procesamiento distribuido

Puerto 22

Python / PySpark

Hive

Bash

Impala

Librerías , Scripts

HDFS

Ssh

HiveServerPuerto 1000

Se necesita tener Hadoop y Hiveinstalados en mismo entorno Airflow

Fernando Gandia

• No instalar nada mas en el entorno Airflow• Los datos y los scripts debían mantenerse en la infraestructura• Sesión SSH interactiva

¿Requisitos para mi operador?

¿Cómo encripto los datos de conexión SSH?

• Airflow proporciona módulos de conexión llamados Hooks

1 Crear una conexión SSH con nuestro usuario propio en la interfaz de Airflow

mi_conexion

2 Utilizar estos datos de conexión desde el código

from airflow.contrib.hook import SSHHook

mi_hook = SSHHook ( “mi_conexion” )

mi_hook.hostmi_hook.loginmi_hook.password

Librería pxssh

Fernando Gandia

¿Cómo hago de mi solución un operador de Airflow?

Creo un fichero .py

PluginsFolder

from airflow.plugins_manager import AirflowPlugin

class myOperador (BaseOperator)

def __init__ (argumentos_del_operador)

def execute

def on_kill

class myPlugin(AirflowPlugin)

name = “myPlugin”operators = [myOperador]flask_blueprint = []hook = []executors = []…

super (myOperador, self).__init__(*args,**kwargs)

Copiar fichero .py en la carpeta plugins

1

2

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos

Fernando Gandia

DAGS Resumen del estado de todos los DAGS.

Fernando Gandia

Data Profiling Querys sobre los orígenes de datos definidos.

Fernando Gandia

Browse Comprobar el estado de ejecución de un elemento en particular

Fernando Gandia

Admin Administración de la plataforma, orígenes de datos y variables

Fernando Gandia

Conceptos generales

La plataforma Sus componentes principales Interfaces de comunicación

DAG’s ¿Como se especifican? ¿Como se importan en la plataforma? ¿Como se ejecutan y se prueban?

Operadores Por defecto Propios

Interfaz de usuario

Ejemplos