Clase 2: GFS/HDFS & MapReduce/Hadoopaidanhogan.com/teaching/gdd-2017/02/GdD2017-02.pdf1. Parsear...

Preview:

Citation preview

GESTIÓN DE DATOS (MASIVOS)

DIPLOMADO DE DATOS 2017

Clase 2: GFS/HDFS & MapReduce/Hadoop

Aidan Hogan

aidhog@gmail.com

GESTIÓN DE DATOS MASIVOS

... COMO LO HACE GOOGLE ...

Dentro de Google: 1997/98

Dentro de Google: 2017

Implementando la búsqueda de Google

¿Qué procesos y algoritmos necesita Google

para implementar su búsqueda de la Web?

Crawling1. Parsear enlaces de las páginas

2. Ordenar los enlaces para bajar

3. Bajar páginas, GOTO 1

Indexación1. Parsear keywords de las páginas

2. Indexear sus keywords

3. Administrar actualizaciones

Ranking1. ¿Cuán relevante es una página?

2. ¿Cuán importante es?

3. ¿Cuántos clics tiene?

...

Implementando la búsqueda de Google

¿Qué procesos y algoritmos necesita Google

para implementar su búsqueda de la Web?

Crawling1. Parsear enlaces de las páginas

2. Ordenar los enlaces para bajar

3. Bajar páginas, GOTO 1

Indexación1. Parsear keywords de las páginas

2. Indexear sus keywords

3. Administrar actualizaciones

Ranking1. ¿Cuán relevante es una página?

2. ¿Cuán importante es?

3. ¿Cuántos clics tiene?

...

Google

≈ 100 PB / día

≈ 2,000,000 Wiki / día

(2014, procesados)

Implementando la búsqueda de Google

Implementando la búsqueda de Google

¿Qué abstracciones distribuidas podemos

considerar para facilitar aplicaciones así?

Crawling1. Parsear enlaces de las páginas

2. Ordenar los enlaces para bajar

3. Bajar páginas, GOTO 1

Indexación1. Parsear keywords de las páginas

2. Indexear sus keywords

3. Administrar actualizaciones

Ranking1. ¿Cuán relevante es una página?

2. ¿Cuán importante es?

3. ¿Cuántos clics tiene?

...

• write(file f )

• read(file f )

• delete(file f )

• append(file f, data d) ... al menos para empezar

GOOGLE FILE SYSTEM (GFS)(SISTEMA DE ARCHIVOS DE GOOGLE)

Google File System (GFS): White-Paper

Google File System

¿Qué es un sistema de archivos (un “file-system”)?

Google File System

¿Qué es un sistema de archivos (un “file-system”)?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

Google File System

¿Qué hace un sistema de archivos?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

2. Organiza una estructura jerárquica de carpetas

• Guarda las carpetas y los archivos en una carpeta

Google File System

¿Qué hace un sistema de archivos?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

2. Organiza una estructura jerárquica de carpetas

• Guarda las carpetas y los archivos en una carpeta

3. Guarda los meta datos de los archivos

• Tamaño de los archivos, fecha de creación

Google File System

¿Qué hace un sistema de archivos?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

2. Organiza una estructura jerárquica de carpetas

• Guarda las carpetas y los archivos en una carpeta

3. Guarda los meta datos de los archivos

• Tamaño de los archivos, fecha de creación

4. Protege los archivos

• Propiedad, permisos, bloqueos de sincronización

Google File System

¿Qué hace un sistema de archivos?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

2. Organiza una estructura jerárquica de carpetas

• Guarda las carpetas y los archivos en una carpeta

3. Guarda los meta datos de los archivos

• Tamaño de los archivos, fecha de creación

4. Protege los archivos

• Propiedad, permisos, bloqueos de sincronización

5. Proviene una interfaz para interactuar con archivos

• Crear, borrar, copiar, etc.

Google File System

¿Qué hace un sistema de archivos?

Google File System

¿Qué hace el "Google File System"?

1. Divide un archivo en bloques de almacenamiento

• Guarda la ubicación y secuencia de los bloques de un archivo

2. Organiza una estructura jerárquica de carpetas

• Guarda las carpetas y los archivos en una carpeta

3. Guarda los meta datos de los archivos

• Tamaño de los archivos, fecha de creación

4. Protege los archivos

• Propiedad, permisos, bloqueos de sincronización

5. Proviene una interfaz para interactuar con archivos

• Crear, borrar, copiar, etc.

Lo mismo, pero distribuido:

(y abstrae la distribución)

Google File System

¿Qué hace el "Google File System"?

Google File System: Suposiciones

Archivos enormes

Archivos frecuentemente leídos o anexados

Concurrencia es importante

Fallas son frecuentes

"Streaming" es importante

GFS: Arquitectura

Esclavos

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

Sistema de archivos (en memoria)

A1 B1 C1 D1 E1

Maestro

GFS: Escritura

Esclavos

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

Sistema de archivos (en memoria)

A1 B1 C1 D1 E1

Maestroblue.txt

(150 MB: 3 bloques)

¿Cómo deberíamos proceder con la escritura?

GFS: Escritura

Esclavos

1

1 12

2

2

3

3

3

1

1 122

2

Sistema de archivos (en memoria)/blue.txt [3 bloques]

1: {A1, C1, E1}2: {A1, B1, D1}3: {B1, D1, E1}/orange.txt [2 bloques]1: {B1, D1, E1}2: {A1, C1, E1}

A1 B1 C1 D1 E1

blue.txt

(150 MB: 3 bloques)

orange.txt

(100 MB: 2 bloques)

Maestro

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

Sistema de archivos (en memoria)/blue.txt [3 bloques]

1: {A1, C1, E1}2: {A1, B1, D1}3: {B1, D1, E1}/orange.txt [2 bloques]1: {B1, D1, E1}2: {A1, C1, E1}

Sistema de archivos (en memoria)/blue.txt [3 bloques]

1: {A1, B1, E1}2: {A1, B1, D1}3: {B1, D1, E1}/orange.txt [2 bloques]1: {B1, D1, E1}2: {A1, D1, E1}

GFS: Tolerancia a fallos

1

1 12

2

2

3

3

3

1

1 122

2

A1 B1 D1 E1C1

1

2

Esclavos

blue.txt

(150 MB: 3 bloques)

orange.txt

(100 MB: 2 bloques)

Maestro

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

GFS: Lecturas directas

Esclavos

1

1 12

2

2

3

3

3

1

1 122

2

Sistema de archivos (en memoria)/blue.txt [3 bloques]

1: {A1, C1, E1}2: {A1, B1, D1}3: {B1, D1, E1}/orange.txt [2 bloques]1: {B1, D1, E1}2: {A1, C1, E1}

A1 B1 C1 D1 E1

Maestro

Busco/blue.txt

1 2 3

• 64 MB por bloque• Etiqueta de 64 bit para cada bloque• Suponer factor de replicación: 3

GFS: Replicas Primarias

Esclavos

1

1 12

2

2

3

3

3

1

1 122

2

Sistema de archivos (en memoria)/blue.txt [3 bloques]

1: {A1, C1, E1}2: {A1, B1, D1}3: {B1, D1, E1}/orange.txt [2 bloques]1: {B1, D1, E1}2: {A1, C1, E1}

A1 B1 C1 D1 E1

Maestro

2

Quiero cambiar el bloque 2 de /blue.txt

/blue.txt [3 chunks]2: {A1, B1, D1}

22 2

COMMIT

COMMIT COMMIT

ACK ACK

ACK

GFS: Replicas Primarias

Cliente quiere modificar un archivo:

1. Cliente solicita réplicas (incl. la

primaria) al Maestro

2. Maestro retorna información de

la réplica al cliente

3. Cliente envía cambios del archivo

4. Cliente solicita ejecutar los

cambios a la réplica primaria

5. Primaria solicita ejecutar los

cambios a secundarias

6. Secundarias notifican a primaria

7. Primaria notifica al cliente Datos y Control desacoplados

Maestro asigna tarea a una de las réplicas: “réplica primaria”

GFS: Conocimiento sobre Racks

GFS: Conocimiento sobre Racks

Rack ASwitch

Rack BSwitch

Rack CSwitch

CoreSwitch

CoreSwitch

GFS: Conocimiento sobre Racks

Rack ASwitch

Rack BSwitch

CoreSwitch

1

1

1

A1

A2

A3

A4

A5

B1

B2

B3

B4

B5

Archivos:/orange.txt

1: {A1, A4, B3}2: {A5, B1, B5}

2

22

Racks:A: {A1, A2, A3, A4, A5}B: {B1, B2, B3, B4, B5}

GFS: Otras operaciones

Rebalanceo:

• Distribuir el almacenamiento equitativamente

Borrado:

• Renombrar el archivo con nombre de archivo oculto– Para recuperar, volver a renombrar a versión original

– Si no, será eliminado después de tres días

Monitoreo de réplicas obsoletas:

• Esclavo “muerto” reaparece con datos antiguos: maestro guardará información de la versión y reciclará bloques viejos

GFS: ¿Debilidades?

¿Cuales son las debilidades principales del GFS?

Maestro es un único punto de falla• Usar replicación de hardware

• ¡Logs y checkpoints!

Meta-datos del nodo maestro se mantiene en memoria• Cada bloque sólo necesita 64 bytes en memoria

• Datos del bloque pueden ser consultados en cada esclavo

Maestro es un cuello de botella• Usar una máquina más poderosa

• Minimizar tráfico por nodo maestro

Hadoop Distributed File System (HDFS)

• Versión open-source de GFS

– Esclavo = "Data node"

– Maestro = "Name node"

Sistema de archivos distribuido ...

Crawling1. Parsear enlaces de las páginas

2. Ordenar los enlaces para bajar

3. Bajar páginas, GOTO 1

Indexación1. Parsear keywords de las páginas

2. Indexear sus keywords

3. Administrar actualizaciones

Ranking1. ¿Cuán relevante es una página?

2. ¿Cuán importante es?

3. ¿Cuántos clics tiene?

...

• write(file f )

• read(file f )

• delete(file f )

• append(file f, data d)

HDFS/GFS

¿Eso no más?

Sistema de archivos distribuido ...

... es sólo un sistema de archivos.

GOOGLE'S MAPREDUCE

MapReduce: White-Paper

Empecemos con una tarea simple

¿Cómo podemos hacer un conteo de palabras distribuido?

¿Contar partes en la memoria principal de cada máquina?

¿Pero si una parte no cabe en memoria (e.g., 4-gramas)?

¿Y cómo podemos hacer la unificación/suma de los conteos?

¿Contar partes usando el disco duro de cada máquina?

¿Y de nuevo, cómo podemos hacer la suma de los conteos?

Conteo de palabras distribuido

1 23

A1 B1 C1

a-k r-zl-q

A1 B1 C1

EntradaDatos en GFS/HDFS

Partición

Ordenar / Contar

SalidaDatos en GFS/HDFS

¿Mejor método de partición?

Conteo de palabras distribuido

1 23

A1 B1 C1

hash(w)%3==0 hash(w)%3==2hash(w)%3==1

A1 B1 C1

EntradaDatos en GFS/HDFS

Partición

Ordenar / Contar

SalidaDatos en GFS/HDFS

Conteo de palabras distribuido

1 23

A1 B1 C1

hash(w)%3==0 hash(w)%3==2hash(w)%3==1

A1 B1 C1

EntradaDatos en GFS/HDFS

Partición

Ordenar / Contar

SalidaDatos en GFS/HDFS

¿Podemos abstraer un framework general de este ejemplo?

Conteo de palabras distribuido

1 23

A1 B1 C1 EntradaDatos en GFS/HDFS

Partición

Ordenar / Contar

¿Cuál parte es general y cuál es específica al conteo?

Conteo de palabras distribuido

1 23

A1 B1 C1 EntradaDatos en GFS/HDFS

Partición

Ordenar / Contar

¿Cuál parte es general y cuál es específica al conteo?

MapReduce: Pseudocódigo de conteo de palabras

MapReduce: La idea general

Pero ¿cómo se puede implementar (2) en un sistema distribuido?

1. Hacer una partición por la llave de map

2. Ordenar (en paralelo) por la llave de map

• Implícitamente agrupa por llave

3. Ejecutar reduce

Conteo de palabras distribuido

1 23

A1 B1 C1

hash(w)%3==0 hash(w)%3==2hash(w)%3==1

A1 B1 C1

EntradaDatos en GFS/HDFS

Partición / Transmisión

Ordenamiento

SalidaDatos en GFS/HDFS

Map

Reduce

MapReduce (en detalle)

1. Entrada: Lectura desde el cluster (GFS/HDFS)

– Representa el input en pares de la forma

– Cada tiene tipo y cada tiene tipo

2. Mapeo: Para cada par , genera 0-a-n pares

de la forma

– Cada tiene tipo y cada tiene tipo

– no tienen que ser relacionados

¿Qué hace Entrada en el caso de conteo de palabras?

¿Qué hace Mapeo en el caso de conteo de palabras?

MapReduce (en detalle)

3. Partición: Define la asignación de cada llave del mapeo (y sus pares ) a una máquina (reductor) particular

4. Transmisión ("shuffle"): Los datos son trasladados desde los mapeadores a los reductores (usando GFS/HDFS)

5. Ordenamiento: Cada reductor ordena sus pares por usando una función de comparación (en particular, para agrupar los pares por )

¿Cómo podría funcionar Partición para conteo de palabras?

MapReduce (en detalle)

6. Reducción: Toma todos los valores ... con

el mismo llave y produce 0-a-n pares

7. Salida: Escribe todos los pares de los

reductores a GFS/HDFS en la máquina local

donde se producen los pares

¿Cómo podría funcionar Reducción para conteo de palabras?

1. Entrada

2. Mapeo

4. Transmisión (“Shuffle”)

5. Ordenamiento /

Agrupación

7. Salida

3. Partición / Ordenamiento

6. Reducción

MapReduce (en resumen)

perro sed que

que decir que

la que sed

(0,perro sed que)

(13,que decir que)

(26,la que sed)

(perro,1)

(que,1)

(sed,1)

(decir,1)

(que,1)

(que,1)

(sed,1)

(que,1)

(la,1)

(perro,1)

(que,1)

(que,1)

(que,1)

(que,1)

(decir,1)

(sed,1)

(sed,1)

(la,1)

(decir,{1})

(sed,{1,1})

(que,{1,1,1,1})

(pero,{1})

(la,{1})

(perro,1)

(sed,1)

(que,1)

(que,1)

(decir,1)

(que,1)

(la,1)

(que,1)

(sed,1)

(decir,1)

(sed,2)

(perro,1)

(que,4)

(la,1)

(sed,1)

(decir,1)

(sed,1)

(perro,1)

(que,1)

(que,1)

(que,1)

(que,1)

(la,1)

Entrada MapeoPartición /

OrdenamientoTrans. Agrupación Reducción Salida

MapReduce: Conteo de Palabras

MAPREDUCE:

UNA OPTIMIZACIÓN CON EL "COMBINER"

perro sed que

que decir que

la que sed

(0,perro sed que)

(13,que decir que)

(26,la que sed)

(perro,1)

(que,1)

(sed,1)

(decir,1)

(que,1)

(que,1)

(sed,1)

(que,1)

(la,1)

(perro,1)

(que,1)

(que,1)

(que,1)

(que,1)

(decir,1)

(sed,1)

(sed,1)

(la,1)

(decir,{1})

(sed,{1,1})

(que,{1,1,1,1})

(pero,{1})

(la,{1})

(perro,1)

(sed,1)

(que,1)

(que,1)

(decir,1)

(que,1)

(la,1)

(que,1)

(sed,1)

(decir,1)

(sed,2)

(perro,1)

(que,4)

(la,1)

(sed,1)

(decir,1)

(sed,1)

(perro,1)

(que,1)

(que,1)

(que,1)

(que,1)

(la,1)

MapReduce: Conteo de Palabras

¿Hay algo que podríamos optimizar fácilmente aquí?

1. Entrada

2. Mapeo

4. Transmisión (“Shuffle”)

5. Ordenamiento /

Agrupación

7. Salida

3. Partición / Ordenamiento

6. Reducción

MapReduce (con el "Combiner")

(“Combiner”)

(perro,1)

(que,1)

(sed,1)

(decir,1)

(que,1)

(que,1)

(sed,1)

(que,1)

(la,1)

(perro,1)

(que,1)

(que,1)

(que,2)

(decir,1)

(sed,1)

(sed,1)

(la,1)

(decir,{1})

(sed,{1,1})

(que,{1,1,2})

(pero,{1})

(la,{1})

(perro,1)

(sed,1)

(que,1)

(que,1)

(decir,1)

(que,1)

(la,1)

(que,1)

(sed,1)

(decir,1)

(sed,2)

(perro,1)

(que,4)

(la,1)

(sed,1)

(decir,1)

(sed,1)

(perro,1)

(que,1)

(que,2)

(que,1)

(la,1)

MapeoPartición /

OrdenamientoTrans. Agrupación Reducción Salida

(sed,1)

(perro,1)

(que,1)

(decir,1)

(que,2)

(sed,1)

(que,1)

(la,1)

CombineEntrada

(0,perro sed que)

(13,que decir que)

(26,la que sed)

MapReduce (con el "Combiner")

MapReduce (con el "Combiner")

• Se define el "combiner" como una reducción

• A menudo, se puede usar la reducción misma como un

combiner sin cambiar nada

MapReduce (con el "Combiner")

¿Cuándo se puede usar un combiner?

1. Cuando se produzcan pares con el mismo tipo de llave/valor que

el mapeo (porque se mezclan ambos en la reducción)

2. Cuando la operación de reducción "#" sea

• conmutativa: a#b = b#a

• asociativa a#(b#c) = (a#b)#c = a#b#c

• es decir, cuando se puedan combinar los valores en

cualquier orden con cualquier combinación de

argumentos sin afectar el resultado final; por ejemplo:• +, ×, max, min, etc., están bien

• promedio no: p(a,p(b,c)) ≠ p(p(a,b),c) ≠ p(a,b,c)

MAPREDUCE:

TAREAS MÁS COMPLEJAS

Salida

Entrada

Ventas totales

¿Computar ventas totales por hora?

(Se asume que los precios no cambien)

Entrada

Ventas totales

Indicaciones

• A veces se necesitan varias tareas

– Cada tarea es un join/agrupación

– Se puede encadenar varias tareas así

• Se puede tener varios mapeos para una reducción

– Por ejemplo, para hacer un join sobre varios archivos

• Una reducción necesita al menos un mapeo

– Incluso si el mapeo simplemente "copia" cada par

– El mapeo inicia la partición/transmisión/ordenamiento

• Hay otras soluciones posibles:

– Por ejemplo, hacer el join de producto primero

Entrada

Ventas totales

¿Se puede usar un combiner aquí?

MAPREDUCE:

PROGRAMACIÓN

MapReduce: Beneficios para los programadores

• Se encarga de la implementación a bajo nivel:

– Fácil manejo de inputs y outputs

– No es necesario el manejo de comunicación entre

máquinas

– No es necesario implementar ordenamiento y

agrupación

• Máquinas abstractas (transparencia):

– Tolerancia a fallas

– Ubicaciones físicas abstractas

– Agregar / remover máquinas

– Balanceo de carga

Hadoop

• Implementación open source de MapReduce

• Basado en HDFS

Preguntas?

PROGRAMMING WITH HADOOP

(REFERENCE MATERIAL FOR LAB)

1. Input/Output (cmd)

> hdfs dfs

1. Input/Output (Java)Creates a file

system fordefault

configuration

Check if the file exists; if so

delete

Create file and write a

message

Open and readback

1. Input (Java)

2. MapMapper<InputKeyType,

InputValueType,MapKeyType,

MapValueType>

(input) key: file offset.(input) value: line of the file.context: handles output and

logging.

Emit output

(Writable for values)

Same order

(not needed in therunning example)

(WritableComparable for keys/values)

Needed for default partition function

Needed to sort keys

New Interface

Same as before

(not needed in therunning example)

3. Partition

PartitionerInterface

(This happens to be the default partition method!)

(not needed in therunning example)

4. Shuffle

5. Sort/Comparison

Methods in WritableComparator

(not needed in therunning example)

6. Reduce Reducer<MapKey, MapValue,OutputKey, OutputValue>

key: as emitted frommap

values: iterator overall values for that key

context for output

Write to output

7. Output / Input (Java)Creates a file

system fordefault

configuration

Check if the file exists; if so

delete

Create file and write a

message

Open and readback

7. Output (Java)

Control Flow

Create a JobClient, a JobConfand pass it the main class

Set the type of map and output keys and values in

the configuration

Set input and output paths

Set the mapper class

Set the reducer class(and optionally “combiner”)

Run and wait for job to complete.

More in Hadoop: Combiner

• Map-side “mini-reduction”

• Keeps a fixed-size buffer in memory

• Reduce within that buffer

– e.g., count words in buffer

– Lessens bandwidth needs

• In Hadoop: can simply use Reducer class

More in Hadoop: Counters

Context has a group of mapsof counters

More in Hadoop: Chaining Jobs

• Sometimes we need to chain jobs

• In Hadoop, can pass a set of Jobs to the client

• x.addDependingJob(y)

More in Hadoop: Distributed Cache

• Some tasks need “global knowledge”

– For example, a white-list of conference venues and

journals that should be considered in the citation count

– Typically small

• Use a distributed cache:

– Makes data available locally to all nodes

– (Use sparingly!!)

RECAP

Distributed File Systems

• Google File System (GFS)

– Master and Chunkslaves

– Replicated pipelined writes

– Direct reads

– Minimising master traffic

– Fault-tolerance: self-healing

– Rack awareness

– Consistency and modifications

• Hadoop Distributed File System

– NameNode and DataNodes

MapReduce

1. Input

2. Map

3. Partition

4. Shuffle

5. Comparison/Sort

6. Reduce

7. Output

MapReduce/GFS Revision

• GFS: distributed file system

– Implemented as HDFS

• MapReduce: distributed processing framework

– Implemented as Hadoop

Hadoop

• FileSystem

• Mapper<InputKey,InputValue,MapKey,MapValue>

• OutputCollector<OutputKey,OutputValue>

• Writable, WritableComparable<Key>

• Partitioner<KeyType,ValueType>

• Reducer<MapKey,MapValue,OutputKey,OutputValue>

• JobClient/JobConf