modelos de procesamiento distribuido basados en spark para

117
Universidad Nacional del Centro de la Provincia De Buenos Aires Facultad de Ciencias Exactas - Departamento de Computación y Sistemas Ingeniería de Sistemas Modelos de procesamiento distribuido basados en Spark para algoritmos de recomendación sobre grandes datos sociales por Hernán Joel Zequeira Director: Prof. Dr. Alejandro Zunino Co-Directora: Prof. Dra. Daniela Godoy Trabajo final de la carrera de Ingeniero de Sistemas Tandil, 26 de Febrero de 2019.

Upload: others

Post on 10-Jul-2022

2 views

Category:

Documents


0 download

TRANSCRIPT

Page 1: Modelos de procesamiento distribuido basados en Spark para

Universidad Nacional del Centro de la Provincia De Buenos AiresFacultad de Ciencias Exactas - Departamento de Computación y Sistemas

Ingeniería de Sistemas

Modelos de procesamiento distribuidobasados en Spark para algoritmos de

recomendación sobre grandes datos sociales

por

Hernán Joel Zequeira

Director: Prof. Dr. Alejandro Zunino

Co-Directora: Prof. Dra. Daniela Godoy

Trabajo final de la carrera de Ingeniero de Sistemas

Tandil, 26 de Febrero de 2019.

Page 2: Modelos de procesamiento distribuido basados en Spark para

Resumen

Las redes sociales han cobrado una gran importancia en la vida cotidiana de las personas.Son una gran fuente de información a la que millones de personas acceden para informar-se o conectarse con gente alrededor del mundo. Es por eso que es muy importante queexista un mecanismo de recomendaciones que ayude a los usuarios a interactuar entreellos, a través de recomendaciones de amistades. Encontrar personas de interés puedetornarse en una tediosa tarea en redes tan numerosas en usuarios. La asistencia automa-tizada proporcionará una ayuda a aquellas personas que busquen información en unared social.

Gracias al modelo de la computación paralela y distribuida, hoy es posible gracias a la si-nergia computacional, procesar grandes cantidades de información que una sola compu-tadora, aún siendo la mas potente y costosa, pueda lograr. Mejorar los tiempos y la cali-dad de las recomendaciones de amistades, entre otros tipos de recomendaciones, será elobjetivo principal de este trabajo. La optimización del uso de los recursos, tales como lamemoria y la red, nivel de paralelismo de tareas, tomarán un importante lugar a la horade diseñar un sistema distribuido.

Page 3: Modelos de procesamiento distribuido basados en Spark para

Agradecimientos

Un especial y cordial agradecimiento a todas las personas que hicieron su aporte en estetrabajo y durante toda mi carrera académica y estuvieron para colaborar cuando fue nece-sario. Un especial reconocimiento a mi directores Alejandro Zunino y Daniela Godoy, queme acompañaron durante todo el proceso con la mejor voluntad y disposición motiván-dome a aprender y superarme.

Principalmente, quiero agradecer a mi familia que me alentó a continuar y a no desistirante las vicisitudes, desde el primer día en estos últimos años. Gracias a todos mis amigosque también fueron un fuerte sustento emocional y moral. Y por último, a todos aquellosque hoy me motivan a comenzar mi carrera profesional con sus mejores deseos.

Esta tesis esta dedicada a todos ellos. ¡Muchas gracias por todo!

3

Page 4: Modelos de procesamiento distribuido basados en Spark para
Page 5: Modelos de procesamiento distribuido basados en Spark para

Acrónimos

SR Sistema de recomendación

SALSA Stochastic Approach for Link-Structure Analysis

HITS Hypertext Induced Topic Selection

BSP Bulk Synchronous Parallel

DPM Distributed Partitioned Merge

HDFS Hadoop Distributed File System

ISISTAN Instituto Superior de Ingeniería de Software de Tandil

SN Social Network

OSN Online Social Network

LDA Latent Dirichlet Allocation

ANSA Agenzia Nazionale Stampa Associata

RM-ODP Reference Model for Open Distributed Processing

API Application Programming Interface

JSON JavaScript Object Notation

XML Extensible Markup Language

YARN Yet Another Resource Negotiator

WORM Write Once Read many

JVM Java Virtual Machine

Page 6: Modelos de procesamiento distribuido basados en Spark para

CLI Command Line Interface

SparkLP Spark Link Predictor

RDD Resilient Distributed Dataset

IS Information Source

ISW Information Source Weighted

DAG Directed Acyclic Graph

DAGS Directed Acyclic Scheduler

SSH Secure Shell

IP Internet Protocol

AUFS Advanced Multi-layered Unification Filesystem

BTRFS B-tree FS

VFS Virtual File System

GC Garbage Collection

GI Grado de Interacción

Page 7: Modelos de procesamiento distribuido basados en Spark para

Índice general

1. Introducción 9

1.1. Contexto general . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 9

1.2. Motivación . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 10

1.3. Objetivos . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 12

1.4. Organización del trabajo . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 12

2. Fundamentos teóricos 15

2.1. Sistemas de recomendación . . . . . . . . . . . . . . . . . . . . . . . . . . . 15

2.1.1. Sistemas de recomendación y redes sociales . . . . . . . . . . . . . 17

2.1.1.1. Recomendacion de amigos . . . . . . . . . . . . . . . . . . 18

2.1.1.2. Recomendación de usuarios a seguir . . . . . . . . . . . . 18

2.2. Predicción de enlaces . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 20

2.2.1. Definición del problema . . . . . . . . . . . . . . . . . . . . . . . . . 20

2.3. Computación paralela y distribuida . . . . . . . . . . . . . . . . . . . . . . 23

2.3.1. Definición de un sistema distribuido . . . . . . . . . . . . . . . . . . 23

2.3.2. Necesidad de la computación paralela y distribuida . . . . . . . . . 26

2.4. Trabajos relacionados . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 27

3. Trabajo propuesto 29

3.1. Enfoque . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 29

3.2. Hadoop . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 30

1

Page 8: Modelos de procesamiento distribuido basados en Spark para

3.2.1. Historia . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 30

3.2.2. Conceptos básicos . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31

3.2.3. HDFS . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31

3.2.4. Arquitectura general de HDFS . . . . . . . . . . . . . . . . . . . . . 32

3.3. Apache Spark . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33

3.3.1. Por qué Spark . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 35

3.3.2. Comenzando con Apache Spark . . . . . . . . . . . . . . . . . . . . 36

3.3.2.1. Arquitectura general . . . . . . . . . . . . . . . . . . . . . 36

3.3.2.2. Administrador del clúster (cluster manager) . . . . . . . . . 36

3.3.2.3. Contexto de Spark (Spark context) . . . . . . . . . . . . . . 38

3.3.2.4. Nodos trabajadores (worker nodes) . . . . . . . . . . . . . . 38

3.3.2.5. El Controlador (Driver) . . . . . . . . . . . . . . . . . . . . 39

3.3.3. El rol del RDD . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 41

3.3.4. Operaciones sobre RDD . . . . . . . . . . . . . . . . . . . . . . . . . 42

3.3.4.1. Transformaciones . . . . . . . . . . . . . . . . . . . . . . . 42

3.3.4.2. Acciones . . . . . . . . . . . . . . . . . . . . . . . . . . . . 43

3.3.5. RDD Shuffle . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 43

3.3.6. Clasificación de RDD . . . . . . . . . . . . . . . . . . . . . . . . . . . 44

3.3.7. Particionado de datos . . . . . . . . . . . . . . . . . . . . . . . . . . 45

3.3.7.1. Vista general del particionado . . . . . . . . . . . . . . . . 46

3.3.8. Plan de ejecución . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 46

3.3.8.1. RDD Lineage . . . . . . . . . . . . . . . . . . . . . . . . . 46

3.3.8.2. Plan de ejecución lógico . . . . . . . . . . . . . . . . . . . . 47

3.3.8.3. DAG Scheduler . . . . . . . . . . . . . . . . . . . . . . . . 47

3.3.8.4. Transformaciones Narrow . . . . . . . . . . . . . . . . . . 48

3.3.8.5. Transformaciones Wide . . . . . . . . . . . . . . . . . . . . 49

3.3.8.6. Tareas (tasks) . . . . . . . . . . . . . . . . . . . . . . . . . . 49

3.3.8.7. Trabajos (jobs) . . . . . . . . . . . . . . . . . . . . . . . . . 50

3.3.8.8. Stages (etapas) . . . . . . . . . . . . . . . . . . . . . . . . . 50

2

Page 9: Modelos de procesamiento distribuido basados en Spark para

3.3.9. En síntesis . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51

3.4. Implementación . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51

3.4.1. Enfoque . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51

3.4.1.1. Common Neighbors . . . . . . . . . . . . . . . . . . . . . 52

3.4.2. SparkLP . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 53

3.4.3. Alternativas propuestas . . . . . . . . . . . . . . . . . . . . . . . . . 57

3.4.3.1. Estrategias de filtrado . . . . . . . . . . . . . . . . . . . . 58

3.4.3.2. Estrategias de cálculo de Score . . . . . . . . . . . . . . . . 61

3.5. Conclusiones . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 62

4. Entorno experimental 63

4.1. Relevamiento técnico del clúster . . . . . . . . . . . . . . . . . . . . . . . . 63

4.2. Topología de red . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 64

4.2.1. Comunicación SSH entre nodos . . . . . . . . . . . . . . . . . . . . 64

4.3. Análisis del entorno experimental . . . . . . . . . . . . . . . . . . . . . . . 65

4.4. Docker . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 65

4.4.1. Cgroups . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 66

4.4.2. Namespaces . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 67

4.4.3. Sistemas de archivos de unión . . . . . . . . . . . . . . . . . . . . . 67

4.4.4. Libcontainer . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 67

4.4.5. Docker daemon . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 67

4.4.6. El cliente Docker . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 68

4.4.7. Redes de contenedores . . . . . . . . . . . . . . . . . . . . . . . . . . 68

4.4.8. Docker Swarm . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 69

4.4.9. Redes overlay . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 70

4.5. Automatización del proceso . . . . . . . . . . . . . . . . . . . . . . . . . . . 71

4.5.1. Inicio del clúster . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 71

4.5.2. Parada del clúster . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 73

3

Page 10: Modelos de procesamiento distribuido basados en Spark para

5. Experimentación 75

5.1. Alcance y limitaciones . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 75

5.1.1. Reducción del conjunto de datos . . . . . . . . . . . . . . . . . . . . 78

5.2. Preparación del sistema . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79

5.3. Filtrado de usuarios . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79

5.3.1. Baseline . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79

5.3.2. IS e ISW . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 81

5.4. Cálculo de las recomendaciones . . . . . . . . . . . . . . . . . . . . . . . . . 87

5.5. Análisis de los resultados . . . . . . . . . . . . . . . . . . . . . . . . . . . . 93

5.5.1. Baseline . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 94

5.5.2. IS . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 94

5.5.3. ISW . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 96

5.6. Conclusiones de la experimentación . . . . . . . . . . . . . . . . . . . . . . 100

5.6.1. Evolución de la experimentación . . . . . . . . . . . . . . . . . . . . 100

6. Conclusiones 103

6.1. Trabajos Futuros . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 105

Bibliografía 107

4

Page 11: Modelos de procesamiento distribuido basados en Spark para

Índice de figuras

2.1. Definición formal de Link Prediction . . . . . . . . . . . . . . . . . . . . . . 21

2.2. Vista del grafo social y su evolución. . . . . . . . . . . . . . . . . . . . . . . 22

3.1. División de un archivo en bloques cuando es tomado por HDFS. Al usua-rio se le muestra como un único archivo de 500MB . . . . . . . . . . . . . . 32

3.2. Bloques HDFS en clúster multi nodo . . . . . . . . . . . . . . . . . . . . . . 33

3.3. Arquitectura de NameNode y DataNode . . . . . . . . . . . . . . . . . . . 33

3.4. El Spark core y sus librerías complementarias. . . . . . . . . . . . . . . . . 34

3.5. Vista interna de ejecución de Spark . . . . . . . . . . . . . . . . . . . . . . 37

3.6. Spark context actúa como el maestro de la aplicación Spark . . . . . . . . 39

3.7. Componentes en tiempo de ejecución de Spark Standalone . . . . . . . . . 40

3.8. RDDs . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 41

3.9. Transformaciones narrow y wide . . . . . . . . . . . . . . . . . . . . . . . . 43

3.10. Acciones sobre RDDs en Spark . . . . . . . . . . . . . . . . . . . . . . . . . 44

3.11. Proceso Shuffle en operación reduceByKey . . . . . . . . . . . . . . . . . . 45

3.12. RDD Lineage . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 47

3.13. DAG Scheduler transformando el RDD Lineage a un plan físico . . . . . . . 48

3.14. Vista de una transformación narrow . . . . . . . . . . . . . . . . . . . . . . 49

3.15. Vista de una transformación wide . . . . . . . . . . . . . . . . . . . . . . . 49

3.16. Una acción desencadena la creación de una etapa la cual se materializa entareas. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 50

3.17. Grafo de etapas obtenido a partir de un plan lógico . . . . . . . . . . . . . 50

5

Page 12: Modelos de procesamiento distribuido basados en Spark para

3.18. Ejemplo de puntaje para dos casos en un grafo social . . . . . . . . . . . . 53

3.19. Common neighbors redefinido . . . . . . . . . . . . . . . . . . . . . . . . . 53

3.20. Vista de paquetes de SparkLP. . . . . . . . . . . . . . . . . . . . . . . . . . 54

3.21. Diagrama de clases de SparkLP . . . . . . . . . . . . . . . . . . . . . . . . . 55

3.22. Vista de clases para los métodos de filtrado de usuarios . . . . . . . . . . . 60

3.23. Vista de clases para estrategia de cálculos de scores. . . . . . . . . . . . . . 61

4.1. Vista de conectividad del clúster . . . . . . . . . . . . . . . . . . . . . . . . 64

4.2. Integración de Docker con el Kérnel Linux . . . . . . . . . . . . . . . . . . 66

4.3. Vista del Docker Engine y su interacción con cada componente del sistema. 68

4.4. Vista de una red overlay sobre una topología de red subyacente . . . . . . 70

5.1. Histograma para dataset reducido en función de su grado de interacción 78

5.2. Diagrama de secuencia de configuración inicial. . . . . . . . . . . . . . . . 79

5.3. Diagrama de secuencia para la configuración de filtrado nulo. . . . . . . . 81

5.4. Diagrama de secuencia para configuración de filtrado efectivo. . . . . . . 82

5.5. Diagrama de secuencia para recolección de usuarios descartados. . . . . . 83

5.6. DAG para filtrado activo en IS o ISW . . . . . . . . . . . . . . . . . . . . . 84

5.7. DAG en detalle para la primera etapa. . . . . . . . . . . . . . . . . . . . . . 85

5.8. DAG en detalle para la etapa de descarte . . . . . . . . . . . . . . . . . . . 86

5.9. Comienzo del descarte . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 86

5.10. Diagrama de secuencia para configuración de filtrado efectivo. . . . . . . 87

5.11. DAG para la predicción de arcos. . . . . . . . . . . . . . . . . . . . . . . . . 88

5.12. Continuación del proceso de predicción con ayuda de persistencia. . . . . 89

5.13. RDD lineage de el calculo de scores de cada tupla. . . . . . . . . . . . . . . 90

5.14. Obtención de seguidores para un usuario dado. . . . . . . . . . . . . . . . 91

5.15. Intersección de dos conjuntos de seguidores para obtener un score. . . . . 91

5.16. RDD Lineage para ordenamiento. . . . . . . . . . . . . . . . . . . . . . . . . 92

5.17. Ordenamiento terminado. . . . . . . . . . . . . . . . . . . . . . . . . . . . . 92

5.18. Histograma para el grado de IS en el dataset. . . . . . . . . . . . . . . . . . 95

5.19. Histograma para el grado de relevancia de usuarios del dataset. . . . . . . 97

5.20. Gráfico de barras para el modo ISW . . . . . . . . . . . . . . . . . . . . . . 100

6

Page 13: Modelos de procesamiento distribuido basados en Spark para

Índice de cuadros

5.1. Cuadro comparativo del método Baseline . . . . . . . . . . . . . . . . . . . 94

5.2. Cuadro comparativo de cada posible parametrización del método IS . . . 95

5.3. Gráfico de barras para el modo IS. . . . . . . . . . . . . . . . . . . . . . . . 96

5.4. Cuadro comparativo para ISW con valor de umbral 0.5 . . . . . . . . . . . 97

5.5. Cuadro comparativo para ISW con valor de umbral 0.75 . . . . . . . . . . 98

5.6. Cuadro comparativo para ISW con valor de umbral 0.85 . . . . . . . . . . 98

5.7. Cuadro comparativo para ISW con valor de umbral 0.95 . . . . . . . . . . 99

5.8. Cuadro comparativo para ISW con valor de umbral 0.995 . . . . . . . . . 99

7

Page 14: Modelos de procesamiento distribuido basados en Spark para

8

Page 15: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 1Introducción

Los Sistemas de Recomendación (SRs) (Adomavicius and Tuzhilin, 2005; Bobadilla et al.,2013) se han convertido en una herramienta vital para ayudar a los usuarios a hacer fren-te a la enorme cantidad de información disponible en la Web, por lo que se ha dedicadoconsiderable atención a su diseño y desarrollo. Con la aparición de la Web Social, estossistemas se vuelven fundamentales para ayudan a los usuarios a hacer frente a la abru-madora cantidad de contenido que se intercambia a través de medios de comunicaciónsocial, incluyendo mensajes, opiniones, fotos, videos y documentos, mejorando la expe-riencia del usuario mediante la recomendación de personas, eventos o contenido de inte-rés. La asistencia brindada por los sistemas de recomendación se basa en el aprendizajede modelos capaces de predecir las preferencias, objetivos e intenciones de los usuariosa partir de su comportamiento pasado y las opiniones o acciones de otros usuarios (Ricciet al., 2015). Estos elementos son analizados con técnicas provenientes de campos comoaprendizaje de máquina, minería de datos y recuperación de información, con el fin dedescubrir nuevo conocimiento que permita realizar inferencias acerca de información re-levante, intenciones de los usuarios, formas de interacción preferidas u otros aspectos.Los resultados del aprendizaje se presentan luego al usuario en la forma de sugerenciassobre distintos elementos, como páginas Web o productos en un sitio de e-commerce,permitiendo así enriquecer la experiencia del usuario con un sistema Web. Los algorit-mos de recomendación para redes sociales se valen del grafo subyacente que representaa la red para realizar sugerencias sobre ítems o personas de interés.

1.1. Contexto general

Particularmente la recomendación de amigos o personas a seguir se puede ver como unproblema de predicción de enlaces. La parte central de los algoritmos de predicción de

9

Page 16: Modelos de procesamiento distribuido basados en Spark para

enlaces es una función de similitud entre vértices que permite elaborar un ranking de losenlaces candidatos para recomendación de acuerdo a una probabilidad estimada de quedicho enlace se forme en un futuro. Distintas métricas de similitud basadas en topologíase han propuesto en la literatura (Liben-Nowell and Kleinberg, 2007), que se pueden cla-sificar en basadas en vecinos (como Common Neighbors, Jaccard o Adamic-Adar, entreotras), basadas en caminos (como Katz) y basadas en caminatas aleatorias. En esta últimacategoría caen una serie de algoritmos que han ganado importancia en los últimos añospor ser la base de servicios populares, tales como PageRank centralizado (Page et al.,1999), SimRank (Jeh and Widom, 2002), HITS (Kleinberg, 1999), SALSA (Lempel and Mo-ran, 2001) o WhoToFollow (Gupta et al., 2013). SALSA (Lempel and Moran, 2001), porejemplo, es un algoritmo que combina la idea de navegación aleatoria de PageRank conel concepto de hubs y autoridades de HITS para recomendar resultados de una búsque-da amplia en un tópico dado. Basado en SALSA, el algoritmo WhoToFollow (del inglés,Who To Follow) de Twitter realiza recomendaciones a un usuario. El tamaño de las re-des sociales y su crecimiento constante hace que estos problemas enfrentes problemas deescalabilidad y los ambientes distribuidos sean la opción natural para su procesamiento.Entre los modelos de procesamiento distribuidos más populares se encuentran MapRe-duce (Dean and Ghemawat, 2008), BSP (Valiant, 1990), ForkJoin (Mateos et al., 2010) y,específicamente para grafos, Pregel (Malewicz et al., 2010). La mayoría de los algorit-mos de procesamiento de grafos como PageRank realizan múltiples iteraciones sobre losmismos datos y esto requiere un mecanismo de pasaje de mensajes. En modelos comoMapReduce se necesita programar explícitamente el manejo de iteraciones múltiples, locual es muy ineficiente ya que implica costosas operaciones de entrada/salida en disco yreplicación de datos en el clúster para tolerancia a fallos. Además, cada iteración en Ma-pReduce tiene latencia muy alta, y la siguiente iteración puede comenzar sólo después deque el trabajo anterior terminó completamente. Diferentes herramientas de procesamien-to de grafos como Pregel fueron diseñados para abordar la necesidad de una plataformaeficiente para algoritmos sobre grafos. Estas herramientas son rápidas y escalables, pe-ro no son eficientes para la creación y postprocesamiento de complejos algoritmos querequieren múltiples iteraciones. Distributed Partitioned Merge (DPM) (Corbellini et al.,2017a) se presenta como una alternativa híbrida entre ForkJoin y Pregel que combina lasimplicidad del estilo de programación de ForkJoin con la performance y escalabilidadde Pregel.

1.2. Motivación

En la Web social de hoy se generan enormes cantidades de datos, incluyendo mensajesde texto, fotos, videos, entre otros ítems, además de generar grandes estructuras de da-

10

Page 17: Modelos de procesamiento distribuido basados en Spark para

tos subyacentes, como grafos y folksonomías, que conectan a los usuarios con otros y conítems de distinto tipo, que además están en constante evolución. Esta situación ha llevadoal desarrollo de técnicas y herramientas para almacenar y procesar grandes cantidadesde datos. En la literatura, el término Big Data (Zikopoulos and Eaton, 2011) fue acuñadopara referirse a la gestión de datos a gran escala, incluyendo su análisis y las plataformasde soporte correspondientes. Las redes sociales son una de las fuentes más importantesde lo que se conoce como Social Big Data, ya que éstas plataformas permiten a grandescantidades de individuos interactuar entre sí, proporcionando información acerca de suspreferencias y relaciones (Bello-Orgaz et al., 2016). En los últimos años han ido surgien-do un amplio abanico de tecnologías y herramientas tanto para procesamiento de gran-des datos (como Spark, Hive, Storm, entre otras), como para su almacenamiento (comoHDFS, Cassandra, MongoDB, entre otras)12 (Corbellini et al., 2017b). Uno de los pilarestecnológicos de Big Data es Spark3, un framework open-source orientado al cómputodistribuido rápido sobre Big Data utilizando primitivas de memoria desarrollado por elAMPLab de la Universidad de California en Berkeley. Spark es un motor de análisis dedatos universal y rápido basado en memoria sobre clusters de cómputo distribuido co-mo Hadoop motivado en las limitaciones del paradigma MapReduce/Hadoop que fuer-za un flujo de datos lineal que hace uso intensivo de disco (García-Gil et al., 2017). Estaplataforma permite a los programas del usuario cargar datos en memoria y consultarlosrepetidamente, lo que lo hace ideal para procesamiento online e iterativo. Los algoritmosde recomendación sociales que requieren procesamiento de grandes grafos son efectiva-mente algoritmos iterativos que se ven beneficiados por el procesamiento en memoria deSpark. Dentro de este tipo de algoritmo se encuentran los algoritmos de predicción deenlaces (link prediction), que se utilizan para la recomendación de personas (nodos) enredes sociales (Liben-Nowell and Kleinberg, 2007). El ejemplo clásico de uso de este tipode algoritmos es la recomendación de amistades o usuarios a quienes seguir, como el ser-vicio Who to Follow ofrecido por Twitter (Gupta et al., 2013). Otro punto fuerte de Sparkes el soporte a streaming de datos entrantes junto con el procesamiento de distribuido,cuya combinación permite ofrecer resultados en tiempo real. Este es un requerimiento demuchos sistemas de recomendación en la actualidad dada la tecnología móvil y la necesi-dad de brindar sugerencias context-aware calculadas en base al procesamiento de datosentrantes (como los mensajes provenientes del timeline de Twitter).

1http://cassandra.apache.org/2https://www.mongodb.com/es3https://spark.apache.org/

11

Page 18: Modelos de procesamiento distribuido basados en Spark para

1.3. Objetivos

El objetivo general que se plantea para este trabajo es la evaluación de modelos de proce-samiento sobre Spark para facilitar el desarrollo de algoritmos de recomendación socialesmasivamente distribuidos y con requerimientos de tiempo real. Particularmente, se en-focará el plan al tratamiento de algoritmos de predicción de enlaces para grafos socialescomo los algoritmos clásicos basados en vecindarios o caminos. Los primeros no sólo sonlos más utilizados en la actualidad, sino que además son los que potencialmente obten-drían mayores ventajas del procesamiento en memoria ofrecido por Spark. A diferenciade soluciones existentes, se buscarán modelos que exploten las características propiastanto de los algoritmos (por ejemplo, iteraciones en múltiples etapas) como de los grafossociales (altos coeficientes de clustering, distribución power-law) para la distribución dedatos y trabajos de procesamiento.

Para lograr este objetivo, se implementará una métrica denominada Common Neighbors,y se utilizará Apache Spark como motor de procesamiento distribuido, corriendo sobreun clúster perteneciente a ISISTAN4. Trataremos en detalle las características técnicasde dicho clúster y las limitantes que se encontraron al momento de la experimentación,y como se mitigaron. La importancia del buen uso de los recursos cobrarán un lugarimportante a la hora del análisis de los resultados.

1.4. Organización del trabajo

Para cumplir el objetivo propuesto en este trabajo se debe ahondar en distintos aspectosde interés, por este motivo en cada uno de los siguientes capítulos se introduce al lec-tor en una sección específica. A continuación se realizará una breve descripción de loscontenidos de cada uno de los capítulos.

Capítulo 2 Nos centraremos en todos los aspectos teóricos involucrados en estetrabajo, desde una vista conceptual sobre qué son los sistemas de recomendacion,características, objetivos y teoría de recomendación de usuarios. Trataremos los pro-blemas de predicción de enlaces y el modelo de computación paralela y distribuida,sus características fundamentales y su motivación.

Capítulo 3 Se tratará el trabajo a realizar en concreto. Un sistema de predicción deenlaces, orientado a redes sociales y a la recomendación de amigos. Se dará unaintroducción a las tecnologías utilizadas para tal fin, como Apache Spark, sus ca-racterísticas principales y breve historia.

4http://www.isistan.unicen.edu.ar/

12

Page 19: Modelos de procesamiento distribuido basados en Spark para

Capítulo 4 Se evaluará el ambiente físico donde se llevarán a cabo las pruebas, seincluirán detalles sobre la creación y configuración del clúster computacional, asícomo también de la automatización de dicho proceso mediante scripts. También sedará una breve reseña a Docker y sus características básicas.

Capítulo 5 Se describirá el conjunto de datos a tratar en las pruebas del software im-plementado. Se verá en detalle su integración con Spark, y se analizarán diferentesescenarios de ejecución, tiempos y resultados obtenidos.

Capítulo 6 Se discutirán las conclusiones globales de este trabajo, cuáles fueron lasmayores limitantes, decisiones de diseño y trabajos a futuro a partir de aquí.

13

Page 20: Modelos de procesamiento distribuido basados en Spark para

14

Page 21: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 2Fundamentos teóricos

Como se menciona en la introducción de este trabajo, en el presente capítulo se pretendedar una visión teórica sobre las áreas involucradas en el transcurso del documento: talescomo problemas de predicción de enlaces, computación paralela y distribuida y cómoambos se relacionan entre sí para este trabajo.

Los algoritmos de predicción de enlaces suelen ser problemas intratables para un siste-ma de cómputo convencional, debido a la gran cantidad de datos que éstos procesan, ya la complejidad exponencial que éstos conllevan. Es así, que el paradigma de compu-tación paralela y distribuida es de gran ayuda en problemas de este tipo, donde se puedesimplemente partir el problema inicial en divisiones del mismo, y procesarlos a todos deforma paralela en un sistema de múltiples unidades de cómputo. De este modo, es po-sible procesar grandes volúmenes de información y de esta forma buscar una mejora enlas recomendaciones resultantes de este tipo de algoritmos.

2.1. Sistemas de recomendación

Los Sistemas de Recomendación (RS, por sus siglas en inglés) han atraído una atencióncreciente en los últimos años debido a la creciente popularidad de la web y más tardela web social, caracterizado por los sitios de redes sociales que permiten a millones deusuarios comunicarse, intercambiar opiniones y compartir contenido generado por losusuarios (por ejemplo, fotos, videos y documentos). El uso generalizado y el interés enlas redes sociales, como Facebook1 o Twitter2, plantean nuevos desafíos a los enfoquestradicionales para la construcción de sistemas de recomendación ya que las relacionessociales tienen un papel destacado. La información acerca de los vínculos sociales y su

1http://www.facebook.com/2http://www.twitter.com/

15

Page 22: Modelos de procesamiento distribuido basados en Spark para

semántica es explotada por los sistemas de recomendación basados en redes sociales paraproporcionar diversos tipos de recomendaciones. Por ejemplo, recomendar a la gente unusuario que puede estar interesado en seguir en una red social como Twitter, mediante laobtención de los intereses de sus mensajes de Twitter o los usuarios de que él/ella sigue.Estas técnicas no sólo ayudan a aumentar la conectividad entre los usuarios, sino tambiéna fomentar la adopción y el crecimiento del sistema.

Los sistemas de recomendación son herramientas y técnicas que proporcionan sugeren-cias acerca de los artículos más adecuados (productos o servicios) a determinados usua-rios (individuos o empresas) a través de la predicción de interés de un usuario en un ítem(Ricci et al., 2010). Una predicción puede basarse en la información relacionada acerca delos elementos, las preferencias de los usuarios y/o las interacciones entre ítems y usuarios(Bobadilla et al., 2013). Estas sugerencias pueden ayudar a diversos procesos de toma dedecisiones, tales como qué artículos para comprar, qué música escuchar, o qué noticia leer.Las recomendaciones suelen ser personalizadas, es decir que diferentes usuarios o gru-pos de usuarios reciben sugerencias diferentes. Las recomendaciones no personalizadasson más fáciles de generar, pero más genéricas (por ejemplo, los diez primeros artículosleídos en un sitio Web del periódico). Por lo general, las sugerencias se presentan a losusuarios en forma de una lista clasificada de artículos. Los sistemas de recomendaciónhacen predicciones sobre artículos mediante el aprovechamiento de múltiples fuentes deinformación y, posiblemente, considerando el contexto de usuario o tarea en cuestión(por ejemplo, sugiriendo páginas Web mientras el usuario está navegando, o productossimilares a los que el usuario está inspeccionando o ya ha comprado).

Los RS han demostrado ser útiles para ayudar a los usuarios a hacer frente a la cantidadposiblemente abrumadora de artículos ofrecidos por un sistema (por ejemplo, un sitio deventa de libros Web, como Amazon3, que contiene miles de libros). El objetivo principalde los RS es aliviar el problema de la sobrecarga de información, indicando a cada usuarionuevos artículos, aún no experimentados/vistos/usados que puedan ser relevantes parala tarea actual del usuario (por ejemplo, páginas web, libros, películas, etc.)

Los ámbitos de aplicación de los sistemas de recomendación son diversos. Una de lasaplicaciones más fructíferas es el comercio electrónico, en el que los RS sugieren produc-tos (por ejemplo, libros, música, programas de televisión) a los clientes del sitio y pro-porcionan información para ayudarles a decidir qué productos comprar en sus tiendasen línea. El turismo es también un dominio adecuado para los RS que ofrecen a los viaje-ros sugerencias sobre planes para visitar lugares de interés, alojamientos y servicios. Dehecho, los sistemas de recomendación móviles sensibles al contexto prometen enrique-cer sustancialmente las experiencias turísticas. Los RS se han convertido en un elementoimportante de muchos sistemas basados en la Web, tales como asistentes de navegación

3http://www.amazon.com/

16

Page 23: Modelos de procesamiento distribuido basados en Spark para

y filtrado de información. Otros campos de aplicación incluyen E-learning, sistemas debibliotecas, desarrollo de software, juegos, salud, entre muchos otros.

2.1.1. Sistemas de recomendación y redes sociales

Las redes sociales (SN, su sigla en inglés) son estructuras que consisten de vértices querepresentan a las personas u otras entidades integradas en un contexto social y arcos querepresentan la interacción, colaboración, o alguna otra forma de vinculación entre las en-tidades. En los sitios de redes sociales, los usuarios forman una red social en línea (OSN),que proporciona un poderoso medio para compartir, organizar y encontrar contenido ycontactos. El núcleo de estas OSN, formado por perfiles personales de usuarios, que sue-len contener información de identificación (por ejemplo, nombre y foto), intereses (porejemplo, si suscribieron a los grupos de interés), y los contactos personales (por ejemplo,la lista de usuarios conectados, llamado “amigos” ) (Heidemann et al., 2012).

En algunas redes, se establece un vínculo entre dos personas si ambas personas están deacuerdo en tener una relación de amistad. Esto conduce a una red social sin direcciónya que ambas personas comparten una amistad mutua. Facebook, MySpace4 y LinkedIn5 son ejemplos de redes sociales no dirigidas. En otros sitios de redes sociales, se formaun enlace si un usuario se suscribe para recibir actualizaciones de otro usuario (es de-cir, publicaciones o posts). Servicios de micro-blogging como Twitter o Sina Weibo6 sonun ejemplo de estas últimas, así como las redes de colaboración como GitHub7, en laque los usuarios se siguen cuando trabajan en proyectos interesantes. Las redes socialesresultantes son redes dirigidas que permiten a los usuarios seguir a otros en no nece-sariamente relaciones recíprocas (por ejemplo, las celebridades suelen ser seguidas pormuchos usuarios, pero ellos no siguen a sus seguidores). Mientras que el primer tipo deOSN el énfasis está en las amistades, las últimas están las orientadas a la información.

El uso generalizado de las redes sociales genera la expansión de la OSN a ritmos sin pre-cedentes. En estas comunidades en línea, se vuelve cada vez engorroso encontrar nuevoscontactos y amigos. En respuesta a este problema, muchas plataformas de redes socialeshan comenzado a prestar asistencia a sus usuarios en el establecimiento de nuevas re-laciones sociales al hacer recomendaciones (por ejemplo, el servicio “a quién seguir” deTwitter o la función “la gente que puede conocer” de Facebook). Las técnicas de forma-ción de enlaces que apoyan el descubrimiento y el establecimiento de relaciones en redessociales y sus aplicaciones incluyen (Schall, 2015): establecimiento de nuevas relacionessociales, el apoyo a la formación de comunidades de expertos, formación estratégica de

4www.weibo.com5www.linkedin.com6www.weibo.com7www.github.com

17

Page 24: Modelos de procesamiento distribuido basados en Spark para

los equipos y la reducción de las comunidades en línea. La recomendación de amigos y larecomendación de usuarios a seguir presentan diferentes retos en el diseño de sistemasde recomendación.

A continuación se describen estos dos tipos de recomendación.

2.1.1.1. Recomendacion de amigos

La recomendación de amigos tiene por objetivo sugerir usuarios que un usuario destinopuede estar interesado en contactar, por ejemplo, se construyen recomendaciones parapares de usuarios que puedan estar interesados uno en otro. Se han propuesto varios en-foques en la literatura para descubrir amigos valiosos. En su mayoría, los enfoques derecomendación de amigos se basan en el análisis del grafo social, explorando el conjuntode personas vinculadas al usuario destino con el fin de elaborar recomendaciones. Losusuarios más cercanos en la red social, como amigos de amigos (la funcionalidad “Perso-nas que quizás conozcas” ofrecida por Facebook), o usuarios con la más alta probabilidadde ser cruzados en un recorrido aleatorio, son enfoques comúnmente utilizados. En estadirección, (Liben-Nowell and Kleinberg, 2007) han discutido la predicción de enlace enlas redes sociales y han comparado varios métodos basados en la proximidad del nododesde un punto de vista puramente topológico. Otro enfoque al problema consiste en en-contrar usuarios afines mediante la evaluación de sus perfiles de similitud. En (Mazhariet al., 2015) se presenta un modelo para medir las similitudes de dos miembros de la redsocial, sobre la base de la información contenida en sus perfiles. A diferencia de otrosmodelos de perfiles de coincidencia, este método no asigna el mismo peso a los diferen-tes elementos de los perfiles, sino que se utiliza un modelo de minería para descubrir elgrado de influencia de los diferentes factores que afectan la formación de amistades. En(Xie, 2010) han diseñado un marco general para recomendación de amigos mediante elaprovechamiento de características basadas en los intereses que pueden caracterizar elinterés de los usuarios en dos dimensiones: contexto (lugar, tiempo) y el contenido, asícomo la combinación de conocimiento del dominio para mejorar la calidad de recomen-dar. Las propuestas híbridas aprovechan tanto las conexiones de red sociales, así como elcontenido generado por el usuario y los perfiles.

2.1.1.2. Recomendación de usuarios a seguir

La recomendación de usuarios a seguir (followees) consiste en hacer sugerencias a losusuarios acerca de a quién seguir en las redes sociales dirigidas. Las plataformas demicro-blogging son un ejemplo de redes atención-información o redes híbridas (Romeroand Kleinberg, 2010), situadas en la intersección de las redes sociales y de información.

18

Page 25: Modelos de procesamiento distribuido basados en Spark para

En estas redes, los usuarios pueden seguir a otros usuarios y suscribirse a los conteni-dos que publican. Sin embargo, la búsqueda de usuarios interesantes a seguir puede serdifícil, especialmente para los nuevos usuarios. Se han propuesto varios algoritmos derecomendación de followees en la literatura que explotan la topología de la red segui-dor/seguido, utilizando medidas de similitud del perfil para recomendar a los usuariosaquellos con preferencias de información similares, o evaluando resultados de populari-dad para encontrar buenas fuentes de información. El mecanismo de recomendación defollowees más simple es el ranking de usuarios en función de su influencia y sugiriendolos más influyentes.

TwitterRank (Weng et al., 2010), una extensión del algoritmo PageRank, trata de encon-trar tuiteros influyentes teniendo en cuenta la similitud de tópicos entre los usuarios, asícomo la estructura de enlaces. TURank (Yamaguchi et al., 2010) considera la gráfica socialy el flujo tweet real. (Garcia and Amatriain, 2010) han propuesto un método para pon-derar la popularidad y la actividad de enlaces para rankear usuarios. La recomendaciónde usuario, sin embargo, no puede basarse exclusivamente en los estudios rankings deinfluencia general en Twittersphere completa ya que las personas que son populares enTwitter no necesariamente coinciden con los intereses de un usuario en particular. En sumayoría, las estrategias de recomendación de followees generan recomendaciones me-diante el análisis de la estructura de enlaces de la red seguidor/seguido. El servicio “aquién seguir” de Twitter, responsable de más de un octavo de todas las nuevas conexio-nes (Goel et al., 2015), lleva a cabo una exploración del grafo para identificar un “círculode confianza” y luego rankear a estos usuarios (Gupta et al., 2013). Romero and Klein-berg (Romero and Kleinberg, 2010) examinaron “el cierre dirigido” (directed closure),una forma dirigida de cierre triádico (hay una mayor probabilidad de que una amistadse forme entre dos personas - es decir, una díada - si ya tienen un amigo en común -que formar una tríada), en las redes de atención-información y se encontró que existeun comportamiento diferente de formación de vínculo entre subredes. En (Armentanoet al., 2012) proponen un recorrido de la red sobre la base de una distinción entre losusuarios que actúan como solicitantes de información o fuentes de información (?; Krish-namurthy et al., 2008a). Por lo tanto, un solicitante de información tiene algunos usuariosde Twitter interesantes que actúan como fuentes de información (es decir, sus followees).Otras personas que también siguen estas fuentes (es decir, seguidores de los followeesdel usuario) es probable que compartan algunos intereses con el usuario objetivo y po-drían haber descubierto otras fuentes de información relevantes sobre los mismos temas(es decir, sus followees). Este último grupo entonces se compone de potenciales candi-datos para sugerir al usuario como followees futuras. En (Zhao et al., 2013) un métodobasado en LDA sobre las relaciones seguidor/seguido se utiliza para descubrir las comu-nidades de usuarios con influencia similar, así como los intereses. Entonces, un métodode factorización de la matriz se aplica en cada una de las comunidades para mejorar

19

Page 26: Modelos de procesamiento distribuido basados en Spark para

las recomendaciones personalizadas. En (Yu and Qiu, 2014) han adaptado el modelo defactorización de la matriz de los SR tradicionales a la recomendación de followees y uti-lizan regularización estructural para explotar la información topológica de la SN pararestringir el modelo. Métodos basados en la topología de red ignoran los atributos quelos individuos pueden tener, como información de contenido del tweet o característicasdel perfil, y por lo tanto no pueden explicar suficientemente la preferencia de los usua-rios en los sistemas de microblogging. La recomendación de followees no es exclusiva deldominio micro-blogging. En el contexto de redes sociales basadas en la ubicación (LBSN,su sigla en inglés) tales como Foursquare, se ha propuesto un método de recomenda-ción de followee basado en información geográfica, textual y social (GTS-FR) (Ying et al.,2012). GTS-FR tiene en cuenta los movimientos del usuario, mensajes de texto en líneay propiedades sociales para descubrir la relación entre las necesidades de informaciónde los usuarios y la información proporcionada para la recomendación de followees. Git-Hub es una plataforma de colaboración social en línea, que permite a la gente trabajaren proyectos privados o públicos (open source). En sitios como GitHub, la gente es en sumayoría seguida porque trabajan en proyectos interesantes. En (Schall, 2013) presentaronun enfoque de recomendación basado en una métrica de autoridad novel ya que en lascomunidades de desarrolladores de software una autoridad es un experto o gurú en unárea específica. Por lo tanto, la autoridad se calcula en base a las acciones realizadas al-macenadas en un repositorio (por ejemplo, las actividades de codificación, corrección deerrores, etc.) y se genera un rango de autoridad contextual personalizado.

2.2. Predicción de enlaces

Más allá del contexto de las redes sociales, la predicción de enlaces tiene muchas otrasaplicaciones. Por ejemplo, en bioinformática, puede ser utilizada para encontrar inter-acciones entre proteínas (Airoldi et al., 2008); en comercio electrónico puede ayudar aconstruir sistemas de recomendación (Huang et al., 2005) tal como «gente que compróesto también compró..» en Amazon; y en al ámbito de la seguridad informática, puedeasistir en la identificación de grupos ocultos terroristas o criminales.

2.2.1. Definición del problema

Disponemos de un grafo G = 〈V, E〉 representando la estructura topológica de una redsocial en la cual cada arista o arco e = 〈u,v〉 ∈ E representa una interacción entre u yv que tomó lugar en un tiempo particular t (e). Sean dos tiempos: t y t′(siendo t′ > t),entonces podemos definir a G [t, t′] como el subgrafo de G que contiene todos los arcoscon una marca de tiempo (timestamp) entre t y t′. Luego, definiremos to,t′o,t1 y t′1 como

20

Page 27: Modelos de procesamiento distribuido basados en Spark para

los cuatro tiempos, donde to < t′o ≤ t1 < t′1 t. De este modo, la tarea de predicción deenlaces es: dada una red G [t0, t′0]; devolver una lista de arcos no presentes en dicha red,que estarán predichos de aparecer en G [t1, t′1]. Nos referimos entonces a [t0, t′0]. como elintervalo de entrenamiento y a [t1, t′1]. como el intervalo de evaluación. La figura 2.2 en lapágina siguiente, muestra dicha evolución.

Figura 2.1: Definición formal de Link Prediction

Para generar esta lista, se usan técnicas que asignan a cada par u y v, un valor numé-rico, considerado puntaje del posible arco. Dicho puntaje puede verse como la medidade similitud entre dos nodos. Para cada par de nodos x,y ∈ V, generalmente diremosque Sxy = Syx. Todos los arcos no existentes estarán ordenados de forma decreciente deacuerdo al puntaje . Los primeros arcos resultantes de la lista serán los que tengan la ma-yor probabilidad de existir. Dado que no podemos predecir qué sucederá en el futuro nicuales serán las nuevas interacciones, entonces podemos definir la precisión o exactituddel algoritmo tomando un conjunto de entrenamiento (ET) y el resto, por lo general el10% de los arcos que no forman parte de ET, se usan para realizar la evaluación. Esteúltimo conjunto de prueba (EP) puede variar en función de cuántos datos reservemospara entrenar y además lo ideal es tomar muestras aleatorias para formar el conjunto.Claramente, podemos definir al conjunto E como:

E = ET⋃

EP (2.1)

ET⋂

EP = /o (2.2)

Entonces la lista resultante devuelta por el algoritmo puede verse como en la ecuaciónsiguiente, de tal que modo que nos referimos a U como el conjunto universal de todos losposibles arcos presentes y no presentes en la red social.

21

Page 28: Modelos de procesamiento distribuido basados en Spark para

Figura 2.2: Vista del grafo social y su evolución.

L : eL ∈U − Etrain

|U| = |V|(|V| − 1)2

22

Page 29: Modelos de procesamiento distribuido basados en Spark para

2.3. Computación paralela y distribuida

Los sistemas computacionales están experimentando una revolución. De 1945, cuandocomenzó la era moderna de las computadoras, a 1985, éstas eran grandes y caras. Inclusolas minicomputadoras costaban al menos decenas de miles de dólares. Como resultado,muchas empresas tenían solamente unas cuantas, y debido a la falta de un medio de co-nexión entre ellas, operaban de manera independiente. Sin embargo, hacia la mitad dela década de 1980, dos avances en la tecnología comenzaron a cambiar esa situación. Elprimero de estos avances fue el desarrollo de poderosos microprocesadores. Inicialmen-te, los microprocesadores eran máquinas de 8 bits, pero pronto se hicieron comunes lasCPU de 16, 32 y 64 bits. Muchas de ellas tenían el poder de una mainframe (es decir,una computadora grande), pero a una fracción de su precio. La cantidad de mejoras quehan tenido lugar en la tecnología de las computadoras a partir de la segunda mitad delsiglo XX es verdaderamente impresionante, y no tiene precedente en otras industrias. Deuna máquina que costaba 10 millones de dólares y ejecutaba 1 instrucción por segundo,saltamos a máquinas que cuestan 1000 dólares y son capaces de ejecutar un millón demillones de instrucciones por segundo; esto significa una ganancia precio/rendimientode 1013.

El segundo desarrollo importante fue la invención de las redes de computadoras de altavelocidad. Las redes de área local, o LAN (local-area networks), permiten la intercone-xión de cientos de máquinas localizadas dentro de un mismo edificio, de tal manera quees posible transferir pequeños volúmenes de información entre máquinas en unos cuan-tos microsegundos, más o menos. Podemos transferir grandes volúmenes de datos entremáquinas a velocidades que van de los 100 millones a los 10 millones de millones debits/segundo. Las redes de área amplia, o WAN (wide-area networks), permiten la inter-conexión de millones de máquinas ubicadas alrededor del mundo a velocidades que vandesde los 64 Kbps (kilobits por segundo) hasta gigabits por segundo.

El resultado de estas tecnologías es que ahora no solamente es factible, sino fácil, ponera trabajar sistemas de cómputo compuestos por grandes cantidades de computadorasinterconectadas mediante una red de alta velocidad. Por lo general, a estos sistemas se lesconoce como redes de computadoras o sistemas distribuidos, al contrario de los sistemascentralizados (o sistemas de un solo procesador) que por lo general constan de una solacomputadora, sus periféricos, y quizás algunas terminales remotas.

2.3.1. Definición de un sistema distribuido

Un sistema distribuido es aquel en el que los componentes localizados en computadores,conectados en red, comunican y coordinan sus acciones únicamente mediante el paso de

23

Page 30: Modelos de procesamiento distribuido basados en Spark para

mensajes. Esta definición lleva a las siguientes características de los sistemas distribui-dos: concurrencia de los componentes, carencia de un reloj global y fallos independientesde los componentes. Tres ejemplos muy conocidos de sistemas distribuidos podrían ser:Internet, una Intranet, que es una porción de internet gestionada por una organización, ytambién la computación móvil y ubicua.

Compartir recursos es uno de los motivos principales para construir sistemas distribui-dos. Los recursos pueden ser administrados por servidores accedidos por clientes o pue-den ser encapsulados como objetos y accedidos por otros objetos clientes. Se analiza elWeb como un ejemplo de recursos compartidos y se introducen sus principales caracte-rísticas.

Los desafíos que surgen en la construcción de sistemas distribuidos son la heterogenei-dad de sus componentes, su carácter abierto, que permite que se puedan añadir o reem-plazar componentes, la seguridad y la escalabilidad, que es la capacidad para funcionarbien cuando se incrementa el número de usuarios, el tratamiento de los fallos, la concu-rrencia de sus componentes y la transparencia.

Heterogeneidad: Permitir a los usuarios acceder a servicios y ejecutar aplicacionessobre un conjunto heterogéneo de redes y computadores. Es decir, permitir varie-dad y diferencia en términos de redes, hardware, sistemas operativos, lenguajes deprogramación, etc.

Extensibilidad: Capacidad del sistema para ser extendido y reimplementado en di-versos aspectos. La extensibilidad de los sistemas distribuidos se determina en pri-mer lugar por el grado en el cual se pueden añadir nuevos servicios de comparti-ción de recursos y ponerlos a disposición para el uso por una varidad de programascliente. Los sistemas diseñados de este modo para dar soporte a la compartición derecursos se etiquetan como sistemas distribuidos abiertos (open distributed systems) pa-ra remarcar el hecho de ser extensibles. Pueden ser extendidos en el nivel hardwaremediante la inclusipon de computadores a la red y en el nivel de software a travésde servicios, y la reimplementación de los antiguos.

Seguridad: La seguridad de los recursos de información tiene tres componentes:

• Confidencialidad: protección contra el descubrimiento por entidades no auto-rizadas.

• Integridad: protección contra la alteración o la corrupción.

• Disponibilidad: protección contra la interferencia con los procedimientos deacceso a los recursos.

Escalabilidad: Los sistemas distribuidos operan efectivamente en muchas escalas di-ferentes, desde pequeñas intranets a Internet. Se dice que un sistema es escalable

24

Page 31: Modelos de procesamiento distribuido basados en Spark para

si conserva su efectividad cuando ocurre un incremento significativo en el númerode usuarios. El diseño de sistemas distribuidos escalables presenta los siguientesretos: Control del costo de los recursos físico, control de las pérdidas de las presta-ciones, prevención de desbordamiento de recursos software, y evitación de cuellosde botellas de prestaciones.

Tratamiento ante fallos: los sistemas computacionales a veces fallan. Cuando apare-cen fallos en el hardware o software, los programas pueden producir resultadosincorrectos o pudieran parar antes de haber completado el cálculo pedido. Los fa-llos de un sistema distribuido son parciales: es decir, algunos componentes fallanmientras otros siguen funcionando. Consecuentemente, el tratamiento de fallos esparticularmente difícil.

Concurrencia: en una red de computadores, la ejecución de programas concurrenteses la norma. Se puede realizar un trabajo en un ordenador, mientras en un orde-nador separado se realiza otro trabajo, compartiendo recursos como páginas webo archivos, cuando sea necesario. La capacidad del sistema para manejar recursoscompartidos se puede incrementar añadiendo más recursos (por ejemplo, compu-tadores) a la red. La coordinación de programas que recursos y se ejecutan de formaconcurrente es también un tema importante y recurrente.

Transparencia: se define como la ocultación al usuario y al programador de aplica-ciones de la separación de los componentes en un sistema distribuido, de forma quese perciba el sistema como un todo más que como una colección de componentes in-dependientes. Las implicaciones de la transparencia son de gran calado en el diseñodel sistema de software. El Manual de Referencia ANSA (ANSA Reference Manual)[ANSA 1989] y el Modelo de Referencia para el Procesamiento Distribuido Abierto(RM-ODP: Reference Model for Open Distributed Processing) de la organización Inter-nacional de Estándares [ISO 1992] identifican ocho formas de transparencia. Hemosreproducido las definiciones originales de ANSA, reemplazando su transparenciade migración por nuestra propia transparencia de movilidad, de mayor alcance:

• Transparencia de acceso permite acceder a los recursos locales y remotos em-pleando operaciones idénticas.

• Transparencia de ubicación permite acceder a los recursos sin conocer su loca-lización geográfica.

• Transparencia de concurrencia permite que varios procesos operen concurren-temente sobre recursos compartidos sin interferencia mutua.

• Transparencia de replicación permite utilizar múltiples ejemplares de cada re-curso para aumentar la fiabilidad y las prestaciones sin que los usuarios y losprogramadores de aplicaciones necesiten su conocimiento.

25

Page 32: Modelos de procesamiento distribuido basados en Spark para

• Transparencia frente a fallos que permiten ocultar los fallos, dejando que losusuarios y programadores completen sus tareas a pesar de fallos del hardwareo de los componentes software.

• Transparencia de movilidad que permite la reubicación de recursos y clientesen un sistema sin afectar la operación de los usuarios y los programas.

• Transparencia de prestaciones que permite reconfigurar el sistema para mejo-rar las prestaciones según varía su carga.

• Transparencia al escalado que permite al sistema y a las aplicaciones expan-dirse en tamaño sin cambiar la estructura del sistema o los algoritmos de apli-cación.

Inexistencia del reloj global: cuando los programas necesitan cooperar coordinan susacciones mediante el intercambio de mensajes. La coordinación estrecha depende amenudo de una idea compartida del instante en el que ocurren las acciones de losprogramas. pero resulta que hay límites a la precisión con el que los computadoresen una red pueden sincronizar sus relojes, no hay una única noción global del tiem-po correcto. Esto es una consecuencia directa del hecho que la única comunicaciónse realiza enviando mensajes a través de la red.

2.3.2. Necesidad de la computación paralela y distribuida

La necesidad de la computación paralela y distribuida se origina por las limitacionesde los computadores secuenciales: integrando varios procesadores para llevar a cabo lacomputación es posible resolver problemas que requieren de más memoria o de mayorvelocidad de cómputo. También hay razones económicas, pues el precio de los compu-tadores secuenciales no es proporcional a su capacidad computacional, sino que paraadquirir una máquina el doble de potente suele ser habitual tener que invertir bastantemás del doble; mientras que la conexión de varios procesadores utilizando una red nospermite obtener un aumento de prestaciones prácticamente proporcional al número deprocesadores con un coste adicional mínimo. La programación paralela y distribuida esuna solución para solventar esos problemas, pero presenta otras dificultades. Algunasdificultades son físicas, como la dificultad de integración de componentes y la disipaciónde calor que conlleva, o la mayor complejidad en el acceso a los datos, que se puede lle-gar a convertir en un cuello de botella que dificulta la obtención de buenas prestacionesaunque se aumente la capacidad computacional teórica del sistema. Hay también difi-cultades lógicas, como son la mayor dificultad de desarrollar compiladores y entornos deprogramación eficientes para estos sistemas, que son más complejos que los secuenciales,o la necesidad de utilizar programación paralela y distribuida en vez de la programaciónsecuencial, que es bastante más sencilla.

26

Page 33: Modelos de procesamiento distribuido basados en Spark para

La computación paralela y distribuida se utiliza para reducir el tiempo de resoluciónde problemas computacionales, o bien para resolver problemas grandes que no cabríanen la memoria de un procesador secuencial. Y para esto es necesario utilizar sistemasde altas prestaciones y algoritmos adaptados a este modelo que utilicen estos sistemasde manera eficiente. Los problemas típicos que se abordan con este nuevo paradigmason: problemas de alto coste, o problemas de no tan alto coste pero de gran dimensión, oproblemas de tiempo real, en los que se necesita la respuesta en un tiempo máximo. Así, lacomunidad científica usa la computación paralela y distribuida para resolver problemasque sin técnicas de paralelismo serían intratables, o que se pueden resolver con mayorprecisión o en menor tiempo usando dichas técnicas. Algunos campos que se beneficiande la programación paralela y distribuida son: predicciones y estudios meteorológicos,estudio del genoma humano, modelado de la biósfera, predicciones sísmicas, simulaciónde moléculas... En algunos casos se dispone de cantidades ingentes de datos que seríamuy lento o imposible tratar con máquinas convencionales. En problemas de simulación(meteorología, moléculas...) en muchos casos se discretiza el espacio a simular, y unamalla más fina produciría una simulación más precisa, pero necesita de más recursoscomputacionales, y por tanto de más velocidad de cómputo y espacio de memoria.

Una motivación interesante adicional para mencionar es la posibilidad de compartir re-cursos. El término recurso es un poco abstracto, pero caracteriza bien el rango de cosasque pueden ser compartidas de forma útil en un sistema de computadores conectados enred, tales como discos e impresoras, hasta entidades de software definidas como archivos,bases de datos, objetos de datos de todos los tipos.

2.4. Trabajos relacionados

En (Corbellini et al., 2015) plantean que la creación de nuevos y mejores sistemas de reco-mendacion para redes sociales, está recibiendo actualmente mucha atención debido a lamayor necesidad de crear nuevas herramientas para asistir a los usuarios. El volumen dedatos sociales disponible así como también de conjuntos de datos experimentales, con-lleva a los algoritmos de recomendación a escalar hacia muchas unidades de cómputo enlugar de limitarse a una gran computadora computacionalmente poderosa. Puesto quelas redes sociales pueden modelarse como grafos, este equipo de investigadores argen-tinos planteó una arquitectura que simplifica el diseño de algoritmos de recomendaciónbasados en grafos que corren sobre clusters de computadoras. Proponen la creación deun software llamado Graphly, una API en Java que ofrece operaciones dedicadas al tra-tamiento de grafos y jLiME, un framework que soporta la distribución de código algo-rítmico y datos de grafos. Responde a la necesidad de tratar información proveniente deredes sociales a gran escala. Dado que las redes sociales pueden modelarse como grafos,

27

Page 34: Modelos de procesamiento distribuido basados en Spark para

propusieron una plataforma orientada exclusivamente a grafos.

La motivación subyacente de la creación de dicha arquitectura es permitir a los usua-rios definir sus algoritmos de recomendación a través de la API y luego configurar laejecución usando estrategias de distribución de trabajos (de los cuales se hablará muchodurante el desarrollo del trabajo propuesto en capítulos siguientes), sin modificar el algo-ritmo original. La ventaja de esto es que el software puede ser escrito y evaluado sin tenerque preocuparse por los detalles técnicos referentes a lo paralelo y distribuido en cues-tión. Esta plataforma fue testeada usando un algoritmo de recomendación de seguidorespara Twiter como caso de estudio en concreto.

En (Corbellini et al., 2017a) se discute que los grandes grafos se han convertido en par-tes esenciales de los medios sociales. Los sistemas de recomendacion en estos contextosconllevan grandes desafíos con respecto al diseño y a la eficiencia del uso de los recursosa la hora de procesar recomendaciones en sistemas de cómputos distribuidos. Adicio-nalmente, los algoritmos de recomendación para grafos, particularmente algoritmos depredicción de enlaces, poseen diferentes requerimientos dependiendo de la forma en quedichos grafos son recorridos. Los algoritmos basados en caminos usualmente recorren endiferentes direcciones para construir un gran conjunto de vértices a recomendar, mien-tras que los algoritmos de recorrido aleatorio construyen un subgrafo inicial y realizarmúltiples iteraciones sobre los vértices para computar el resultado inal. Este grupo deinvestigadores propuso un framework de procesamiento distribuido llamado Distribu-ted Partitioned Merge (DPM), el cual soporta los dos tipos de algoritmos mencionadosanteriormente, y comparan su performance global y eficiencia con el uso de los recursos.

En el año 2015, en (Kermarrec et al., 2015) discutieron y plantearon una alternativa alanálisis de grafos muy grandes enfocándose en los tiempos y la eficiencia de recursos almáximo. Con grafos sociales que rondan los miles de millones de elementos, muchas or-ganizaciones buscan escalar fácilmente en data-centers o en la nube. Desafortunadamen-te al optar por estas opciones se está fuertemente limitado por los modelos de programa-ción impuestos por dichas infraestructuras, que además fueron diseñadas para procesargrafos comunes, desaprovechando toda posibilidad de optimización. En este paper ex-ploran una forma muy eficiente de implementar los problemas de predicción de enlacesen plataformas GAS (gatherapply-scatter), un modelo distribuido muy popular basadoen grafos. La herramienta llamada Snaple, explota una nueva técnica de cálculo de scoresy minimiza el costo de transferencia de datos a la vez que la calidad de las prediccionesse mantiene. Esta herramienta promete escalar hacia grafos de datos sociales que unaimplementación estándar de link prediction no podría procesar.

28

Page 35: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 3Trabajo propuesto

3.1. Enfoque

En este capítulo se presenta la propuesta concreta del trabajo, la cual consiste en probaralgoritmos de predicción de enlaces sobre datos de redes sociales, más concretamentehablando, sobre interacciones entre usuarios. Se pretende analizar el comportamiento delos mismos, aplicando el modelo de programación paralela y distribuida, ejecutando losmismos en un clúster real de computadoras interconectadas y evaluar la calidad de laspredicciones.

La métrica de common neighbors, es una técnica simple pero muy útil a la hora de pre-decir enlaces, pero es un algoritmo computacionalmente muy costoso. Para instanciasdonde la entrada es un dataset con unas pocas interacciones, obtenemos respuestas entiempos razonables. Pero en escenarios de la vida real, donde el grafo (dataset) a tratares considerablemente grande en cantidad de usuarios y, en especial, poco conexo, nosestaríamos enfrentando a un problema intratable debido a la elevada complejidad espa-cial y temporal del algoritmo. Probaremos tres aproximaciones de dicha métrica, siendoel descarte de usuarios una estrategia clave a la hora de obtener los usuarios candidatospara ofrecerles recomendaciones.

La plataforma elegida para implementar este software es Spark, de Apache. A lo largode este capítulo, trataremos los conceptos técnicos necesarios para comprender en mayordetalle tanto el funcionamiento del clúster, como las características del software escritopara tal fin.

Dado que el framework de cómputo distribuido es Apache Spark, también hablaremosbrevemente sobre la historia y los conceptos fundamentales de Apache Hadoop, de quienSpark tomó el modelo de ejecución, las estructuras de datos y también el paradigma.

29

Page 36: Modelos de procesamiento distribuido basados en Spark para

Continuaremos con Spark, su arquitectura general, modelo de procesamiento y compor-tamiento.

3.2. Hadoop

Los grandes volúmenes de datos (Big Data) y Hadoop están inexorablemente juntos. Ha-doop, como una plataforma de almacenamiento y procesamiento de datos, fue la razón yla motivación más grande para la creación de Spark, y por lo tanto Hadoop y Spark estánfuertemente hermanados uno con el otro. Hadoop continúa siendo una plataforma claveen el uso con Spark. En esta sección trataremos una breve reseña de los conceptos básicosde Hadoop, conocido como el padre de Spark, y de cómo ambos se combinan, tanto en lafuente de información que procesan como en el framework de planificación de recursosque usan.

3.2.1. Historia

El conjunto de metodologías de procesamiento y almacenamiento comúnmente conocidocomo Big Data, surgieron de los motores de búsqueda en la década del 2000, principal-mente de la mano de Google y Yahoo!. Los proveedores de motores de búsqueda fueronlos primeros grupos de usuarios que se enfrentaron a los problemas de escalabilidad enInternet, principalmente en cómo almacenar e indexar todos los documentos en el uni-verso de la Internet. Esto al comienzo pareció un desafío insuperable para la época, aúncuando la cantidad de datos en Internet de ese momento no era ni una fracción compara-da con lo que es hoy en día. Yahoo! y Google comenzaron a trabajar independientementeen un conjunto de estrategias para afrontar este problema. En 2003, Google publicó unartículo llamado «The Google File System»(Ghemawat et al., 2003). Subsecuentemente,en 2004, publicó otro artículo titulado «MapReduce: Simplified Data Processing on Lar-ge Clusters(Dean and Ghemawat, 2004)». Al mismo tiempo, en Yahoo!, Doug Cutting(quien generalmente es considerado el creador inicial de Hadoop) estaba trabajando enun proyecto web de indexación llamado Nutch1.

Los artículos de Google inspiraron a Doug Cutting a tomar el trabajo que él había hechohasta la fecha en su proyecto Nutch, e incorporar los principios de procesamiento y al-macenamiento propuestos en dichas publicaciones. El producto resultante de esta fusiónes lo que hoy se conoce como Hadoop. Cabe destacar que justo en aquella época, muchasotras tecnologías comenzaban a surgir a la par de Hadoop, tales como:

La rápida expansión del comercio electrónico (e-commerce)

1nutch.apache.org

30

Page 37: Modelos de procesamiento distribuido basados en Spark para

El nacimiento y el rápido crecimiento de la Internet móvil

Redes sociales

Blogs y sitios web basados en contenidos de usuarios.

Estas innovaciones en conjunto llevaron a lo que hoy es conocido como grandes volú-menes de datos. Esto aceleró la expansión del movimiento de Big Data y conllevó alnacimiento de Spark, sistemas de código abierto de mensajería, plataformas NoSQL, etc.

3.2.2. Conceptos básicos

Hadoop es una plataforma de procesamiento y almacenamiento de datos, basado en unconcepto central y fundamental: localidad de datos (data locality). Localidad de datos serefiere a la habilidad de mover los cómputos cerca de donde reside la información a pro-cesar (directo al nodo en cuestión) en lugar de mover grandes cantidades de informacióna un nodo para procesarlos. Esto minimiza el uso de red e incrementa la velocidad globaldel sistema.

Con la gran creciente cantidad de datos en Internet, ya no es eficiente (ni siquiera po-sible, en algunos casos) mover grandes volúmenes de información de datos requeridospara un cómputo en un tiempo determinado. Hadoop no reconoce esquemas con res-pecto a operaciones de escritura. Esto quiere decir que puede almacenar y procesar ungran rango de tipo de datos, desde datos sin estructurar como archivos de texto plano,archivos semi-estructurados como los JSON o documentos XML, a datos estructuradostales como los extraídos de un sistema de base de datos relacional. El esquema no esinterpretado durante la operación de escritura en Hadoop; entonces no hay índices, esta-dísticas, u otros datos típicamente empleados en bases de datos para optimizar consultasy filtrar o reducir la cantidad de datos devueltos al cliente. Esto además necesita el re-querimiento de localidad de datos. Hadoop posee dos componentes clave: El sistema dearchivos distribuido HDFS (Hadoop Distributed File System) y también YARN (Yet AnotherResource Negociator), donde YARN puede pensarse como el subsistema de planificaciónde procesos de Hadoop. Cada componente es independiente entre sí y pueden operar ensu propio clúster.

3.2.3. HDFS

El sistema de archivos distribuido (HDFS), es la plataforma de almacenamiento de infor-mación de Hadoop. Aunque Hadoop puede interactuar con múltiples sistemas de archi-vos, HDFS es el medio primario que Hadoop prefiere para leer información y también

31

Page 38: Modelos de procesamiento distribuido basados en Spark para

Figura 3.1: División de un archivo en bloques cuando es tomado por HDFS. Al usuario se lemuestra como un único archivo de 500MB

para procesarla y almacenarla. Fue inspirado por las publicaciones de Google en el año2003.

Hay muchos conceptos de diseño clave detrás de HDFS, tales como:

Escalabilidad (económicamente)

Tolerante a fallas

No necesita hardware sofisticado

Soporta alta concurrencia

Favorece un constante alto ancho de banda de acceso aleatorio de baja latencia.

3.2.4. Arquitectura general de HDFS

La arquitectura de HDFS consiste de algunas áreas clave tales como: archivos, bloques yreplicación.

HDFS es un sistema de archivos virtual, lo que significa que un cliente que opera sobreél sólo ve un sistema único sin saber dónde reside actualmente la información físicamen-te. HDFS está implementado sobre sistemas de archivos nativos tales como ext3, ext4 yxfs disponibles en Linux, o también en NTFS que opera bajo el sistema Windows. Unade las propiedades más importantes de HDFS es su inmutabilidad. La inmutabilidad serefiere a la incapacidad de actualizar los datos luego de haber escrito en el sistema dearchivos. Usualmente HDFS es referido como WORM (Write Once Read many) lo que enespañol podría traducirse como «Escribe una vez, y lee múltiples veces». Los archivos enHDFS están formados por bloques, los cuales por defecto ocupan 128MB de espacio dealmacenamiento, aunque hay opciones que permiten reconfigurar este valor. Los archi-vos son divididos en bloques una vez son tomados por HDFS (proceso que en inglés esdenominado HDFS ingestion).

En cuanto a los bloques, estos son distribuidos al ser tomados por HDFS si un clústercontiene más de un nodo. En la figura 3.2 se muestra una posible distribución de bloquesen un clúster.

32

Page 39: Modelos de procesamiento distribuido basados en Spark para

Figura 3.2: Bloques HDFS en clúster multi nodo

Figura 3.3: Arquitectura de NameNode y DataNode

Los bloques son replicados de acuerdo a un factor preconfigurado de replicación. En unambiente de ejecución totalmente distribuido (con tres o mas nodos), este valor de con-figuración es típicamente 3. La replicación de bloques ocurre también en el proceso depersistencia del archivo sobre HDFS (ingestion). El propósito de esta característica obede-ce a dos razones: incrementar la posibilidad de aprovechamiento de localidad de datos ytambién la tolerancia a fallos.

HDFS usa una arquitectura Master-Slave (Figura 3.3) donde el maestro es el NameNode quese encarga de gestionar los ficheros y los metadatos. Estos metadatos contienen informa-ción sobre el fichero, bloquees y la localización de estos en los DataNodes. Los DataNodesposeen las siguientes características:

Tienen la responsabilidad de almacenar y recuperar los bloques

Formar un clúster donde los bloquees se replican (por defecto tres veces) sobre losDataNodes para garantizar la tolerancia a fallos

3.3. Apache Spark

Apache Spark es un framework de procesamiento distribuido open source, mayormenteusado en ambientes de trabajo Big Data. Se caracteriza por ofrecer una ejecución optimi-zada ofreciendo alto desempeño. Es compatible con procesamiento por lotes, aprendizajeautomático, bases de datos de grafos, entre otras. Apache Spark es compatible nativa-mente con los lenguajes de programación Java, Scala y Python. En la figura 3.4, se velas librerías complementarias que provee Spark para diferentes ámbitos de procesamien-to, tales como librerías para procesamiento de SQL, algoritmos de procesado de grafos,

33

Page 40: Modelos de procesamiento distribuido basados en Spark para

Figura 3.4: El Spark core y sus librerías complementarias.

streaming de datos, para análisis en tiempo real, y machine learning. Su alto desempeño,basado en el uso de un motor de ejecución de grafos acíclicos dirigidos (DAG), suponeque Apache Spark puede crear planes de consultas eficientes para transformaciones dedatos. Apache Spark también almacena datos de entrada, de salida e intermedios en lamemoria como conjuntos de datos distribuidos resistentes (RDD) , lo que facilita un procesa-miento rápido sin costos de E/S y mejora el desempeño de cargas de trabajo iterativas ointeractivas.

Una de las ventajas de trabajar con Spark son las consolas interactivas que tiene para dosde los lenguajes con los que se puede programar, Scala (que se ejecuta en una máquinavirtual Java- JVM) y Python. Estas consolas permiten analizar los datos de forma inter-activa, con la conexión a los clústeres. Spark nació para dar velocidad a la consulta y eltratamiento de grandes paquetes de datos, esto es, para optimizar la gestión de grandesdatos.

También podríamos describir a Spark como un motor distribuido de procesamiento dedatos para modos de trabajo en lote y en streaming, tanto sea para consultas SQL, o al-goritmos de aprendizaje de máquina. En contraste con Hadoop, basado en el modeloMapReduce que utiliza el disco como almacenamiento primario, Spark procesa mayori-tariamente todo en la memoria principal, permitiendo ejecutar trabajo con mejor rendi-miento en ciertas aplicaciones tales como algoritmos iterativos. Spark apunta más haciala velocidad, facilidad de uso y extensibilidad. Spark provee una abstracción eficientepara procesamiento en memoria llamado RDD (Resilient Distributed Dataset).

En casos de tener grandes cantidades de datos que requieran un procesamiento de bajalatencia, que un modelo MapReduce típico no puede proveer, entonces Spark se convierteen la alternativa viable. Puede correr tanto de forma local, como en clústers o en la nube.Los programadores usan una API específica del lenguaje elegido y trabajar a nivel deRDD usando transformaciones y acciones, las cuales abordaremos más adelante.

34

Page 41: Modelos de procesamiento distribuido basados en Spark para

3.3.1. Por qué Spark

Existen muchas razones por las cuales usar Spark, tales como:

Fácil para comenzar a programar: Spark ofrece un shell interactivo que hace real-mente fácil y rápido el comienzo del desarrollo de una aplicación. Tan sólo es ne-cesario escribir las instrucciones en dicho shell y Spark irá ejecutando el trabajoconforme el usuario vaya ejecutando transformaciones y acciones sobre conjuntosde datos (RDD). También es posible crear aplicaciones standalone, haciendo uso delas librerías de Java Spark, o de Phyton Spark.

Entorno unificado para diferentes cargas de trabajo: tal como dijo Matei Zaharia,el autor de Apache Spark, uno de los objetivos del proyecto Spark es ofrecer unaplataforma que soporte una amplia variedad de flujos de trabajo y no solamentetrabajos en modo lote de MapReduce, sino que también procesamiento de grafos oMachine Learning. Spark combina modos de trabajo por lotes (Batch processing),interactivo (a través del shell), o a través de Streams (a través de la librería SparkStreaming, para análisis de fuentes de información en tiempo real).

Aprovecha lo mejor en procesamiento de datos distribuidos en modo batch: cuandopensamos en procesamiento de datos por lotes, suele pensarse en Hadoop como lasolución viable. Pero Spark viene a ofrecer nuevas técnicas mejoradas basadas enHadoop MapReduce. De hecho, ambos trabajan juntos, Spark en YARN y en HDFS,mejorando el rendimiento y la simplicidad del motor de procesamiento.

Optimizaciones de bajo nivel: Apache Spark usa un grafo acíclico dirigido (DAG, si-glas en ingles para directed acyclic graph) para las etapas de ejecución (conocido comoplan de ejecución, grafo de ejecución, o simplemente DAG). La poderosa caracte-rística que hace a Spark diferente es su modo de ejecución perezoso (lazy execution),lo que hace posponer cualquier procesamiento hasta que se ejecute una acción (delas cuales hablaremos más adelante en este capítulo). Este modo de ejecución ofrecemuchas posibilidades de aplicar optimizaciones antes de comenzar a ejecutar todaslas tareas. De este modo el usuario no debe conocer tanto detalle de bajo nivel yconcentrarse más en su programa.

Reuso de datos ya procesados: en el modelo Hadoop MapReduce, se podía reusardatos entre cómputos (datos parciales de ejecución, por ejemplo, resultados obteni-dos entre dos o más acciones o transformaciones). Pero éstos sólo podian accesarseposteriormente a haberlos escrito en algún medio de almacenamiento como Ha-doop Distributed FileSystem(HDFS). Ésto puede costar muchísimo tiempo, ya querequeriría recomputar cada paso intermedio, generando un aumento del uso de

35

Page 42: Modelos de procesamiento distribuido basados en Spark para

entrada/salida (IO) y posiblemente uso de la red debido al overhead. Una de lasmejoras que Spark propone es el reúso de datos y el hecho de mantener los mismosen memoria en su mayor posiblidad y hasta que los trabajos finalicen. Únicamentehay que considerar la memoria principal disponible al programar y las habilidadesdel programador con la API Spark para generar la menor cantidad de barajamientode datos posible (data shuffle). Cuanto menor sea el uso de IO y de red, mejores serála performance obtenida.

Tolerancia a fallos: como en todo sistema distribuido, Spark sabe manejar y recu-perarse ante fallos en algún nodo worker, teniendo políticas de recuperación talescomo la reasignación de tareas ante un timeout asignado a cada tarea.

3.3.2. Comenzando con Apache Spark

En esta sección vamos a tratar la organización pragmática de Spark, así como un resumende sus componentes y su funcionalidad para poder luego, teniendo un conocimientode su funcionamiento, dar paso a la explicación de la implementación del algoritmo depredicción de enlaces propuesto.

3.3.2.1. Arquitectura general

Una aplicación Spark esta compuesta por el controlador (driver) y trabajadores (execu-tors) que pueden correr localmente en una única JVM (Java Virtual Machine) o usandorecursos de un clúster que son administrados por un administrador (de clúster).

A continuación, se muestra una abstracción gráfica de la estructura de un sistema hechoen Spark. El cual consta de ciertos componentes que pueden verse en la figura 3.5 y quese describen en las siguientes subsecciones de la presente sección.

3.3.2.2. Administrador del clúster (cluster manager)

Es el encargado de coordinar la ejecución intermediando entre el controlador y el clús-ter. Se encarga de lanzar ejecutores a computar tareas. El controlador (driver) solicita aladministrador del clúster los recursos necesarios para poder correr la aplicación.

La tarea principal del administrador del clúster es ofrecer un servicio externo a las dife-rentes aplicaciones que requieran cómputos, dividiendo los recursos entre ellas para quepuedan ejecutarse. También despacha trabajos al clúster. Spark soporta varios tipos deadministradores:

36

Page 43: Modelos de procesamiento distribuido basados en Spark para

Figura 3.5: Vista interna de ejecución de Spark

Apache Standalone Cluster Manager: es un administrador simple que forma partede la distribución Spark. Posee alta disponibilidad para el master, es tolerante afallas con los nodos trabajadores, y es compatible administrando recursos para cadaaplicación, ejecutándose a la par con algún eventual sistema Hadoop y acceder adatos en HDFS (Hadoop Distributed File System). Compatible con sistemas Linux,Windows y Mac.

Apache Mesos: es un administrador que posee alta disponibilidad para masters co-mo slaves, administrar también recursos de cada aplicación, y agrega soporte paracontendores Docker. Es capaz de ejecutar trabajos de Spark, Hadoop MapReduce,o cualquier otro servicio de aplicación. Posee APIs para Java, Phyton, e inclusiveC++. Compatible con Lionux y Mac.

Hadoop YARN: es un framework para planificar trabajos y también administrarrecursos de un cluster. Posee alta disponibilidad para masters y slaves, soporte pa-ra contenedores Docker en modo no seguro, ejecutores en contenedores Linux yWindows en modo seguro, y también la posibilidad de integrar otro planificador.Compatible con Windows.

Las prestaciones de un administrador de clúster en cualquiera de los mencionados casos,tanto como los trabajos o las acciones son planificadas por el Planificador de Spark en unmodo FIFO (First in, First out). Alternativamente, dicha planificación puede configurarsepara que sea de un modo más parejo asignando recursos mediante un algoritmo Round-Robin. Adicionalmente, la memoria usada por la aplicación puede ser controlada a travésde configuraciones especiales en el Contexto de Spark (Spark Context). Dichos recursosusados por la aplicación pueden ser dinámicamente ajustados dependiendo la carga de

37

Page 44: Modelos de procesamiento distribuido basados en Spark para

trabajo del momento. Por lo tanto, la aplicación puede liberar recursos no usados y vol-verlos a pedir cuando sea necesario.

En el caso de Apache mesos, que posee procesos maestros y esclavos, el maestro puedeofrecer recursos a la aplicación la cual puede aceptar dicha oferta o no. De este modo,pedir recursos disponibles y ejecutar trabajos, depende puramente de la aplicación. Apa-che Mesos, permite granularidad fina en el control sobre los recursos del sistema, talescomo CPUs, memoria, discos y puertos. También permite granularidad gruesa sobre losrecursos donde Spark aloja un número de CPUs a cada ejecutor, donde se sabe de an-temano que no serán liberados hasta que la aplicación termine. Por lo tanto, es posibletener aplicaciones usando un control por granularidad fina mientras que otras lo hacen através de granularidad gruesa al mismo tiempo.

Hadoop YARN tiene un administrador de recursos con dos partes, un planificador y unadministrador de aplicaciones. El planificador es un objeto configurable (pluggable). Eladministrador de aplicaciones es responsable de aceptar trabajos entrantes y lanzarlos.

También puede ejecutarse Spark en modo local. En esta forma de ejecución no distribuiday de única JVM, Spark despliega todos los componentes de ejecución en una misma JVM,tales como: controlador, ejecutor, master, etc. El paralelismo por defecto será el númerode hilos especificados en la configuración de Spark. Nótese que este es el único modo endonde un driver también ejecuta tareas. Sólo se recomienda este modo para propósitosde aprendizaje, testing o debugging.

3.3.2.3. Contexto de Spark (Spark context)

Representa la conexión, o punto de entrada, al motor de Spark. El cual puede correr bajomodo local o clúster. Cuando es creado, puede usarse para crear RDDs, acumuladores yvariables broadcast, acceder a servicios de Spark y correr trabajos. Básicamente actúa comoun cliente del entorno de ejecución de Spark y cumple el rol de maestro de la aplicación(no confundir con el significado de maestro antes explicado). En la figura 3.6, puede versealgunos de los componentes principales del mismo.

3.3.2.4. Nodos trabajadores (worker nodes)

Un nodo, puede verse como una máquina: PC de escritorio, laptop, teléfono móvil, ounidad de cómputo independiente, interconectado con otros nodos en el clúster. Normal-mente un nodo cumple el rol de trabajador (worker) es decir, ser capaz de recibir tareas,procesarlas y devolverlas al controlador.

38

Page 45: Modelos de procesamiento distribuido basados en Spark para

Figura 3.6: Spark context actúa como el maestro de la aplicación Spark

Los ejecutores, son los procesos host alojados en el nodo ejecuto, en los cuales las tareasdel DAG son computadas. Los ejecutores reservan CPU y memoria y están dedicados auna aplicación Spark específica y liberados cuando la aplicación termina.

Típicamente un nodo ejecutor posee un número fijo de ejecutores que pueden ser asigna-dos en cualquier momento, por ende, un clúster posee un número finito o fijo de ejecu-tores para correr tareas. Cabe mencionar, que los ejecutores se alojan en JVMs. Las JVMspara ejecutores, residen en el heap de la memoria.

Es importante remarcar que tanto los nodos trabajadores como los ejecutores, sólo cono-cen las tareas asignadas a ellos, mientras que el controlador es responsable de conocertodas las tareas y sus respectivas dependencias entre sí (Aven, 2017).

3.3.2.5. El Controlador (Driver)

Es el proceso donde el proceso main() o punto de entrada de la aplicación es ejecutado.Representa el proceso corriendo el código de usuario que crea un Contexto de Spark(SparkContext), crear RDDs, y realizar cómputos sobre los mismos. Cuando se lanza unshell de Spark, se crea un programa driver (el cual viene con un contexto de spark creadoimplícitamente). Una vez que el controlador termina, la aplicación termina también.

Cuando el controlador se ejecuta, realiza dos deberes (Karau et al., 2015):

1. Convertir el programa de usuario en tareas (tasks)

El controlador es responsable de convertir el código de usuario en unidades deejecución física llamadas tareas. Visto desde un nivel de abstracción alto, los pro-gramas Spark siguen el siguiente modelo de ejecución. Crean RDDs a partir de unaentrada. Derivan nuevos RDDs a partir de la aplicación de transformaciones sobrelos mismos y realizar acciones para recuperar o guardar los resultados obtenidos.Aunque Spark, internamente se encarga de crear un grafo dirigido acíclico (DAG) ló-

39

Page 46: Modelos de procesamiento distribuido basados en Spark para

Figura 3.7: Componentes en tiempo de ejecución de Spark Standalone

gico de operaciones. Cuando el controlador ejecuta el programa, materializa esteplan lógico en un plan físico de ejecución.

La razón de este comportamiento, está basada en el hecho de que pueden aplicar-se optimizaciones antes de comenzar a computar datos. Tales como «pipelining»en transformaciones de tipo map, o fusionar transformaciones. Esto genera que elgrafo de ejecución quede dividido en etapas (stages), donde cada etapa está com-puesta por múltiples tareas. Estas tareas son empaquetadas y preparadas para serenviadas al clúster. Las tareas son la unidad de trabajo más pequeñas que manejaSpark.

2. Planificar las tareas en los ejecutores (executors):

Una vez obtenido el grafo de ejecución físico, el controlador de Spark debe coordi-nar y planificar todas las tareas a lo largo de todos los ejecutores disponibles (quese registran en el controlador ni bien son iniciados para avisar de su disponibili-dad). Cada ejecutor representa un proceso capaz de computar tareas y almacenarinformación de RDD.

El controlador realizará un sondeo del conjunto de ejcutores disponibles e intentarádistribuir la carga de trabajo en alguno de ellos.

Por último, el controlador se encarga de exponer información acerca de la ejecucióndel sistema, a través de una interfaz web.

La vida de la aplicación Spark comienza (y termina) con el controlador Spark. El contro-lador de Spark es el proceso que los clientes usan para enviar aplicaciones a computarcomo se ve en la figura 3.7. También es responsable de planificar y coordinar la ejecucióny retornar resultados (mensajes o datos) al cliente. La planificación es posible, mediantela creación de un grafo dirigido y acíclico (DAG) de ejecución.

40

Page 47: Modelos de procesamiento distribuido basados en Spark para

Figura 3.8: RDDs

Como se puede ver, Spark presenta una arquitectura de maestro-esclavo: Un coordinadorcentral (el driver) y uno o más trabajadores (ejecutores). Donde tanto el driver como lostrabajadores, se ejecutan en procesos separados.(salvo en el modo de ejecución local, vistoen la siguiente sección).

3.3.3. El rol del RDD

Hasta ahora solo hemos mencionado a los RDD como un conjunto fundamental de datossobre el cual Spark es capaz de procesar y obtener resultados. En este apartado, veremosun poco más en detalle el concepto de RDD y qué representa. La figura 3.8 muestra unaabstracción general de lo que un RDD puede contener internamente.

Un RDD es una abstracción de estructura de datos usada por Spark. Desglosando sunombre podemos decir que son:

Resilientes (resilient)

Con la ayuda de un grafo acíclico dirigido (DAG), los RDD son tolerantes a fallas ycapaces de recomputarse ante la caída de un nodo del clúster.

Distribuidos (distributed)

La información contenida no esta centralizada en un sólo nodo, los datos estándistribuidos, en múltiples nodos del clúster. Esto se logra a partir de técnicas departicionado.

Conjunto de datos (dataset)

Es una colección particionada de datos formada por valores primitivos o valores devalores (por ejemplo, tuplas de objetos que representan registros de información).

Otra propiedad clave de los RDD es su inmutabilidad, la cual indica que después de serinstanciados y cargados con datos, éstos no se pueden actualizar. En su lugar, lo que

41

Page 48: Modelos de procesamiento distribuido basados en Spark para

sucede, es que un nuevo RDD es creado con la información actualizada, es decir cadatransformación que hagamos los datos en un RDD generará la creación de uno nuevo.

Los RDD en Spark operan sobre sistemas de archivos distribuidos tolerante a fallos talescomo HDFS (Hadoop Distributed File System) o Amazon S3. De esta forma, todos losRDD que son generados a partir de datos tolerantes a fallos serán por ende tolerantes afallos. Esto último no aplica en el caso de trabajar con flujos de información en vivo (datastreaming). La semántica de la tolerancia a fallos en Spark puede definirse como:

Como un RDD es un conjunto de datos inmutable, cada RDD conoce mediante quéoperación determinística fue creado, a través del grafo de ejecución (DAG).

Si debido a una falla en un nodo se pierde un RDD, entonces esta partición puedeser recuperada (recomputada) desde sus antecesores, usando el grafo (DAG).

Asumiendo que todas las transformaciones son determinísticas, los datos en el últi-mo RDD transformado siempre serán los mismos, independientemente de si hubofallas en el clúster o no.

La creación de un RDD puede ser de muchas formas. Una de ellas es a través de un archi-vo de texto plano, a través de una base de datos, de un streaming de datos o a partir demanipulación de RDDs ya creados que devuelven otro RDD (como una transformación).También es posible mediante la generación de datos dentro del programa controlador,paralelizando una colección de objetos ya existentes en el programa.

3.3.4. Operaciones sobre RDD

Como se ha mencionado a lo largo de este capítulo, los RDDs soportan dos tipos de ope-raciones: transformaciones y acciones. Las transformaciones son operaciones que retornanun nuevo RDD, tal como map() y filter(). Las acciones son operaciones que retornan unresultado al programa controlador o escribir en el almacenamiento, y desatar una ejecu-ción, tales como count() y first(). Spark trata las transformaciones y las acciones de formamuy diferente.

3.3.4.1. Transformaciones

Las transformaciones son operaciones sobre RDDs que retornan un nuevo RDD. Lastransformaciones son computadas de modo perezoso (lazy), es decir, sólo cuando unaacción es invocada.

Algunas de las transformaciones soportadas por Spark pueden verse en la figura 3.9.

42

Page 49: Modelos de procesamiento distribuido basados en Spark para

Figura 3.9: Transformaciones narrow y wide

En cada operación de transformación aplicada a un RDD, Spark va creando implícita-mente un RDD Lineage Graph (RLG), que mantiene las dependencias padres de cada RDD(característica fundamental necesaria para la tolerancia a fallos, como hablábamos ante-riormente). El concepto de RLG lo trataremos más adelante, así como también los tiposde dependencias estrechas (narrow) y amplias (wide).

3.3.4.2. Acciones

Ya hemos visto como crear RDDs a partir de transformaciones, pero en cierto momentoquerremos obtener un resultado. También habíamos mencionado que las transformacio-nes son perezosas para poder construir el RLG y poder organizar la ejecución de formamás eficiente.

Es entonces cuando las acciones desatan la computación de las transformaciones luego deque Spark cree el plan físico de ejecución. La figura 3.10 muestra algunas de las accionesmás comunes ofrecidas por Spark.

3.3.5. RDD Shuffle

Un concepto muy importante a tratar cuando hablamos tanto de transformaciones comode acciones, es que algunas de ellas, desencadenan un proceso interno llamado shufflingde datos. Para entender este proceso, podemos considerar el ejemplo de la operación re-duceByKey. La operación reduceByKey genera un RDD donde todos los valores para unaclave son combinados en una tupla (la clave y el resultado de ejecutar una función reduce

43

Page 50: Modelos de procesamiento distribuido basados en Spark para

Figura 3.10: Acciones sobre RDDs en Spark

sobre todos los valores asociados a esa clave). El desafío es que no todos los valores parauna clave en particular residen en la misma partición, o aún peor, en la misma máquina,pero aún así tienen que ser todas localizadas para computar el resultado.

En Spark, los datos no son generalmente distribuidos en particiones de modo que es-tén en el lugar justo para una operación específica. Durante el procesamiento, una tareaoperará sobre una única partición, por lo tanto, para organizar toda la información pa-ra una operación reduceByKey, Spark necesitará realizar una operación todos-con-todos.Esto implica leer de todas las particiones para encontrar todos los valores para todas lasclaves, para luego agruparlos juntos en particiones para computar el resultado final paracada clave. Esto se denomina shuffle de datos.

Por ende, el shuffle de datos impacta negativamente en la performance del sistema, yaque es una operación muy cara tanto en aspectos de memoria como de tiempo. Involucrauso de disco (I/O), serialización (y desserialización) de datos y uso de red.

Para organizar los datos necesarios para el shuffling, Spark genera un conjunto de tareas:tareas map para organizar los datos, y tareas reduce para combinarlos. Esta nomenclaturasurge de MapReduce, pero cabe destacar que no se relaciona con las operaciones map yreduce que realiza Spark. En la figura 3.11se muestra dicho proceso.

3.3.6. Clasificación de RDD

Los RDD, pueden clasificarse según el tipo de operación que ha sido aplicada a sus ante-cesores en el lineage (Laskowski, 2016).

Paralelizado

44

Page 51: Modelos de procesamiento distribuido basados en Spark para

Figura 3.11: Proceso Shuffle en operación reduceByKey

Tipo de colección de elementos paralelizada. Es el resultado de paralelizar median-te SparkContext, a una colección existente en el programa controlador.

Mapped

Es el tipo de RDD resultante de aplicar una función a cada elemento del RDD padre.Algunas de esas funciones pueden ser:

• map()

• flatMap()

• filter()

• glom()

• mapPartitions()

Grouped

El tipo de RDD que agrupa a su padre. Para cada clave en el padre, el RDD re-sultante contendrá una tupla con una lista de todos los valores incidentes en esaclave.

Hadoop

Un RDD que provee la funcionalidad principal para leer datos almacenados enHDFS usando una API MapReduce.

Shuffled

Un RDD resultante de una operación shuffle (por ejemplo, reparticionado de datos).

3.3.7. Particionado de datos

En un programa distribuido, la comunicación resulta ser muy cara, de forma que la dis-tribución de los datos adecuada para que el uso de red sea el mínimo posible, resultará en

45

Page 52: Modelos de procesamiento distribuido basados en Spark para

un aumento de la performance. En Spark existe algo llamado particionado de datos, queconsiste en controlar cómo se distribuyen los datos en los RDDs para reducir la comu-nicación de red. Esta característica resulta útil cuando un conjunto de datos es reusadomuchas veces.

Un particionado eficiente puede mejorar la performance de la aplicación en varios ór-denes de magnitud. Por el contrario, un mal particionado, puede llevar a un programaSpark a fallar en su ejecución (produciendo errores de executor-out-of-memory, por tenerque tratar con particiones demasiado grandes de datos)

3.3.7.1. Vista general del particionado

Dentro de algunas de las características más importantes relacionadas al particionado deRDDs en Spark podemos nombrar las tres más importantes.

El número de particiones a crear como resultado de una transformación a un RDDes usualmente configurable. Hay ciertas opciones por defecto en caso de no configu-rarse. Spark creará una partición RDD por bloque cuando se use HDFS (típicamenteel tamaño de un bloque en HDFS es de 128 MB).

Las operaciones shuffle tales como las de naturaleza ByKey (groupByKey o reduceBy-key) en las cuales el valor del número de particiones no se configura mediante algúnparámetro en la invocación, resultará en un número de particiones igual al valorque spark posea en una directiva llamada spark.default.parallelism.

El particionador por defecto usado por Spark es mediante una técnica hash. El Hash-Partitioner dispersa todas las claves a través de una función de dispersión determi-nística, y usa esta clave hash para crear (aproximadamente) baldes iguales. El objeti-vo es distribuir los datos parejos a través de un número especificado de particionesbasados en la clave.

3.3.8. Plan de ejecución

3.3.8.1. RDD Lineage

El RDD Lineage (RDDL) También conocido como RDD operator graph o grafo de depen-dencia de RDD, es un grafo que indica todos los padres de cada RDD. Es construido comoresultado del uso de las transformaciones; y además crea un plan de ejecución lógico. Porlo tanto, el RDDL representa cuáles transformaciones necesitan ser ejecutadas (y en quéorden) después de que una acción es invocada.

46

Page 53: Modelos de procesamiento distribuido basados en Spark para

Figura 3.12: RDD Lineage

El grafo de la figura 3.12, podría ser el resultado de aplicar la siguiente serie de transfor-maciones:

val r00 = sc.parallelize(0 to 9)

val r01 = sc.parallelize(0 to 90 by 10)

val r10 = r00 cartesian r01

val r11 = r00.map(n => (n, n))

val r12 = r00 zip r01 val

r13 = r01.keyBy(_ / 20)

val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

3.3.8.2. Plan de ejecución lógico

El plan de ejecución lógico comienza con los primeros RDDs (aquellos que no poseenninguna dependencia con otros RDDs o con datos en caché) y termina con el RDD queproduce el resultado de la acción que ha sido llamada a ejecución.

3.3.8.3. DAG Scheduler

El DAG de ejecución, o plan físico de ejecución es un DAG de etapas (stages)

En la figura 3.13, puede verse un ejemplo de como a partir de la creación de una serie detransformaciones realizadas desde el shell de scala, el código es procesado por el schedu-ler y crea el plan físico de ejecución.

El DAGScheduler (DAGS) es una capa de Apache Spark que implementa un planificadororientado a etapas (stages). Transforma el plan lógico (el grafo RDD de dependencias) aun plan físico constituido por etapas.

Luego de que una acción es llamada, SparkContext entrega el plan lógico al DAGSchedulerque a su vez lo convierte en un conjunto de etapas que son preparadas para ejecución (enforma de tareas).

DAGS realiza tres funciones primordiales en Spark:

47

Page 54: Modelos de procesamiento distribuido basados en Spark para

Figura 3.13: DAG Scheduler transformando el RDD Lineage a un plan físico

Computar un DAG de ejecución (basado en etapas), para un determinado trabajo.

Determina dónde se ejecutarán las tareas de forma predeterminada.

Se encarga de tratar las fallas debido pérdida de información durante el shuffle dedatos.

Es necesario computar un DAG para cada trabajo solicitado, haciendo un seguimiento decuales RDDs y resultados de etapas son materializados, y encuentra una forma rápida deplanificar los trabajos para ejecutarlos. Luego entrega las etapas al TaskScheduler (TS). Ladeterminación de la ubicación de preferencia para ejecutar cada tarea como se mencionóesta a cargo del DAGS, basándose en el estado actual de la caché, y entregando toda lainformación necesaria al TS.

DAGS lleva un seguimiento de cuales son los RDDs cacheados o persistidos para evitarrecomputarlos innecesariamente. Adicionalmente se encarga de tratar fallos cuando enuna operación shuffle hay datos que se pierden, en tal caso, etapas anteriores tendránque ser reejecutadas. Las fallas que ocurren dentro de una etapa en particular (que noson causadas por fallos en un shuffle) son tratadas por el TS, el cual reintentará ejecutarcada tarea un cierto número de veces antes de cancelar la etapa.

3.3.8.4. Transformaciones Narrow

Se utilizan cuando los datos que se necesitan tratar están en la misma partición del RDD yno es necesario realizar una mezcla de dichos datos para obtenerlos todos. Algunos ejem-

48

Page 55: Modelos de procesamiento distribuido basados en Spark para

Figura 3.14: Vista de una transformación narrow

Figura 3.15: Vista de una transformación wide

plos son las funciones filter(), sample(), map() o flatMap(). La figura 3.14 muestra una vistasimplificada de cómo fluyen los datos de un RDD a otro al aplicar una transformación.

3.3.8.5. Transformaciones Wide

Se utilizan cuando la lógica de la aplicación necesita datos que se encuentran en diferen-tes particiones de un RDD y es necesario mezclar dichas particiones (shuffle) para agru-par los datos necesarios en un RDD determinado. Ejemplos de wide transformation son:groupByKey() o reduceByKey(). Se muestra una vista gráfica del flujo de datos y cómo sonmezclados y repartidos en la figura 3.15

3.3.8.6. Tareas (tasks)

Las tareas (también conocidas como comandos), son la unidad de ejecución más pequeñay fundamental que conoce Spark. Éstas se encargan de computar sobre los registros de

49

Page 56: Modelos de procesamiento distribuido basados en Spark para

Figura 3.16: Una acción desencadena la creación de una etapa la cual se materializa en tareas.

Figura 3.17: Grafo de etapas obtenido a partir de un plan lógico

una partición de un RDD para una determinada etapa de un determinado trabajo. En lafigura 3.16, se puede ver como ante una acción realizada, un RDD es procesado paraobtener un conjunto de tareas.

3.3.8.7. Trabajos (jobs)

Un trabajo, es la unidad de ejecución de alto nivel manejada por Spark. Se crea al invocaruna acción y es entregado al DAGS para computar el resultado.

3.3.8.8. Stages (etapas)

Una etapa es una unidad de ejecución fíisica. Representa un paso dentro del plan deejecución físico. Está compuesto por un conjunto de tareas palalelizables. El conjunto deetapas obtenido en la figura 3.17, proviene de un plan lógico de ejecución.

Visto lo discutido anteriormente sobre trabajos en Spark, ahora podemos concluir, queun trabajo es un cómputo dividido en etapas. Una etapa es identificada por un Id autoin-cremental (Cada vez que se crea una etapa, DAGS incrementa un contador interno paratener el próximo Id) y solamente puede trabajar sobre particiones de un único RDD, peropueden ser asociados con muchos otras etapas padre (de las cuales dependan).

50

Page 57: Modelos de procesamiento distribuido basados en Spark para

DAGS divide un trabajo en una colección de etapas. Cada etapa contiene una secuenciade transformaciones narrow que pueden ser completadas sin realizar mezcla de datos(shuffle) en el dataset. Aquellas operaciones RDD que sean de dependencia narrow, comomap() o filter(), son conectadas internamente (pipelined) en un único conjunto de tareasen cada etapa, pero las operaciones con dependencia wide (shuffle), requerirán más deuna etapa (por ejemplo, una para escribir un conjunto de datos resultantes de un map, yotra para leer esos archivos).

En algún punto de la vida de una etapa, cada partición contenida en ésta será convertidaen una tarea.

3.3.9. En síntesis

Cuando se envía una aplicación a correr en Spark, esto es lo que ocurre (Karau et al.,2015):

El usuario carga una aplicación usando spark-submit

spark-submit lanza el controlador de Spark e invoca al método main() especificadoen el código fuente.

El controlador se comunica con el administrador del clúster para reservar recursos(ejecutores)

El administrador del clúster inicia los ejecutores en nombre del programa controla-dor.

El proceso del controlador, se ejecuta a través de la aplicación del usuario. Basadoen acciones sobre RDDs y transformaciones, enviándoles trabajo en forma de tareas.

Las tareas se ejecutan en los nodos trabajadores y guardan sus resultados

Cuando el main termina o el usuario hace una llamada de terminación al contextoSpark, entonces esto desencadena una acción en las que todos los ejecutores liberansus recursos del administrador del clúster.

3.4. Implementación

3.4.1. Enfoque

En este capítulo, hemos hablado sobre tecnologías como Apache Spark y Docker, parapoder crear un clúster distribuido con el fin de ejecutar algoritmos de predicción de en-laces. Más en concreto, la técnica de common neighbors la cual puede implementarse de

51

Page 58: Modelos de procesamiento distribuido basados en Spark para

muchas formas, en función de los objetivos específicos a alcanzar: tales como mejorar elrendimiento o la calidad de las predicciones. En el capítulo 5 nos centraremos en esosdetalles.

3.4.1.1. Common Neighbors

El predictor de common-neighbors, captura la noción de que dos nodos, más precisa-mente en el caso de una red social, amigos, que posean un amigo en común, puedan enel futuro, ser presentados y por ende establecerse una conexión. Esto tiene el efecto decerrar un triángulo en el grafo, lo cual guarda mucha relación a lo que sucede en la vidareal también. En (Newman, 2001) se ha experimentado en este contexto redes colaborati-vas, verificando una correlación positiva entre el número de vecinos en común de x e yen un tiempo t, y la probabilidad de que ambos puedan colaborar en algún tiempo futurodespués de t.

Dicha posibilidad, puede cuantificarse en una puntuación. Cuanto más alta sea la pun-tuación obtenida, mayor será la posibilidad de conexión entre ambos nodos. La ecuaciónpara calcular dicho puntaje es la siguiente. Definiendo a Γ como la cantidad de vecinos,o usuarios seguidos en la red social, para un nodo en particular.

Score(x,y) = |Γ(x) ∪ Γ(y)|

En la figura 3.18 podemos ver un ejemplo de common neighbors aplicado a un grafo decinco vértices. Las líneas continuas representan conexiones ya existentes, y las punteadasrepresentan los posibles arcos resultantes de la aplicación del algoritmo, y el respectivoanálisis de los puntajes que determinan la medida de similitud entre los vértices analiza-dos.

Es importante aclarar que el puntaje no es una probabilidad, ya que el mismo siempreserá mayor a cero y simplemente indica la cantidad de vecinos en común, lo que de al-guna forma nos da una noción de similitud entre ambos vértices. Un valor de cero indicaque para esta técnica no existen vecinos en común y por ende la posibilidad de existenciade arco es nula. Adicionalmente, tampoco significa que estrictamente no existirá una co-nexión futura, puesto que common neighbors sólo contempla casos de vecinos directos.Es decir, una distancia de solo un vecino en común. Un puntaje muy alto entre dos no-dos, tampoco podría asegurar que fehacientemente existirá un enlace en ambos vértices,puesto que dichos usuarios pueden conocerse en la vida real, pero aún así desinteresarseen tener algún tipo de relación en la red social. En (Yao et al., 2016) proponen eventual-mente ampliar la cantidad de saltos (hops) a un factor de 2, como en la figura 3.19, los

52

Page 59: Modelos de procesamiento distribuido basados en Spark para

Figura 3.18: Ejemplo de puntaje para dos casos en un grafo social

Figura 3.19: Common neighbors redefinido

vértices sombreados representan un posible caso de análisis de similitud considerandolos vecinos en común en dos saltos. De este modo, se puede mejorar la calidad de las pre-dicciones, teniendo en cuenta hasta dos usuarios en común transitivamente para calcularel puntaje de similitud. El impacto que esto tendrá, será a nivel exponencial en la com-plejidad del problema, aunque no trataremos dicha técnica alternativa en este trabajo.

3.4.2. SparkLP

SparkLP es el software escrito en Java para este trabajo, utilizando la API de Spark parapoder conectarse con los servicios que ofrece el mismo y poder crear, manipular datosy RDDs de forma distribuida. El mismo implementa un algoritmo basado en commonneighbors de un hop, como veníamos discutiendo anteriormente. Veamos primero unavista general del sistema a través de un diagrama de paquetes en la figura 3.20, y de undiagrama de clases en la figura 3.21 que muestra la estructura general del programa y suintegración con Apache Spark.

En secciones siguientes se presentan las clases restantes correspondientes a los métodosde filtrado y estrategias de cálculos de recomendaciones. A continuación se explica bre-

53

Page 60: Modelos de procesamiento distribuido basados en Spark para

Figura 3.20: Vista de paquetes de SparkLP.

vemente las responsabilidades básicas de cada clase de la figura siguiente.

ConfigInterpreter

Es una clase que implementa el patrón de diseño singleton. Una vez instanciada,es responsable de la puesta en ejecución del programa a través de la llamada init().Ya que SparkLP es puramente pre-configurado por el usuario para comenzar laejecución a través de los argumentos de la CLI; ConfigInterpreter tiene como res-ponsabilidad adicional analizar la línea de argumentos del programa y procesarloshaciendo uso del patrón factory para instanciar las posibles métricas implementa-das, como así también de sus posibles estrategias y/o configuraciones adicionales.También se encarga de configurar todos los valores que el usuario indique, talescomo valores de particionado y de arcos del grafo a procesar. Provee una opción deayuda especificando - -help, donde se muestran todos los valores de configuraciónposibles y valores aceptados por cada modificador.

StopWatch

Es una clase que se encarga de registrar lapsos de tiempo durante la ejecución delprograma, para ofrecer un reporte de tiempos pudiéndose incluir comentarios encada lap realizado, logrando así una descripción de cuánto tarda cada tarea delsistema en forma ordenada y también pudiendo obtener el tiempo total de ejecuciónde la instancia del programa.

PercentageException

Es una especialización de la clase Exception de Java. Su utilidad es proveer un me-jor tratamiento de errores ante un posible equívoco del usuario al ingresar valoresincorrectos que representen porcentaje, como en los casos de Information Source,donde el valor tiene que ser un flotante entre cero y uno.

LinkPredictor

Es el supertipo para la implementación de todas las posibles métricas de predic-ción de enlaces, nuevas métricas a implementarse, como Jaccard o Graph Distance,

54

Page 61: Modelos de procesamiento distribuido basados en Spark para

Figura 3.21: Diagrama de clases de SparkLP

55

Page 62: Modelos de procesamiento distribuido basados en Spark para

entre otras, tienen que heredar de esta clase e implementar los métodos abstrac-tos de configuración y run(). Es la responsable de comunicarse con Spark, creandouna instancia del Contexto de Spark, configurando algunas opciones básicas co-mo el nombre del programa. Cuando el software se corre en modo local (una solacomputadora), aquí se puede especificar con cuantos cores de CPU y memoria sedesea trabajar. En modo clúster la configuración es mejor realizarla desde la líneade comandos al llamar al comando spark-submit, el cual veremos más adelante. Porúltimo es la responsable de realizar un reporte final de la tarea de predicción, reco-lectando todas las métricas de tiempos y resultados obtenidos para luego generarun informe de la ejecución.

CommonNeighbors

Especialización de LinkPredictor, donde se implementa todo el comportamientode la métrica Common Neighbors. Implementa los métodos abstractos necesariospara dicha ejecución, y es la encargada de actuar como programa driver, al ser laresponsable de crear, manipular y obtener datos haciendo uso de la API RDD deJava Spark.

SparkContext

Es el punto de entrada para acceder a la funcionalidad de Spark. El contexto deSpark representa la conexión al clúster, y proporciona métodos para poder crearRDDs, acumuladores, variables broadcast, realizar acciones, obtener informaciondel estado del sistema, etc. Sólo un contexto de Spark puede estar activo por JVM,por lo que debe pararse mediante stop() antes de crear uno nuevo (si fuera necesa-rio). Se prevee que esta limitación será removida en versiones futuras de Spark. Aefectos prácticos de este trabajo, esta limitante no generó ningun tipo de inconve-nientes.

JavaSparkContext

Puesto que SparkContext esta hecho para trabajar sobre el lenguaje Scala, es nece-saria una capa de abstracción que nos permita interactuar con Spark sin tener quemezclar código en Scala y Spark. Si bien son lenguajes que se ejecutan sobre la JVM,pero poseen tipos de datos propios de cada lenguaje y que poseen semánticas dife-rentes. Para resolver esta incompatibilidad, JavaSparkContext proveé una amigableinterfaz que nos permite interactuar con SparkContext haciendo uso de las colec-ciones propias de Java y no las de Scala. (Lo mismo sucede con la implementacionde RDD y JavaRDD).

Tuple2

56

Page 63: Modelos de procesamiento distribuido basados en Spark para

Todas las operaciones donde intervengan datos en forma de clave-valor Spark lasmanipula a través de la clase Tuple2, que no es mas que un objeto wrapper. Existentuplas de mayor orden para aquellos casos donde se quieran almacenar más de dosobjetos.

Serializable

Todos los objetos que esten directamente involucrados con tareas de Spark, u obje-tos donde resida información (variables, constantes, objetos) que se usen en algunatarea Spark, deben poder ser serializados para poder ser enviados a través de la reda los ejecutores.

Principal

Es el punto de entrada de SparkLP. Implementa el método main() del programa.Aquí se invoca al interprete de configuraciones, a la ejecución de los algoritmos yposteriormente a la confección del reporte de las actividades realizadas.

3.4.3. Alternativas propuestas

Puesto que la técnica de common neighbors se basa en la búsqueda exhaustiva para laobtención de todos los arcos faltantes del grafo provisto, resulta interesante proponer op-timizaciones al algoritmo, ofreciendo estrategias que nos permitan refinar el conjunto dedatos y centrarnos en el análisis de cierto tipo de usuarios. Las alternativas a continua-ción, tratan de abarcar diferentes políticas de procesamiento de usuarios, en la que cadared social es libre de elegir a la hora de ejecutar algoritmos de recomendación.

Cuando hablamos de obtener recomendaciones en un contexto de la vida real, CommonNeighbors en sí nos ofrece una solución relativamente sencilla y rápida (esto no quieredecir que no sea compleja computacionalmente) para poder obtener recomendacionesbasándose en amigos en común que haya entre dos usuarios. Pero la realidad es que enlas redes sociales, podemos hacer una clasificación general de tipos de usuario y de quétipo de expectativas tienen con respecto al uso de dicha red social. Por un lado podríamosdefinir a los usuarios que buscan información (information seekers), y a los usuarios queson fuente de dicha información (information source) (Java et al., 2007; Krishnamurthy et al.,2008b). A partir de esta clasificación podríamos pensar que este segundo grupo, tal vez noestá interesado, o no le es tan relevante recibir recomendaciones, puesto que su objetivoprincipal en la red social es ofrecer contenido o información, y quizá no recibirla. Espor eso que plantearemos una posible estrategia de filtrado, de forma tal que podamosdescartar usuarios en el proceso de obtención de recomendaciones con el fin de agilizarel proceso, disminuir el tiempo de computo, y quizá, en contextos de tiempo de cómputofijo, poder dedicarlo a usuarios que de verdad busquen más y mejores recomendaciones(information seekers).

57

Page 64: Modelos de procesamiento distribuido basados en Spark para

3.4.3.1. Estrategias de filtrado

La implementación de Common Neighbors en este trabajo presenta múltiples formasdiferentes de procesamiento, y las que explicaremos a continuación se centran en el fil-trado de usuarios, o también llamado descarte de usuarios. De forma tal que aquellos queno cumplan con un criterio específico a la hora de la ejecución (valor parametrizable alcorrer el software), serán excluidos del proceso. Las tres modalidades de descarte son:

Baseline

En sí no supone optimización ni filtrado alguno. Este modo de ejecución se basa enprocesar todo el grafo social sin realizar descartes o filtrados. Modo útil en casosque el algoritmo tenga que correr sobre red social que se encuentre en un estado dearranque en frío (cold-start: con pocas interacciones en el grafo), ya que nos ahorrael tiempo de pre-procesamiento de los algoritmos de filtrado.

Information Source (IS)

Se basa en la premisa antes mencionada de que existen usuarios que buscan in-formación, y usuarios que proveen o exponen información, tal como es el caso delas celebridades, personajes de la política, músicos, etcétera. El criterio base para de-terminar si un usuario dado es considerado como una fuente de información estádeterminado por (Armentano et al., 2013):

IS(u) =( Seguidores(u)−Seguidos(u)

Seguidores(u)+Seguidos(u) ) + 1

2

Siendo IS un índice que oscila entre cero y uno, de modo tal que valores cercanos auno indicarían que se trata de un usuario fuente de información, y valores cercanosa cero, se tratarían de usuarios que buscan información. En un principio esta fór-mula nos permite hacer una rápida inferencia de tipo de usuario; pero en la prácticapuede llevar a clasificar a alguien como fuente de información sin tener en cuentala relevancia de dicho usuario en la red social. Veamos un ejemplo.

Usuario #Seguidores #Seguidos IS(usuario)

A 2.500.000 10000 0,996015936

B 25.000 100 0,996015936

C 250 1 0,996015936

En la tabla anterior vemos como, indistintamente de la proporción de usuarios quetratemos, el clasificador marcará a los tres usuarios como fuentes de información,lo cual en un principio es correcto. Pero a fines prácticos y computacionales no eslo mismo procesar usuarios del tipo de A que del tipo de C (por la forma en que

58

Page 65: Modelos de procesamiento distribuido basados en Spark para

trabaja Common Neighbors, teniendo que calcular arcos faltantes del grafo). Porlo que mediante un pequeño ajuste a este método, visto en el siguiente ítem, nospodrá dar una idea más acertada de qué tan costoso será procesar a un usuario ygenerar recomendaciones para él.

Information Source Weighted (ISW)

La idea subyacente en este método de filtrado es utilizar Information Source paraclasificar a un usuario, pero en aquellos casos en que alguno quede descartado poresta técnica, la idea sería poder darle una segunda oportunidad a través de un sondeode relevancia de usuario en la red social antes de descartarlo definitivamente. Elobjetivo de este método es no excluir usuarios tales como C en la tabla anterior.

Relevancia(u) =Seguidores(u)

#Usuarios

Tanto IS como ISW son métodos cuyos valores de tolerancia son parametrizados aliniciar el programa, con lo cual podemos obtener para el mismo dataset, diferentesvalores de descarte y recuperación de usuarios según los valores que demos a losargumentos que ahora llamaremos:

• Umbral: define al valor límite que determina si se descartará un usuario cuyoIS sobrepase al umbral.

• Tipo: define al valor límite que determina si se descartará un usuario cuyo ISsobrepase al umbral y que además cuya relevancia supere al valor de tipo.

Siendo ambos umbral y tipo dos valores reales expresados entre cero y uno. Veamoscomo esto influye en el ejemplo anterior para una red social de un cinco millonesde usuarios en total.

Usuario #Seguidores #Seguidos IS(usuario) Relevancia

A 2.500.000 10000 0,996015936 0,5

B 25.000 100 0,996015936 0,005

C 250 1 0,996015936 0,00005

Si fijáramos a umbral = 0,75 y a tipo = 0.5, entonces con ISW podríamos recuperara B y a C de los usuarios descartados y procesar recomendaciones para ellos. Enel caso de A, el valor de 0,5 indica que posee una relevancia que supera la cotasuperior de tipo y que por lo tanto estará definitivamente descartado.

Con respecto a la implementación de estas estrategias en SparkLP, veremos una vista declases que muestra la solución Java para estas funcionalidades, en la figura 3.22.

59

Page 66: Modelos de procesamiento distribuido basados en Spark para

Figura 3.22: Vista de clases para los métodos de filtrado de usuarios

60

Page 67: Modelos de procesamiento distribuido basados en Spark para

Figura 3.23: Vista de clases para estrategia de cálculos de scores.

3.4.3.2. Estrategias de cálculo de Score

Con respecto a los cálculos de los scores vistos en la figura 3.18, también se implementa-ron utilizando dos estrategias para realizar el cálculo. Una es más restrictiva y la segundaes libre, es decir, sin restricciones.

En cuanto a las restricciones que pueden aplicarse para calcular el puntaje para una tu-pla posible <usuario,usuario>, tenemos a Compact que descarta aquellas cuyo puntaje seaigual a cero. Esto tiene sentido puesto que en un principio podemos pensar que dos usua-rios cuyo índice de similitud (medido con common neighbors) igual a cero, significaríaque no comparten amigos en común, y que por lo tanto no pueden ser categorizadoscomo posible recomendación. Cabe aclarar que esta decición se basa en la premisa deque estamos trabajando con un algoritmo que solo mira hasta un hop de distancia comodiscutíamos en la subsección 3.4.1.1 en la página 52.

Con respecto a la implementación de estas estrategias en SparkLP, veremos una vista declases que muestra la solución Java para estas funcionalidades, en la figura 3.23.

61

Page 68: Modelos de procesamiento distribuido basados en Spark para

3.5. Conclusiones

A lo largo de este capítulo hemos expuesto una breve introducción a los conceptos bási-cos de Apache Hadoop quien dotó a Spark de muchas de sus características e influenciósu creación, basándose en la necesidad de procesar datos en memoria en lugar de en eldisco, y ofrecer una plataforma más simplificada y amigable al programador, siendo lamisma orientada a la velocidad de cómputo. Luego tratamos en mayor detalle todo lorelacionado a Spark, sus componentes principales y sus funciones para luego exponer enconcreto el software implementado en este trabajo: SparkLP. Hablamos de su estructura ydecisiones de diseño tomadas. En el siguiente capítulo veremos todo lo relativo a la crea-ción del clúster y su configuración, para luego centrarnos en lo experimental y analizarlos resultados, los retos y las limitaciones pertinentes.

62

Page 69: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 4Entorno experimental

4.1. Relevamiento técnico del clúster

En la experimentación pertinente al trabajo propuesto, se utilizó un clúster de compu-tadoras, propiedad del Instituto Superior de Ingeniería de Software Tandil (ISISTAN)1. Elmismo está compuesto por siete unidades de cómputo heterogéneas conectadas a travésde una red privada, accesible mediante Secure Shell (SSH). Las características generalesde las computadoras de dicho clúster son las siguientes:

GridCluster1, GridCluster4 y GridCluster7

• CPU: AMD Phenom(tm) II X6 1055T Processor

• Memoria Principal: 7731MB

GridCluster10, GridCluster11 y GridCluster12

• CPU: AMD FX(tm)-6100 Six-Core Processor

• Memoria Principal: 15GiB

GridCluster13

• CPU: AMD FX(tm)-6300 Six-Core Processor

• Memoria Principal: 15GiB

1http://www.isistan.unicen.edu.ar/

63

Page 70: Modelos de procesamiento distribuido basados en Spark para

4.2. Topología de red

El GridCluster computacional de ISISTAN, además de ser accesible in situ, está configura-do para poder ser usado de forma remota, haciendo uso de conexiones tunelizadas SSH,las cuales son seguras y rápidas. El cliente remoto puede ubicarse tanto en la universidady usar la intranet, que es mucho más rápida, o a través de Internet.

En la figura 4.1 se muestra un cliente remoto enviando trabajos desde una ubicacióngeográfica diferente al clúster, a través de una conexión SSH cifrada.

Figura 4.1: Vista de conectividad del clúster

En el clúster se usa una topología de red estrella, donde todos los nodos están conectadosa un enrutador principal que a su vez redirecciona el tráfico de datos a las diferentessubredes de la Universidad y a Internet.

4.2.1. Comunicación SSH entre nodos

En el clúster, cada nodo puede comunicarse entre sí a través de terminales SSH de laforma

$ ssh [email protected]

Donde X representa el número de pc al que quiera conectarse. Es importante mencionaresto pues es fundamental para el buen funcionamiento de Spark y el proceso de iniciacióndel clúster.

Cada nodo conoce la clave pública SSH de los demás nodos, por lo cual no hace faltainiciar sesión con contraseña al ir navegando entre las diferentes computadoras para ins-peccionar su sistema de archivos o árbol de procesos activos. Para esto es necesario tenerinstalado un servidor SSH en cada máquina del clúster y luego generar un par de cla-ves privada y pública para cada uno, mediante el comando ssh-keygen. Luego copiar laclave pública del nodo maestro en el archivo de claves autorizadas de cada uno de lostrabajadores de la siguiente forma, donde X representa el número de cada máquina.

64

Page 71: Modelos de procesamiento distribuido basados en Spark para

$ scp id_master.pub [email protected]:~/.ssh/;

$ ssh [email protected] "cat ~/.ssh/id_master.pub\

> >> authorized_keys";

4.3. Análisis del entorno experimental

En un principio, cuando enfrentamos problemas en el que muchas computadoras interco-nectadas necesitan ejecutar el mismo servicio, resultaría tedioso tener que escribir scriptsque se encarguen de comunicarse con todas las unidades de cómputo del clúster y dar-les la orden de instalar, configurar y preparar todo el sistema para soportar un softwarede procesamiento distribuido tal como Spark. Otro aspecto a tener en cuenta es que elclúster es usado por muchas personas, por lo que realizar la instalación de software que,entre otras cosas, crea nuevas interfaces de red y muchos archivos log residuales en lossistemas de archivos, no parece ser una buena opción para mantenerlas libres de softwareque ocupas recursos de almacenamiento en el clúster. Por todo esto, surge la necesidadde poder disponer de las computadoras sin hacer tantos cambios en ellas. Entonces po-dríamos proponer el uso de alguna tecnología tal como las máquinas virtuales, que sonnada más que un software que simula un sistema de computación y que puede ejecutarprogramas como si fuese una computadora real. De este modo podemos tener todo elSoftware necesario dentro de cada máquina virtual y luego cuando el trabajo este ter-minado, simplemente desecharlas dejando el clúster intacto. Por supuesto sabemos quevirtualizar un sistema operativo entero, impactaría negativamente en la performance deltrabajo en cuestión, puesto que en cada ordenador se estarán destinando recursos delsistema base para ejecutar otro sistema operativo más. Es así como concluimos que el ca-mino a seguir es a través del uso de contenedores. El concepto es similar al de máquinavirtual, pero un contenedor tiene algunas diferencias. Un contenedor es más ligero, yaque mientras que una máquina virtual necesita un sistema operativo para funcionar, uncontenedor funciona utilizando el sistema operativo que tiene la máquina anfitriona.

4.4. Docker

Docker ofrece la capacidad de empaquetar y ejecutar una aplicación en un entorno aisla-do llamado contenedor. El aislamiento y la seguridad le permiten ejecutar muchos conte-nedores simultáneamente en una máquina determinado. Los contenedores son livianosporque no necesitan la carga adicional de un monitor de máquina virtual (llamado hiper-visor), sino que se ejecutan directamente dentro del núcleo del equipo anfitrión. Por endeel impacto en el rendimiento global del sistema es mínimo.

65

Page 72: Modelos de procesamiento distribuido basados en Spark para

Docker, permite ubicar en un contenedor todo lo que una aplicación necesita para ser eje-cutada (Java, Maven, Scala ,Phyton, entre otros) para el caso de Spark y la propia aplica-ción. Así se puede llevar ese contenedor a cualquier máquina que tenga instalado Dockery ejecutar la aplicación sin tener que hacer nada más, ni tener que ver qué versiones desoftware tiene instalada esa máquina, de si tiene los elementos necesarios para que fun-cione la aplicación, o de si son compatibles. Se ejecuta el software desde el contenedorde Docker, y dentro de él estarán todas las bibliotecas y otras cosas que necesita dichaaplicación para funcionar correctamente.

El contenedor de Docker toma los recursos más básicos, que no cambian de un ordena-dor a otro de sistema operativo de la máquina en la que se ejecuta. Para el caso práctico,Docker es iniciado, montando un punto de anclaje con el sistema anfitrión para podertener una carpeta de archivos compartidos, por donde se pueden pasar archivos y otrosrecursos. El soporte del kérnel Linux para los espacios de nombres aísla la vista que tieneuna aplicación Docker de su entorno operativo, incluyendo árboles de proceso, red, IDde usuario y sistemas de archivos montados, mientras que los cgroups del kérnel propor-cionan aislamiento de recursos, incluyendo la CPU, memoria, el bloque de E/S y red.

Figura 4.2: Integración de Docker con el Kérnel Linux

4.4.1. Cgroups

Es un mecanismo que permite soportar a nivel del sistema operativo la capacidad deaislar el consumo de recursos (CPU, memoria, E/S, ancho de banda, etc. . . ) de formaindependiente a cada grupo de procesos. Por tanto cgroups es una herramienta útil paracontrolar la asignación de los recursos a los procesos. Los grupos de control (cgroups)permiten definir una jerarquía de grupos de procesos de manera que un administradorpueda definir al detalle la asignación de sus recursos.

Para el caso práctico, los grupos de control permiten que Docker comparta los recursos

66

Page 73: Modelos de procesamiento distribuido basados en Spark para

de hardware disponibles con los contenedores y, opcionalmente, imponga límites y res-tricciones. Por ejemplo, puede limitar la memoria disponible a un contenedor específico.

4.4.2. Namespaces

Docker utiliza una tecnología llamada namespaces para proporcionar el espacio de tra-bajo aislado denominado contenedor. Cuando se ejecuta un contenedor, Docker crea unconjunto de espacios de nombres para ese contenedor. Estos espacios de nombres propor-cionan una capa de aislamiento. Cada contenedor se ejecuta en un espacio de nombresseparado y su acceso está limitado a ese espacio de nombres.

Docker usa los siguientes espacios de nombres en Linux:

pid: aislamiento de procesos (PID: Process ID).

net: gestión de interfaces de red (NET: Networking).

ip: gestión del acceso a los recursos de IPC (IPC: InterProcess Communication).

mnt: gestión de los puntos de montaje del sistema de archivos (MNT: Mount).

uts: aislando los identificadores de kérnel y versión. (UTS: Unix Timesharing Sys-tem).

4.4.3. Sistemas de archivos de unión

Los sistemas de archivos de unión, o UnionFS, son sistemas de archivos que funcionanmediante la creación de capas, lo que los hace muy ligeros y rápidos. Docker utilizaUnionFS para proporcionar los bloques para los contenedores. Docker puede usar múlti-ples variantes de UnionFS, incluyendo AUFS, BTRFS, VFS y DeviceMapper.

4.4.4. Libcontainer

Docker combina los espacios de nombres, los grupos de control en un contenedor llama-do formato de contenedor. El formato predeterminado del contenedor es libcontainer.

4.4.5. Docker daemon

El demonio de Docker (Docker daemon), es un proceso residente en memoria, que admi-nistra y manipula los contenedores. Se mantiene a la escucha de peticiones enviadas através de la API de Docker. El programa cliente llamado «docker», provee una interfaz

67

Page 74: Modelos de procesamiento distribuido basados en Spark para

de línea de comandos que permite al usuario interactuar con el demonio como se ve enla figura 4.32.

Figura 4.3: Vista del Docker Engine y su interacción con cada componente del sistema.

4.4.6. El cliente Docker

El cliente Docker (docker) es la forma principal en que muchos usuarios de Docker inter-actúan con Docker. Cuando usa comandos como:

$ docker run "imagen"

el cliente envía estos comandos a dockerd, que los ejecuta. Este comando utiliza la API deDocker. El cliente Docker puede comunicarse con más de un demonio.

4.4.7. Redes de contenedores

Cuando nos conectamos a una aplicación corriendo dentro de un contenedor y revisamoslas interfaces de red en nuestra computadora podemos ver que hay una interfaz llamadadocker0.

$ ifconfig docker0

docker0 Link encap:Ethernet HWaddr xx:xx:xx:xx:xx:xx

inet addr:172.17.0.1 Bcast:0.0.0.0 Mask:255.255.0.0

2docs.docker.com/engine/docker-overview/

68

Page 75: Modelos de procesamiento distribuido basados en Spark para

La interfaz docker0 es creada por el docker daemon para poder conectarse con el con-tenedor. Esta red tiene un rango de direcciones IP típico: 174.17.0.1/24. Las primerasversiones de Docker presentaban problemas de seguridad o más bien de confiabilidad yaque cualquier contenedor de una máquina podía tener acceso a cualquier contenedor delmismo. La versiones más recientes de Docker permiten crear distintas subredes y aislarperfectamente grupos de contenedores para no permitir accesos no autorizados.

Las distintas redes que podemos tener configuradas por defecto en nuestra máquina son:

host representa la red del propio equipo y haría referencia a eth0

bridge representa la red docker0 y a ella se conectan todos los contenedores pordefecto.

none significa que el contenedor no se incluye en ninguna red y si verificamos estocon el comando ifconfig dentro del contenedor veríamos que solo tiene la interfazde loopback.

4.4.8. Docker Swarm

Uno de los desafíos de esta etapa, es lograr que sólo baste con descargar una imagen deSpark para Docker y crear un contenedor a través de un simple comando con el cual seejecute todo lo necesario para poder empezar a usar Spark de forma distribuida en unclúster. En principio podríamos intuir que basta con tener las computadoras conectadasen red e iniciar Docker para que los servicios ejecutados en los contenedores puedancomunicarse entre los distintos nodos del clúster. En las etapas iniciales de la creación estono dio resultado, puesto que Docker requiere una breve configuración y ajustes de red,para otorgarle la capacidad de comunicarse con otras computadoras corriendo Docker,aún cuando la configuración de red del sistema operativo esté realizada correcta y existaconectividad.

Un enjambre (swarm) es un grupo de máquinas que ejecutan Docker y se unen para for-mar un clúster. Una vez creado, puede ejecutarse el software necesario, pero ahora habi-litando la opción de trabajo distribuido a través del Swarm manager. Las máquinas unidasa un enjambre pueden ser tanto virtuales como físicas. Luego de unirse forman parte delconjunto de nodos. Los mismos pueden ser nodos administradores o trabajadores.

Para la creación del swarm sólo basta con ejecutar en el host que será máster la siguientesecuencia de comandos.

$ docker swarm init --advertise-addr=192.168.240.1

Lo que hacemos aquí es iniciar el modo swarm, en la interfaz de red con la IP especificada.Lo siguiente es definir la red donde los nodos van a conectarse.

69

Page 76: Modelos de procesamiento distribuido basados en Spark para

4.4.9. Redes overlay

Una red superpuesta (en inglés, overlay network) es una red virtual de nodos enlazadoslógicamente, que está construida sobre una o más redes subyacentes (underlying network),tal como se ve en la figura 4.4. Se dice que los nodos de la red superpuesta están conec-tados por enlaces virtuales. Su objetivo es implementar servicios de red que no estándisponibles en las redes subyacentes. Las redes superpuestas pueden apilarse de formaque tenga capas que proporcionen servicios a la capa superior.

En una red, los nodos se comunican entre sí usando los llamados protocolos de red. Unprotocolo de red es un conjunto de reglas usadas por computadoras para comunicarseentre sí a través de una red. Los protocolos de la red normalmente se organizan en capasformando pilas. Por esta razón la forma habitual de crear una red superpuesta es añadircapas adicionales sobre los protocolos de las redes subyacentes.

Podemos construir una red superpuesta operando exclusivamente a nivel lógico. Porejemplo, se puede aprovechar una red de nodos ya conectados para construir sobre ellauna nueva red que proporcione servicios adicionales. Sin embargo, a veces también esnecesario operar a nivel físico (además de a nivel lógico). Por ejemplo, para conectar dosredes de área local se necesita interconectar las dos redes a través de un enrutador. Laconexión a través del enrutador no solamente proporciona una conexión física entre lasredes, también implementa una nueva capa de protocolos que permite la interconexión(Doval and OMahony, 2003).

Figura 4.4: Vista de una red overlay sobre una topología de red subyacente

Para crear la red overlay, hay que ejecutar el siguiente comando «docker»:

$ docker network create --driver=overlay --attachable nombre_de_la_red

Y por ultimo crear el contenedor correspondiente de Spark en cada máquina mediante lasiguiente instrucción

70

Page 77: Modelos de procesamiento distribuido basados en Spark para

$ docker run -it -p 4040:4040 -p 8080:8080 -p 8081:8081 -h spark --name=spark \

> -v ~/data:/data --network nombre_de_la_red --ip 10.0.0.X p7hb/docker-sparkip

- it para iniciar el contenedor en modo interactivo mediante una terminal bash.

- p para exponer puertos al sistema anfitrión.

- h para especificar el nombre del host virtual.

-- name para especificar el nombre del contenedor.

- v para especificar un punto de montaje para compartición de archivos entre siste-ma huésped y el contenedor.

-- network es la red overlay que especificamos.

-- ip es la dirección ip estática de la red overlay asignada al contenedor.

La imagen utilizada para este trabajo ya fue previamente creada por Prashanth Babu3.

4.5. Automatización del proceso

Spark provee scripts para iniciar y detener el clúster de forma automática. El problemase presenta al utilizar Docker como un contenedor que aísla y abstrae las configuracionesde red. Por lo que dichos scripts no funcionan sin antes realizar una reconfiguración totaldel sistema anfitrión, teniendo que otorgar privilegios de usuario raíz a Spark para podercomunicarse a través de la red overlay y así ejecutar los comandos para comenzar el clús-ter. En cambio podemos redefinir la forma en que Spark se interconecta, aprovechandocaracterísticas de Docker que nos permite llamar un ordenador remoto mediante SSH ypedirle que ejecute órdenes en cada nodo.

Primero y principal cabe destacar que los contenedores son procesos que pueden llegara detenerse y también necesario detener cuando ya no se usen para liberar los recursosusados. Esto es importante mencionar porque aquí haremos uso de ordenes Docker en-cargadas de iniciar y detener los contenedores.

4.5.1. Inicio del clúster

En el máster se ejecuta un script que inicia el contenedor Spark y también los servi-cios de máster. Luego se comunica con cada nodo mediante SSH ordenándole quese registre como un worker.

3hub.docker.com/r/p7hb/docker-spark/

71

Page 78: Modelos de procesamiento distribuido basados en Spark para

#!/bin/sh

#arrancar el master

docker start spark;

docker exec -it spark start-master.sh;

#comando para obtener IP overlay de la red

infoMaster=$(docker exec spark bash -c "ip addr show | grep eth0 | \

> grep inet | cut -f 6 -d ’"’ ’"’ | cut -f 1 -d ’"’/’"’ ; hostname;");

#obtengo ip del master

ip_master=$(echo $infoMaster| cut -f 1 -d ’ ’);

#comando para arrancar el slave

#arrancar los workers

# arranco 4

ip_4=$(ssh [email protected] "~/start-worker.sh $ip_master");

# arranco 7

ip_7=$(ssh [email protected] "~/start-worker.sh $ip_master");

# arranco 10

ip_10=$(ssh [email protected] "~/start-worker.sh $ip_master");

# arranco 11

ip_11=$(ssh [email protected] "~/start-worker.sh $ip_master");

# arranco 12

ip_12=$(ssh [email protected] "~/start-worker.sh $ip_master");

# arranco 13

ip_13=$(ssh [email protected] "~/start-worker.sh $ip_master");

En cada worker se inicia el contenedor de Spark con la IP pasada como parámetroen la invocación del script.

#!/bin/bash

ip_maestro=$1;

a=$(docker start spark);

a=$(docker exec spark bash -c "start-slave.sh spark://$ip_maestro:7077");

ip=$(docker exec spark bash -c "ip addr show | grep eth0 | grep inet |\

> cut -f 6 -d ’"’ ’"’ | cut -f 1 -d ’"’/’"’ ;");

info=$(echo $ip | tr ’\n’ ’ ’);

info="$info $(hostname)";

echo $info;

72

Page 79: Modelos de procesamiento distribuido basados en Spark para

4.5.2. Parada del clúster

En el máster se ejecuta un script que inicia el contenedor Spark y también los servi-cios de máster. Luego se comunica con cada nodo mediante SSH. Ordenándole quese registre como un worker.

#!/bin/sh

#comando para terminar el slave

ip_4=$(ssh [email protected] "~/stop-worker.sh");

ip_7=$(ssh [email protected] "~/stop-worker.sh");

ip_10=$(ssh [email protected] "~/stop-worker.sh");

ip_11=$(ssh [email protected] "~/stop-worker.sh");

ip_12=$(ssh [email protected] "~/stop-worker.sh");

ip_13=$(ssh [email protected] "~/stop-worker.sh");

docker exec -it spark stop-master.sh;

docker stop spark;

En cada worker se detiene el contenedor de Spark.

#!/bin/bash

a=$(docker exec spark bash -c "stop-slave.sh");

ip=$(docker exec spark bash -c "ip addr show | grep eth0 | grep inet |\

> cut -f 6 -d ’"’ ’"’ | cut -f 1 -d ’"’/’"’ ;");

info=$(echo $ip | tr ’\n’ ’ ’);

a=$(docker stop spark);

a="$a $(hostname)" echo $info "STOPPED";

73

Page 80: Modelos de procesamiento distribuido basados en Spark para

74

Page 81: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 5Experimentación

A lo largo de este capítulos trasladaremos todo lo visto en el capítulo 3 de trabajo pro-puesto en un escenario experimental ya detallado en el capítulo 4. En primer lugar discu-tiremos las limitaciones técnicas de dicho entorno y la naturaleza del conjunto de datos atratar, luego evaluaremos cada una de las técnicas de filtrado de usuarios para diferentesvalores de umbral y tolerancia respectivamente y contrastaremos los resultados con laejecución baseline del algoritmo. Por último mostraremos un análisis comparativo quemuestran el comportamiento y los tiempos para cada modo de ejecución realizado.

5.1. Alcance y limitaciones

Para realizar las pruebas en SparkLP, inicialmente se contaba con un conjunto de datos1

proveniente de la red social Weiboo 2. Del mismo podemos mencionar algunas de suscaracterísticas principales:

Tamaño total de archivo: 776,4 MBytes

Cantidad total de usuarios: 1.944.589

Cantidad total de interacciones: 50.655.143

Dicho conjunto de datos contiene únicamente un archivo histórico de interacciones entreusuarios «usuario que sigue a usuario», con el siguiente formato: IDseguidor→ IDseguido

Al comienzo de la etapa de experimentación se probó ejecutar el algoritmo de CommonNeighbors de diferentes formas y con diversas restricciones de memoria. La primera al-ternativa evaluada consistió en tomar muestras del dataset para no tener que sobrecargar

1https://www.kaggle.com/c/kddcup2012-track1/data2www.weibo.com

75

Page 82: Modelos de procesamiento distribuido basados en Spark para

la memoria del programa driver como en un principio sucedió. El sistema continuó pre-sentando fallas, arrojando los siguientes errores:

java.lang.OutOfMemoryError: GC Overhead limit exceeded

La causa de este error se da cuando tareas lo suficientemente demandantes de me-moria hacían uso intensivo de esta sin llegar a agotarla en su totalidad, pero sí de-jando muy poco espacio en heap disponible para el alojamiento de nuevos objetos.En estos casos el Garbage Collector (GC), es activado contínuamente intentando li-berar memoria. Cuando una tarea (proceso Java) gasta más del 98% de tiempo deejecución recolectando objetos basura, y el umbral de recolección de memoria esmenor del 2% del tamaño total del heap, entonces este mecanismo de error deten-drá la ejecución. En estas circunstancias, se puede decir que la tarea queda bloquea-da sin poder continuar su ejecución por falta de memoria. Si bien Spark es tolerantea fallos no controla los fallos provocados por la JVM.

java.lang.OutOfMemoryError: Requested array size exceeds VM limit

La causa de este error se identificó como un intento de Spark de crear resultados deuna tarea, que exceden las capacidades de memoria física de las que la JVM puededisponer.

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

La causa de este error es parecida a la mencionada anteriormente, sólo que en con-textos donde Spark intenta de antemano crear RDDs que superan los 2GB de espa-cio para una tarea. Esto nos da un indicio que el particionado propuesto al datasettrabajado, no es lo suficientemente grande para poder manejar los conjuntos deresultados.

OutOfMemoryError: Java heap space

En casos en que pudimos finalmente traer mediante collects, información al pro-grama driver, pero esta información excede la memoria física disponible en dichalocación. También ocurrió cuando en casos donde las tareas eran muy grandes, secalculaban muchos arcos faltantes del grafo social y de forma muy rápida sin dartiempo a iniciar el GC.

Es común ponernos a pensar cuál es la causa por la que Spark no está siendo capaz de po-der procesar un dataset, que a simple vista no parecen grandes datos sociales. La respuestaa esta pregunta responde a la siguiente cuestión. Teniendo aproximadamente cincuentamillones de interacciones, y un promedio de dos millones de usuarios, definiremos dossituaciones que se darán en el proceso de obtención de recomendaciones:

76

Page 83: Modelos de procesamiento distribuido basados en Spark para

1. Hit: «usuario de la red social ya existe en la lista de seguidores». Este es el caso ideal deejecución puesto que solo se realiza una comparación y no se crean objetos nuevos,el resultset de la tarea no tendrá una recomendación nueva.

2. Fail: «usuario de la red social no existe en la lista de seguidores». Se crea un nuevo objetode tipo tupla y se guarda en él una copia del usuario observado y el usuario de lared que no existe en seguidores. Este es el peor caso de ejecución (computacional-mente), debido a que además de realizar una comparación por existencia, tambiénse crearan tres objetos nuevos y luego agregados al resultset de la tarea.

Y ahora podemos evaluar dos posibles escenarios.

1. El grafo social es muy conexo, es decir, la cantidad de arcos faltantes es, a lo sumo,la cantidad de arcos ya existente. En este contexto podemos decir que el predictorde enlaces en cada tarea, experimentará mas situaciones de hit. Por lo tanto, esteescenario implica un menor consumo de memoria principal.

2. El grafo social es poco conexo, es decir, posee usuarios que han interactuado poco(seguir a otros usuarios). Hablando en términos de procesamiento, este escenario esel más costoso, ya que se producirán más situaciones de fail llevando a uso intensivode memoria. La cantidad de memoria a usar estará determinada por el grado deconectividad de ese usuario en la red social. Cuanto más bajo sea este valor, mayorserá el tiempo de cómputo requerido y la memoria necesaria.

Teniendo en cuenta lo antes planteado, definimos al grado de interacción como:

GI(u) =Seguidores(u) + Seguidos(u)

TotalRed social

Definido el grado de interacción, el dataset original posee una mediana muy cercana alcero, lo cual nos da una noción de la baja interacción entre usuarios en dicho grafo. Clara-mente podemos concluir que predominan los valores muy cercanos al cero. Por todos losproblemas de memorias anteriormente mencionados, fue imposible procesar con Spark eldataset propuesto. Por lo que fue estrictamente necesario realizar una reducción del con-junto de prueba debido a la gran limitación tecnológica con la que se contó al momentode la experimentación.

Entonces, la alternativa a seguir es la siguiente: plantear una estrategia que nos permitaoperar sobre un conjunto de datos más reducido que se adapte a las posibilidades decómputo de nuestro clúster.

77

Page 84: Modelos de procesamiento distribuido basados en Spark para

5.1.1. Reducción del conjunto de datos

La alternativa a seguir fue tomar una muestra aleatoria de nuestro dataset, mediante unalgoritmo escrito en lenguaje Scala. En este experimento se trató de obtener un total de25000 usuarios al azar (sin repeticiones), y luego del dataset original, extraer todas lastuplas correspondientes cuyos valores de clave y valor pertenezcan a la lista de usuariospreviamente obtenida.

Este experimento nos arrojó una muestra bastante representativa con respecto a la origi-nal, con las siguientes características:

Tamaño total de archivo: 116.7 MBytes

Cantidad total de usuarios: 25.000

Cantidad total de interacciones: 7.636.756

Analizando esta muestra del conjunto de datos, puede verse la nueva distribución delgrado de interacción de usuarios en el histograma de la figura 5.1. Sigue dándonos nú-meros muy bajos de interacción, pero al ser más pequeño en cantidad de usuarios e inter-acciones, estamos en condiciones computacionales de ponerlo a prueba y ver qué sucedecon la predicción de enlaces.

Figura 5.1: Histograma para dataset reducido en función de su grado de interacción

78

Page 85: Modelos de procesamiento distribuido basados en Spark para

5.2. Preparación del sistema

Para la ejecución de SparkLP, representado en un diagrama de secuencia en la figura 5.2,se observa la creación del interprete de configuraciones, cómo es instanciado el predictorde enlaces, y cómo es invocado Spark en el proceso de configuración del entorno distri-buido.

Figura 5.2: Diagrama de secuencia de configuración inicial.

5.3. Filtrado de usuarios

Como habíamos planteado en la sección 3.4.3.1, durante el proceso experimental, tratare-mos técnicas orientadas al filtrado de usuarios, que nos permitan deshacernos de aquellosque no sean relevantes según el criterio de quien necesite obtener las recomendaciones.En este trabajo nos centraremos en usuarios cuyo valor de popularidad sea alta, o re-lativamente alta con respecto al tamaño del grafo social. En las siguientes subseccionesveremos en mayor detalle los detalles de ejecución de cada técnica.

5.3.1. Baseline

En esta modalidad de ejecución, podemos observar como el objeto responsable del fil-trado nos devuelve la misma referencia al RDD que le hemos pasado, de modo que lastransformaciones y acciones que apliquemos, se harán sobre el conjunto de datos obteni-do al cargarlo del disco. También es posible ver el rol que cumple el contexto de Spark al

79

Page 86: Modelos de procesamiento distribuido basados en Spark para

ser el responsable de devolvernos un RDD a partir del dataset a procesar que se encuen-tra en el sistema de archivos, tanto sea local como en HDFS. Luego de obtener nuestroRDD convertido a clave-valor, llamamos a la acción count para forzar la persistencia delmismo en memoria, y poder así enviar mediante una variable broadcast (de sólo lectura)la lista de todos los usuarios de la red social a cada uno de los ejecutores.

Recordamos que Spark no ejecutará ninguna operación sobre los RDD hasta que se en-cuentre una acción. Aunque en esta modalidad, al obtener los usuarios descartados, ten-dremos un conjunto vacío, las llamadas al método encargado de eliminar los usuariosno realizará modificación alguna. Si bien parece una característica sin sentido por ahora,cobrará mucha importancia cuando el filtrado de usuarios sea efectivo, como veremosmás adelante.

La razón de la persistencia (caché) del RDD de interacciones filtrado, responde a la ne-cesidad de ahorrar cómputos de información que vamos a usar más de una vez, ya quecomo sabemos Spark recomputa cada uno de los RDD que sean necesarios en el RDD Li-neage cuando la información se necesite, o en caso de que particiones se pierdan durantela ejecución. Esto implicaría un coste extra de tiempo de procesamiento cuando explo-remos las estrategias de cálculo de scores para la recomendaciones, donde necesitamosdel RDD filtrado para su obtención. Por tal motivo tener el RDD de interacciones, quetambién fue convertido a clave-valor previamente, persistido en memoria, nos permitiráun incremento sustancial en la performance del sistema. En el diagrama de secuencia dela figura 5.3, se puede ver la ejecución de este proceso.

80

Page 87: Modelos de procesamiento distribuido basados en Spark para

Figura 5.3: Diagrama de secuencia para la configuración de filtrado nulo.

Como veíamos anteriormente en el proceso de filtrado baseline se retorna la misma refe-rencia del RDD sin aplicar ninguna transformación. Recondando anteriormente cuandomencionabamos al grafo de dependencias acíclico (DAG), aquí no se generan transfor-maciones por lo que no se agregan nodos al grafo. Pero sí sucede en los casos de filtradoefectivo, que a continuación se mostrará la parte del mismo pertintente a esta etapa deejecución y a la creación del RDD inicial con el que se trabajará posteriormente.

5.3.2. IS e ISW

En los métodos de filtrado de usuarios efectivo implementados en este trabajo, la inter-acción entre SparkLP y Apache Spark difiere sustancialmente con respecto a la versiónbaseline. Cuando pedimos a SparkLP que realice un descarte de usuarios, se destinará untiempo de cómputo extra en el clúster, para obtener todos los usuarios que no cumplancon el criterio de aceptación en función de cómo haya sido parametrizado el programa almomento de su ejecución. El fitrado de usuarios consiste básicamente en lo siguiente.

81

Page 88: Modelos de procesamiento distribuido basados en Spark para

Figura 5.4: Diagrama de secuencia para configuración de filtrado efectivo.

82

Page 89: Modelos de procesamiento distribuido basados en Spark para

Figura 5.5: Diagrama de secuencia para recolección de usuarios descartados.

Con respecto al plan lógico de ejecución podemos ver que el trabajo encargado de filtrarlos usuarios se divide en tres etapas. La primera y segunda etapa son muy similares porlo que sólo se darán detalles de la primera en la figura 5.7.

En síntesis, el filtrado consiste en que para cada columna del dataset en formato clave-valor, obtener un nuevo RDD para las claves, y otro RDD para los valores, en dondese pueda conocer cuántas ocurrencias de cada clave o valor existan respectivamente. Enotras palabras, queremos saber cuantos seguidores y cuantos seguidos tiene cada usuario,para poder aplicar la formula de information source.

La reducción por clave, que se puede ver en la figura 5.6, se lleva a un nuevo stage puestoque es un tipo de transformación de tipo wide. Por lo que desata un proceso shufflecomo ya discutimos en capítulos anteriores. Las transformaciones map y filter son detipo narrow, y esa es la razón por la que Spark las agrupa dentro de un mismo stage yoptimizar la ejecución a través de pipelines.

83

Page 90: Modelos de procesamiento distribuido basados en Spark para

Figura 5.6: DAG para filtrado activo en IS o ISW

Ahora veamos en mejor detalle lo que ocurre en la primera etapa. Partiendo del data-set inicial cargado en disco, se convierte a clave-valor, y luego almacenado en memoriaprincipal (lo que Spark define como caché). Luego se obtiene el conjunto de todos los va-lores del RDD, y luego se le aplica una transformación resultando en una tupla del tipo<usuario,ACUMULADOR> que servirá para obtener los seguidores totales de cada clave.Lo mismo aplica en el stage 1, pero en dicho caso, será para las claves del RDD.

84

Page 91: Modelos de procesamiento distribuido basados en Spark para

Figura 5.7: DAG en detalle para la primera etapa.

Una vez preparado el RDD que contiene la cuenta de seguidores y seguidos por usuario,es momento de coagrupar ambos conjuntos, y obtener uno único de forma que podamosfiltrar aquellos que no cumplan con las condiciones impuestas por la fórmula de IS queya hemos discutido con anterioridad.

Al final de la etapa que podemos ver en la figura 5.8, se recuperan todos aquellos usuariosdescartados en forma de conjunto hash, y se le pide a Spark que envíe mediante difusióna todos los ejecutores, la lista de usuarios actualizada (sin los excluidos) y se preparael sistema para quitar del dataset de interacciones, aquellas que contengan al menos unusuario descartado. En la figura 5.9, la acción count fuerza el comienzo de la ejecucióndel DAG obtenido hasta el momento.

85

Page 92: Modelos de procesamiento distribuido basados en Spark para

Figura 5.8: DAG en detalle para la etapa de descarte

Figura 5.9: Comienzo del descarte

Una vez terminado el filtrado de usuarios, podemos ver como comienza en el stage 5 elproceso de obtencion de los arcos faltantes. Primero veremos el diagrama de secuencia yluego veremos el DAG correspondiente a dicha etapa.

86

Page 93: Modelos de procesamiento distribuido basados en Spark para

5.4. Cálculo de las recomendaciones

Figura 5.10: Diagrama de secuencia para configuración de filtrado efectivo.

87

Page 94: Modelos de procesamiento distribuido basados en Spark para

Con respecto al DAG que se genera en Spark para obtener los resultados, aquí ya no de-pende del método de filtrado, por lo que la ejecución es la misma para cualquier método.Veremos la evolución del RDD lineage que corresponde al cálculo de los arcos faltantesdel grafo social. En la figura 5.11, podemos ver el trabajo 2, que toma el RDD ya filtrado(o no, en el caso de ejecución baseline), y realiza una agrupación por clave. Luego ge-nera las predicciones comparando la lista de seguidos de cada clave comparando con lalista de usuarios de la red social (que habíamos difundido en el paso anterior luego deldescarte).

Figura 5.11: DAG para la predicción de arcos.

En la figura 5.12, podemos ver que Spark va procesando en la medida de sus posibilida-des de memoria disponible en el clúster, por lo que lo hace en dos partes. Esto respondea la forma en que Spark organiza las tareas cuando usamos la orden groupByKey. Ob-tiene un vasto conjunto de claves con sus respectivas listas de seguidores, y las envía aprocesar. Cuando las tareas van siendo consumidas, Spark toma el resto de las claves queaún no fueron procesadas y las vuelve a enviar a procesar para completar con el proceso.Lo interesante de esta situación es que, como ya habíamos pedido persistir en memoriaprincipal el dataset de interacciones, ya no es necesario calcularlo, por lo que la etapa 6es omitida (skipped). Aquí es donde claramente se puede ver que un inteligente manejode la función de persistencia puede acelerar de forma drástica la performance global.

88

Page 95: Modelos de procesamiento distribuido basados en Spark para

Figura 5.12: Continuación del proceso de predicción con ayuda de persistencia.

Cuando se termina de agrupar y de tomar las muestras de arcos faltantes, ahora se pro-cede al calculo de los scores, que son parte del algoritmo de Common Neighbors. Elmismo consiste en contar cuántos amigos en común tiene un par de vértices (usuarios),obtentiendo para tal fin, una lista de seguidos de cada miembro y realizando una ope-ración de intersección y contando cuántos existen en este último conjunto. Cabe aclararnuevamente que la intersección es una transformación de dependencia wide, por lo quenuevamente será necesario shuffling de datos y por ende, ser tratado en una nueva etapa.

En la figura 5.13, se muestra el RDD Lineage que se ejecutará para cada tupla escogida dela muestra de arcos faltantes (predicciones).

89

Page 96: Modelos de procesamiento distribuido basados en Spark para

Figura 5.13: RDD lineage de el calculo de scores de cada tupla.

Ahora veremos en detalle en la figura 5.14, lo que sucede en el stage 8. Análogamente alo que sucedía en la figura 5.6, sólo se dará detalles para una de las etapas, ya que aquítambién lo único que varía es que en un camino se tratan las claves y en el otro los valores.

90

Page 97: Modelos de procesamiento distribuido basados en Spark para

Figura 5.14: Obtención de seguidores para un usuario dado.

Con respecto a la intersección, podemos ver en detalle en la figura 5.15, que es movidaa otra etapa por la naturaleza de este tipo de transformaciones. Al final de la misma, seobtendrá el puntaje buscado entre los dos usuarios observados; si el mismo es mayor acero entonces estaremos ante una nueva recomendación y será registrada en una listacreada con tal objetivo.

Figura 5.15: Intersección de dos conjuntos de seguidores para obtener un score.

91

Page 98: Modelos de procesamiento distribuido basados en Spark para

Por último creamos un trabajo que se encargará de ordenar el dataset de recomendacio-nes por puntaje en orden descendente. Como se ilustra en la figura 5.16, volvemos a pedira Spark que tome la lista de recomendaciones (que ahora está en el programa driver) yaprovechando el grado de paralelismo de los algoritmos de ordenamiento, la ordene deforma descendente, para ver primero las recomendaciones con mayor puntaje.

Figura 5.16: RDD Lineage para ordenamiento.

Por último, mediante collect se devolverá al driver en forma de lista ordenada todaslas recomendaciones. SortByKey es otro tipo de transformación de tipo wide por lo queserá movida a otra etapa diferente a la de paralelización de la lista de recomendaciones.Se crearán dos etapas para el trabajo de ordenamiento, una de paralelización como yamencionamos y otra de ordenamiento y recuperación de los datos respectivamente comose muestra en la figura 5.17.

Figura 5.17: Ordenamiento terminado.

92

Page 99: Modelos de procesamiento distribuido basados en Spark para

5.5. Análisis de los resultados

Anteriormente vimos en detalle la ejecución de SparkLP tanto en diagramas de secuenciapara mostrar la integración con Spark, y también vimos el plano lógico de ejecución a tra-vés de las vistas de los diferentes grafos DAG. Ahora veremos cómo para cada modo deejecución de filtrado, y dejando el particionado fijo, podemos ver los distintos resultados.Se verá en detalle las cantidades de recomendaciones obtenidas y los tiempos de las fasesmás relevantes del sistema. Se comenzará con el modo baseline, luego con el modo IS yluego se incluirá un experimento con el modo de segunda oportunidad: ISW.

Definiremos algunas variables que ayudarán a comprender los cuadros siguientes:

Tiempo de descarte: es la cantidad de tiempo que demora realizar el filtrado deusuarios, actualizar la lista miembros de la red social y anunciarla a los ejecutores.

Tiempo de predicción de arcos: es el tiempo que demora calcular todos los arcosfaltantes en el grafo social dado, incluyendo el tiempo que demora traer dicha in-formación de regreso al programa driver.

Tiempo de cálculo de scores: es el tiempo que toma SparkLP en tomar cada arco dela lista de posibles predicciones y calcular su función de similitud.

Otros: incluyen tiempos de entrada/salida en general, configuración del programa,y generación de reportes.

Tiempo total: Tiempo total de ejecución de SparkLP desde que es invocado hastaque termina el método main()

Total de usuarios procesados: puesto que el filtrado de usuarios reduce dicho con-junto, esta variable nos indicará cuántos miembros de la red serán tenidos en cuentapara generar recomendaciones.

Recomendaciones obtenidas: cantidad total de recomendaciones obtenidas parauna instancia en particular de SparkLP. Puesto que por propósitos de tiempos noslimitamos a tomar subconjuntos del dataset de arcos faltantes, en cada iteraciónpodríamos obtener diferente número de recomendaciones.

Recomendaciones posibles: es la cantidad teórica máxima de arcos que pueden serconsiderados en el proceso de generación de predicción. Este índice tan sólo da unanoción de la complejidad espacial del grafo social dado.

93

Page 100: Modelos de procesamiento distribuido basados en Spark para

5.5.1. Baseline

Como habíamos visto en la estrategia de filtrado baseline, no se realizará descarte deusuarios. A continuación veremos qué resultados se obtuvieron, y una vista detalladade tiempos de etapas de ejecución. Luego compararemos al mismo con los métodos quedescartan usuarios.

Tiempo descarte 00:00

Tiempo predicción arcos 02:05

Tiempo cálculo scores 05:05

Otros 00:13

Tiempo total 07:23

Total usuarios procesados 25000

Recomendaciones obtenidas 181

Recomendaciones posibles 312.487.500

Cuadro 5.1: Cuadro comparativo del método Baseline

Como es posible ver en el cuadro 5.1, el tiempo de descarte es cero puesto que lo únicoque hace el objeto responsable de dicha funcionalidad, es devolver la misma referenciaal RDD que le pasamos.

5.5.2. IS

Para el modo de filtrado Information Source, se planeó la etapa experimental asignandocinco posibles valores al umbral, y ver el comportamiento del algoritmo. Podemos vercómo los tiempos de descarte aumentan conforme el valor de umbral aumenta en el cua-dro 5.2. Como la frecuencia de aparición de usuarios con índice importante de IS es muyescasa, podemos relacionar los resultados de ejecución del cuadro con el histograma de lafigura 5.18. El máximo descarte de usuarios se produce en el intervalo [0,85,0,95], aunquepodríamos decir que realmente la mayor exclusión se realizará en los intervalos [0,0,06],pero dichos usuarios representan al tipo de information seekers, y no nos interesa dejar-los afuera del procesamiento en este método.

94

Page 101: Modelos de procesamiento distribuido basados en Spark para

Figura 5.18: Histograma para el grado de IS en el dataset.

IS

0,5 0,75 0,85 0,95 0,995

Tiempo descarte 00:49 00:50 00:50 00:52 00:54

Tiempo predicción arcos 00:06 00:10 00:13 00:29 01:33

Tiempo cálculo scores 04:46 04:39 04:54 04:47 05:04

Tiempo otros 00:04 00:04 00:05 00:04 00:03

Tiempo total 05:45 05:43 06:02 06:12 07:34

Total descartados 6.359 5.548 4.973 3.520 1.269

Total procesados 18.641 19.452 20.027 21.480 23.731

Recomendaciones obtenidas 8 37 50 108 172

Recomendaciones posibles 173.734.120 189.180.426 200.530.351 230.684.460 281.568.315

Cuadro 5.2: Cuadro comparativo de cada posible parametrización del método IS

En el método IS podemos ver cómo para el dataset analizado, el tiempo total de ejecu-ción va aumentando cuando el umbral es mayor. Es decir, se descartan cada vez menosusuarios, por ende el tiempo de predicciones aumenta, y la cantidad de scores a procesarpara los usuarios será superior. Esto es relativo a la naturaleza del grafo social elegido.Cuanto mayor sea la cantidad total de usuarios procesados, mayor será la posibilidad deobtener nuevas recomendaciones. En el gráfico de columnas de la figura 5.3, podemosver una comparación entre descartados y cantidad de usuarios procesados y la relacióninversamente proporcional que existe entre ambas variables.

95

Page 102: Modelos de procesamiento distribuido basados en Spark para

Cuadro 5.3: Gráfico de barras para el modo IS.

Por último, para finalizar el análisis de los resultados para IS, podemos ver que la distri-bución de los valores de IS de todo nuestro conjunto de datos, continúan con una media-na muy próxima a cero y que el 50% de los usuarios rondan en índices entre [0,0,54), porlo que dicha distribución presenta un importante sesgo hacia la izquierda.

5.5.3. ISW

Para el modo de filtrado ISW, se planeó la etapa experimental asignando cuatro posiblesvalores a la relevancia del usuario, y ver el comportamiento del algoritmo. Podemos vercomo los tiempos de descarte en general, disminuyen conforme el valor del tipo de usua-rio disminuye. Como la frecuencia de aparición de usuarios con índice importante derelevancia, es prácticamente nulo, podemos relacionar los resultados de ejecución de lossiguientes cinco cuadros (5.4 al 5.4) con el histograma de la figura 5.19. El máximo descar-te de usuarios se produce en el intervalo [0,01,0,001], obteniendo importantes cantidadesde usuarios a los que se le da una segunda oportunidad.

96

Page 103: Modelos de procesamiento distribuido basados en Spark para

Figura 5.19: Histograma para el grado de relevancia de usuarios del dataset.

En este método también se puede observar como la cantidad de usuarios procesados ylas recomendaciones obtenidas guardan una relación de linealidad. Y como guardan unarelación inversa con la cantidad de usuarios descartados definitivamente con el métodoISW.

0,5

0,5 0,1 0,01 0,001

Tiempo descarte 00:57 00:54 00:51 00:50

Tiempo predicción arcos 03:06 01:21 00:12 00:06

Tiempo cálculo scores 05:04 04:46 04:46 04:43

Otros 00:06 00:04 00:06 00:04

Tiempo total 09:13 07:05 05:55 05:43

Total descartados ISW 27 741 4374 6315

Total recuperados ISW 6332 5618 1985 44

Total procesados 24973 24259 20626 18685

Recomendaciones obtenidas 170 133 16 6

Recomendaciones posibles 311.812.878 294.237.411 212.705.625 174.555.270

Cuadro 5.4: Cuadro comparativo para ISW con valor de umbral 0.5

97

Page 104: Modelos de procesamiento distribuido basados en Spark para

0,75

0,5 0,1 0,01 0,001

Tiempo descarte 00:55 00:55 00:51 00:50

Tiempo predicción arcos 02:35 01:21 00:14 00:09

Tiempo cálculo scores 05:04 04:39 04:51 04:34

Otros 00:07 00:05 00:05 00:07

Tiempo total 08:41 07:00 06:01 05:40

Total descartados ISW 27 741 4178 5526

Total recuperados ISW 5521 4807 1370 22

Total procesados 24973 24259 20822 19474

Recomendaciones obtenidas 161 125 41 33

Recomendaciones posibles 311.812.878 294.237.411 216.767.431 189.608.601

Cuadro 5.5: Cuadro comparativo para ISW con valor de umbral 0.75

0,85

0,5 0,1 0,01 0,001

Tiempo descarte 00:58 00:53 00:52 00:51

Tiempo predicción arcos 03:09 01:19 00:17 00:14

Tiempo cálculo scores 04:54 04:42 04:41 04:46

Otros 00:05 00:05 00:04 00:05

Tiempo total 09:06 06:59 05:54 05:56

Total descartados ISW 27 740 3941 4958

Total recuperados ISW 4926 4233 1032 15

Total procesados 24973 24260 21059 20042

Recomendaciones obtenidas 171 124 51 41

Recomendaciones posibles 311.812.878 294.261.670 221.730.211 200.830.861

Cuadro 5.6: Cuadro comparativo para ISW con valor de umbral 0.85

98

Page 105: Modelos de procesamiento distribuido basados en Spark para

0,95

0,5 0,1 0,01 0,001

Tiempo descarte 00:57 00:53 00:53 00:55

Tiempo predicción arcos 02:36 01:24 00:35 00:31

Tiempo cálculo scores 05:03 04:54 04:45 04:48

Otros 00:05 00:04 00:04 00:05

Tiempo total 08:41 07:15 06:17 06:19

Total descartados ISW 27 714 3039 3513

Total recuperados ISW 3493 2806 481 7

Total procesados 24973 24286 21961 21487

Recomendaciones obtenidas 177 128 114 114

Recomendaciones posibles 311.812.878 294.892.755 241.131.780 230.834.841

Cuadro 5.7: Cuadro comparativo para ISW con valor de umbral 0.95

0,995

0,5 0,1 0,01 0,001

Tiempo descarte 00:56 00:57 00:54 00:56

Tiempo predicción arcos 02:38 02:00 01:30 01:40

Tiempo cálculo scores 05:00 04:43 04:47 04:43

Otros 00:05 00:04 00:05 00:06

Tiempo total 08:39 07:44 07:16 07:25

Total descartados ISW 21 437 1110 1265

Total recuperados ISW 1248 832 159 4

Total procesados 24979 24563 23890 23735

Recomendaciones obtenidas 167 156 175 109

Recomendaciones posibles 311.962.731 301.658.203 285.354.105 281.663.245

Cuadro 5.8: Cuadro comparativo para ISW con valor de umbral 0.995

En la figura 5.20 podemos ver de forma gráfica la evolución de cada configuración deIS en conjunto con ISW con respecto a los usuarios que finalmente se han procesado,los que fueron descartados, y los que han sido recuperados, según los veinte escenariosexperimentales planteados.

99

Page 106: Modelos de procesamiento distribuido basados en Spark para

Figura 5.20: Gráfico de barras para el modo ISW

5.6. Conclusiones de la experimentación

A lo largo de este capítulo hemos visto en detalle el modelo de ejecución de SparkLP,su integración con Apache Spark, y una explicación de lo que sucede dentro de estapoderosa plataforma a nivel lógico, a través de los DAGs.

La etapa más desafiante de todo este proceso fue el hallazgo de una muestra represen-tativa del conjunto de datos inicial, que nos permitiera realizar un completo análisis delos resultados de las predicciones de enlaces, en tiempos razonablemente medibles y tra-tables, con el fin de poder evaluar cantidad de predicciones obtenidas, y analizar lostiempos que toma cada tarea del sistema en general, en función de los diferentes valo-res asignado a las estrategias propuestas. Sabemos también que estos resultados aquíexpuestos presenten variaciones sustanciales en conjuntos de datos más uniformementedistribuidos. Cuanto mayor es el grado de usuarios con baja interacción, mayor será elpoder computacional o el tiempo requerido para poder generar todo el conjunto final derecomendaciones.

5.6.1. Evolución de la experimentación

Las primeras pruebas realizadas luego de finalizar la implementación de SparkLP consis-tieron en probar el dataset inicialmente propuesto. Esto supone una búsqueda exhaustivade arcos faltantes en un dataset de casi dos millones de usuarios, lo que nos da una no-ción de lo inmensamente grandes que serán las tareas distribuidas. Aún cuando Sparksugiere particionar el dataset en tantos núcleos de CPU como tenga nuestro clúster, estofue notablemente insuficiente, ya que los errores por colapso de memoria occurían en

100

Page 107: Modelos de procesamiento distribuido basados en Spark para

cuestión de segundos de ejecución. Nunca se logró recuperar (collect) la lista de arcosfaltantes. El driver no tenía la capacidad de memoria principal para alojar más de 600 GBde RAM necesarios según nuestros cálculos. En un experimento llevado a cabo, dondese logró elevar a mas de diez mil particiones el conjunto de datos original, el sistema es-tuvo quince horas en ejecución y sólo alcanzó a procesar poco más del veinte porcientodel conjunto total de arcos faltantes del grafo. En consecuencia, podemos estimar que elgrafo de predicciones final, rondaría el orden de TB de tamaño.

Luego se probó dividiendo el grafo a la mitad, siendo conscientes que esto podría ge-nerar resultados inconsistentes por dejar arcos fuera de procesamiento que luego el al-goritmo podía generar como una recomendación. En este caso los errores de memoriaocurrían por ser los resultsets mayores al límite impuesto por Spark (aproximadamente2GB). Esta situación se dio principalmente en cualquier intento de ejecución cuyo valorde particionado del dataset fuera relativamente bajo. Tan sólo pensar que para obtenerlos arcos faltantes Spark agrupará por clave el RDD y enviará un conjunto de usuarioscon sus respectivas listas de seguidores a cada worker en forma de tarea; si el conjuntode usuarios recibido por un worker es lo suficientemente grande para luego tener quegenerar una lista de resultados que no quepa en la memoria del worker, o que sí quepa,pero que resulte mayor al límite de Spark, entonces se producirá una cancelación de latarea y una falla irrecuperable del sistema por colapso de memoria.

Con respecto a la persistencia de Spark, una característica muy importante de éste; nosofrece persistir en almacenamiento secundario los RDD cuando éstos no caben en la me-moria principal. Se probó usar esta característica, pero al tener que procesar arcos dondenecesitamos la totalidad de la información del dataset, la performance se degradó drásti-camente, ya que se gastaba un importante tiempo de programa en E/S, donde sabemosmuy bien que la latencia de disco es millones de veces más lenta que la de la memo-ria principal. Esta opción quedó definitivamente descartada para un escenario donde elvolumen de información era sustancialmente grande. Quizás en Hadoop (u otras plata-formas) no sea necesaria tanta RAM, ya que las operaciones son en disco. En Spark todose realiza en memoria, y lo que no cabe en memoria, se recomputa cuando sea necesario.Es posible definir que algunas particiones se guarden en disco, pero resulta en la mayoríade los casos, rediseñar el algoritmo para evitar usar el disco y aprovechar de una formamás optima el RDD lineage persistiendo en memoria principal solamente los RDD quesean realmente necesarios. A veces pasa que tener un RDD que cuesta mucho tiempo enser creado y entonces es persistido, pero sólo se necesitan algunos de sus datos en trans-formaciones futuras. Entonces, dando una vista al RDD Lineage nos damos cuenta queera otro de la jerarquía quien de verdad tenía que estar cacheado. Con esto nos refería-mos al “inteligente” manejo de la persistencia, puesto que se gana mucha performance ymenor uso de memoria. Persistir sólo los datos que se vayan a usar es la clave aquí, y unfactor que determina el diseño de cualquier algoritmo en Spark.

101

Page 108: Modelos de procesamiento distribuido basados en Spark para

Persistir en disco puede ser útil luego de una compleja y extensa cadena de transfor-maciones. Vale la pena recurrir al disco cuando el tiempo que se tarda en recomputarel RDD desde cero es mucho mayor a la latencia de lectura en disco. Esto dependeráfuertemente de la naturaleza del problema a tratar. Sino, lo que conviene es recomputarya que Spark es muy rápido cuando el resultado proviene de transformaciones in-stage(agrupadas dentro de un único stage, donde se optimizan mediante pipelining). RDDsresultantes de múltiples stages, en un principio podrían guardarse en disco ya que elmulti-stage gasta mucha memoria y tiempo de procesamiento al tener que recomputar.También hay que tener en cuenta el tipo de acción que se vaya a usar como terminal (des-encadenante de la ejecución efectiva), ya que no conviene almacenar en disco si luego seva a recuperar sólo una pequeña porción de los datos (por ejemplo, a través de un take)que se sabe que no recorre el RDD entero. El caché y la persistencia juegan un rol muyimportante en el entorno de Spark por las cuestiones discutidas. Una simple persistenciade un RDD puede acelerar drásticamente la performance global. Durante la etapa expe-rimental, el algoritmo inicialmente desarrollado fue sufriendo diversas modificacionescon respecto a cuáles RDDs eran persistidos y cuáles eran recomputados. Se decidió noutilizar persistencia en disco ya que en todos los escenarios posibles, la performance seveía fuertemente afectada.

Uno de los intentos de obtener un dataset aleatorio, consistió en un algoritmo progra-mado en bash, que iba haciendo un shuffling de cada línea del archivo de forma talque pudieramos obtener una décima parte del conjunto original de datos y usarlo co-mo muestra. Esto fue imposible de procesar debido a que la naturaleza de dichos datosrepresentaban una red social totalmente inconexa, con lo que ya sabemos que este tipode datos son los que más agotan la memoria del clúster. Por esta razón, esta prueba fueabortada y se optó por un algoritmo seleccionador escrito en el lenguaje Scala, como elque ya se describió en la sección de reducción del conjunto de datos.

Con respecto a Information Source, se asume que únicamente se cuenta con el dataset deinteracciones, por lo que al correr el algoritmo, es necesario destinar un tiempo de pre-cálculo para obtener los valores de dicho índice para cada usuario. Esto podría haberseoptimizado si se hubiera utilizado una caché intermedia que realice el cálculo una únicavez. Esto en un principio supone problemas de coherencia, ya que habría que mantenerla caché actualizada, debido a que los usuarios interaccionan constantemente en la redsocial (siguen a nuevas personas y dejan de ser seguidos por personas). La decisión dequé información pertinente al usuario tener guardada y qué información recalcular cuan-do sea necesaria, será una decisión de cada red social. Si se decidiera mantener en unabase de datos la información de seguidores y seguidos de una persona, entonces SparkLPpodría ahorrarse el tiempo de cálculo y acelerar aún más la velocidad de respuesta.

102

Page 109: Modelos de procesamiento distribuido basados en Spark para

Capı́tulo 6Conclusiones

A lo largo de este trabajo final se ha desarrollado un software denominado SparkLP, quesigue el modelo computacional map-reduce. SparkLP está escrito en el lenguaje de pro-gramación Java y utiliza a Apache Spark como motor de ejecución paralelo y distribuido.El objetivo principal de este trabajo consistió en ejecutar algoritmos de predicción de en-laces, evaluar tiempos de ejecución y obtener métricas de usuarios procesados y cantidadde recomendaciones. Por el momento, el software provee soporte para la métrica Com-mon Neighbors, y sus diferentes estrategias de cálculo y filtrado de usuarios.

Durante el presente trabajo no sólo se llevó a cabo la codificación del sistema en cues-tión sino que también fue realizada una tarea de creación de un clúster computacional,debiendo instalar todo el software necesario y realizar las pertinentes configuraciones,a nivel sistema operativo y también a nivel de red, para poder pasar luego a la etapaexperimental.

SparkLP no conoce la topología sobre la cuál va a ejecutarse, puesto que esto es trabajo deSpark. Todas las operaciones sobre el dataset se realizan mediante llamadas a la API deSpark que se encarga internamente de llevar a ejecución el código. La gran dificultad deeste trabajo se basó en escribir el software que haga el mejor uso de los recursos posible,limitado por las especificaciones tecnicas del clúster y por las limitaciones del proppioapache spark.

Cuando programamos software es común pensar que el código compilado logrará ejecu-tarse sin problemas de memoria, ya que solemos contar con una considerable cantidad dememoria principal. En estos casos podemos obtener tiempos de ejecución más o menosaceptables en función de las estructuras de datos que usamos y de cómo las inicializamos.

Cuando se nombra el concepto de garbage collection, raramente nos ponemos a pensaren qué tan costoso puede resultar para nuestro sistema el mal uso, o el uso ineficiente de

103

Page 110: Modelos de procesamiento distribuido basados en Spark para

la memoria. Todo cambia sustancialmente cuando tenemos que procesar grandes canti-dades de información, en órdenes cercanos a GB o incluso hasta TB de tamaño. Con lacomplejidad a veces exponencial de los problemas, los recursos no alcanzan, aún cuandodecidimos ser más conscientes con los recursos y optimizar en todo lo que se pueda. Elhecho de pasar de programar aplicaciones que procesan información simple, a procesargrandes cantidades de información (big data), hace al programador más conservador conel uso de los recursos. Los datos con los que se esta trabaja pueden no caber en la memo-ria de una sola maquina ni de varias, aún cuando se opta por paralelizar y distribuir elproblema. En los problemas de predicción de enlaces, se va obteniendo muchísima canti-dad de información que, duplica triplica o hasta supera en orden exponencial al conjuntode datos inicial. La predicción de enlaces opera sobre grandes cantidades de datos agre-gados. Es entonces que se vuelve imperioso el uso inteligente de la memoria, y de lacomunicación entre los procesos distribuidos, tornándose en la complicación mayor deeste tipo de trabajos.

Podemos pensar que al paralelizar y distribuir tenemos el panorama resuelto, pero estosupone una gran falacia, dado que nos encontramos con muchas limitaciones y proble-mas en este modelo computacional, por lo que hay que seguir midiendo y siendo minu-ciosos con los recursos. No basta con dividir en trabajos y esperar que cada PC terminesu trabajo. Hay que planificar cómo operar sobre el dataset, qué información intermediase va a persistir, cuál es posible recalcular, cuál se puede escribir en disco, y de qué formase va a particionar luego.

El gran problema durante la experimentación pasó por obtener tiempos de cómputo ra-zonables para poder comparar resultados con otras estrategias, y también en tener queacotar la información procesada para que pueda ser procesada sin agotar la memoriadel clúster utilizado. Cierto volumen de información puede ser intratable en el modelode computación clásica, e incluso también en el modelo distribuido por las limitacionestecnológicas.

Una recomendación surge de un trabajo y no de una tarea. Una tarea es una unidad mí-nima de cómputo. Para lograrla es necesario desatar muchos tipos diferentes de tareassobre un grafo para llegar a ella. Esta es la razón por la que cada recomendación posibleserá encomendada como un trabajo (cuentas, intersecciones, uniones, agrupaciones dedatos). En un principio podríamos suponer que el programa driver podría evitar tenerque traer las predicciones desde los workers (arcos faltantes del grafo) a su espacio dememoria y seguir aplicando transformaciones y acciones a dicho conjunto hasta llegar alas recomendaciones buscadas. La respuesta a esto es trivial, puesto que a medida que va-mos encadenando transformaciones, se van creando más etapas, y para eso es necesariamucha mas memoria principal, con la cual no se cuenta. La forma más óptima encontra-da de computar el dataset fue sacrificando performance, haciendo uso de red para traer

104

Page 111: Modelos de procesamiento distribuido basados en Spark para

los resultados parciales de cada etapa importante del programa, y enviando al clústernuevamente aquella información que nos sea de relevancia procesar.

Para grafos sociales muy conexos, el problema se reduce drásticamente puesto que el con-junto de arcos faltantes será menor. Sin embargo, en redes sociales con muchos usuarios ycon poca interacción entre ellos (como se vio en este trabajo), es impracticable correr unaversión de Common Neighbors puro. Habría que pensar en técnicas de agrupamientode usuarios, tomando subconjuntos para realizar las comparaciones y limitarnos a unascientos de comparaciones en lugar de miles de millones. El sistema SparkLP funcionatanto para el conjunto reducido como para el conjunto real (adaptando el entorno expe-rimental por supuesto).

Por último, Spark es una plataforma de propósito general, por lo que no proveé soportepara problemas en específico, limitando las optimizaciones que podemos realizar paraacelerar el proceso, tal como utilizar únicamente listas con iteradores en lugar de ofre-cer estructuras de datos hash para la comparación e intersección de conjuntos, lo cualsabemos que son por su naturaleza más rápidas que las búsquedas secuenciales.

Al fin del período de experimentación se obtuvieron resultados favorables y cualifica-bles. Se pudo ver como en función de las diversas técnicas propuestas, los tiempos deejecución varíaron, el uso de memoria aumentó o disminuyó (mediante el indicador depredicciones posibles) y el tiempo total de programa se repartió de forma diferente a lolargo de las cuatro etapas: filtrado, predicción de arcos, cálculo de scores, y reordena-miento del resultado.

Como conclusión final, es importante añadir que el software escrito en este trabajo, puedeescalar hacia clusters de comptuadoras más grandes y potentes, y procesar el datasetinicialmente propuesto o incluso más grandes; ya que para la experimentación se tomóuna muestra aleatoria y representativa del conjunto de datos original que se adapte alas capacidades de cómputo de nuestro clúster y los resultados fueron exitosos como yahemos podido observar.

6.1. Trabajos Futuros

A partir de este trabajo, surgen nuevas alternativas aquí descriptas:

1. Probar el sistema en Apache Hadoop, y ver su comportamiento con el dataset ori-ginal, al no contar con la restricción de memoria principal que nos impone Spark.

2. Implementar nuevas técnicas de filtrado, como por ejemplo, descartar usuarios cu-yo índice de interacción sea bajo. Recordando que este tipo de usuarios es el quesatura la memoria del clúster.

105

Page 112: Modelos de procesamiento distribuido basados en Spark para

3. Implementar nuevas métricas de similitud de usuarios. En este trabajo, se recurrióa Common Neighbors. Pero existen otras métricas muy interesantes como Jaccard,Graph Distance, entre otras, con las que se podría comparar los resultados de estetrabajo.

4. Inferir datos estadísticamente sobre la media poblacional del dataset original y losdesvíos, como vimos en los diagramas de caja, para los índices de IS, ISW y Gradode Interacción vistos en este trabajo.

106

Page 113: Modelos de procesamiento distribuido basados en Spark para

Bibliografía

Adomavicius, G. and Tuzhilin, A. (2005). Toward the next generation of recommendersystems: A survey of the state-of-the-art and possible extensions. IEEE Transactionson Knowledge and Data Engineering, 17(6):734–749.

Airoldi, E. M., Blei, D. M., Fienberg, S. E., and of Computer Science, E. P. X. S. (2008).Mixed membership stochastic blockmodels. Journal of Machine Learning Research 9.

Armentano, M., Godoy, D., and Amandi, A. (2013). Followee recommendation based ontext analysis of micro-blogging activity. Information Systems, Volume 38 Issue 8:1116–1127.

Armentano, M. G., Godoy, D., and Amandi, A. (2012). Topology-based recommendationof users in micro-blogging communities. Journal of Computer Science and Technology,27(3):624–634.

Aven, J. (2017). Apache Spark in 24hs. Sams.

Bello-Orgaz, G., Jung, J. J., and Camacho, D. (2016). Social big data: Recent achievementsand new challenges. Information Fusion, 28:45–59.

Bobadilla, J., Ortega, F., Hernando, A., and GutiéRrez, A. (2013). Recommender systemssurvey. Knowledge-Based Systems, 46:109–132.

Corbellini, A., Godoy, D., Mateos, C., Schiaffino, S., and Zunino, A. (2017a). DPM: Anovel distributed large-scale social graph processing framework for link predictionalgorithms. Future Generation Computer Systems. In press.

Corbellini, A., Mateos, C., Godoy, D., Zunino, A., and Schiaffino, S. (2015). An architec-ture and platform for developing distributed recommendation algorithms on large-scale social networks. Journal of Information Science, 41(5):686–704.

107

Page 114: Modelos de procesamiento distribuido basados en Spark para

Corbellini, A., Mateos, C., Zunino, A., Godoy, D., and Schiaffino, S. (2017b). PersistingBig Data: The NoSQL landscape. Information Systems, 63:1–23.

Dean, J. and Ghemawat, S. (2004). Mapreduce: Simplified data processing on large clus-ters. In OSDI’04: Sixth Symposium on Operating System Design and Implementation,pages 137–150, San Francisco, CA.

Dean, J. and Ghemawat, S. (2008). MapReduce: Simplified data processing on large clus-ters. Communications of the ACM, 51(1):107–113.

Doval, D. and OMahony, D. (2003). Overlay networks a scalable alternative for p2p.

Garcia, R. and Amatriain, X. (2010). Weighted content based methods for recommendingconnections in online social networks. In Proceedings of the 2nd ACM Workshop onRecommender Systems and the Social Web, pages 68–71, Barcelona, Spain.

García-Gil, D., Ramírez-Gallego, S., García, S., and Herrera, F. (2017). A comparison onscalability for batch big data processing on Apache Spark and Apache Flink. BigData Analytics, 2(1):1:1–1:11.

Ghemawat, S., Gobioff, H., and Leung, S.-T. (2003). The google file system. In Procee-dings of the 19th ACM Symposium on Operating Systems Principles, pages 20–43, BoltonLanding, NY.

Goel, A., Gupta, P., Sirois, J., Wang, D., Sharma, A., and Gurumurthy, S. (2015). The Who-To-Follow system at Twitter: Strategy, algorithms, and revenue impact. Interfaces,45(1):98–107.

Gupta, P., Goel, A., Lin, J., Sharma, A., Wang, D., and Zadeh, R. (2013). WTF: The whoto follow service at Twitter. In Proceedings of the 22th International World Wide WebConference (WWW 2013), pages 505–514, Rio de Janeiro, Brazil.

Heidemann, J., Klier, M., and Probst, F. (2012). Online social networks: A survey of aglobal phenomenon. Computer Networks, 56(18):3866–3878.

Huang, J., S., W., Barrera, J., Matthews, K., and Pan, D. (2005). The hippo signaling path-way coordinately regulates cell proliferation and apoptosis by inactivating yorkie,the drosophila homolog of yap.

Java, A., Song, X., Finin, T., and Tseng, B. (2007). Why we twitter: Understanding mi-croblogging usage and communities. In Procedings of the Joint 9th WEBKDD and 1stSNA-KDD Workshop 2007, pages 56–65. Springer.

Jeh, G. and Widom, J. (2002). SimRank: A measure of structural-context similarity. InProceedings of the 8th ACM SIGKDD International Conference on Knowledge Discoveryand Data Mining (KDD ’02), pages 538–543, Edmonton, Canada.

108

Page 115: Modelos de procesamiento distribuido basados en Spark para

Karau, H., Konwinski, A., Wendell, P., and Zaharia, M. (2015). Learning Spark, a Lightning-fast Data Analisis. OReilly.

Kermarrec, A.-M., Taïani, F., and Tirado Martin, J. M. (2015). Scaling Out Link Predictionwith SNAPLE. In 16th Annual ACM/IFIP/USENIX Middleware Conference, page 12,Vancouver, Canada.

Kleinberg, J. M. (1999). Authoritative sources in a hyperlinked environment. Journal ofthe ACM, 46(5):604–632.

Krishnamurthy, B., Gill, P., and Arlitt, M. (2008a). A few chirps about Twitter. In Procee-dings of the 1st Workshop on Online Social Networks (WOSP’08), pages 19–24, Seattle,WA, USA.

Krishnamurthy, B., Gill, P., and Arlitt, M. (2008b). A few chirps about twitter. In WOSP’08: Proceedings of the first workshop on Online social networks, pages 19–24, New York,NY, USA. ACM.

Laskowski, J. (2016). Mastering Apache Spark. GitBook.

Lempel, R. and Moran, S. (2001). SALSA: The stochastic approach for link-structureanalysis. ACM Transactions on Information Systems, 19(2):131–160.

Liben-Nowell, D. and Kleinberg, J. (2007). The link-prediction problem for socialnetworks. Journal of the American Society for Information Science and Technology,58(7):1019–1031.

Malewicz, G., Austern, M. H., Bik, A. J. C., Dehnert, J. C., Horn, I., Leiser, N., and Czaj-kowski, G. (2010). Pregel: A system for large-scale graph processing. In Proceedings ofthe 2010 International Conference on Management of Data (SIGMOD ’10), pages 135–146,Indianapolis, IN, USA.

Mateos, C., Zunino, A., and Campo, M. (2010). An approach for non-intrusively addingmalleable fork/join parallelism into ordinary JavaBean compliant applications. Com-puter Languages, Systems & Structures, 36(3):288–315.

Mazhari, S., Fakhrahmad, S. M., and Sadeghbeygi, H. (2015). A user-profile-basedfriendship recommendation solution in social networks. Journal of Information Scien-ce, 41(3):284–295.

Newman, M. E. J. (2001). Clustering and preferential attachment in growing networks.Phys. Rev. E, 64:025102.

Page, L., Brin, S., Motwani, R., and Winograd, T. (1999). The PageRank citation ranking:Bringing order to the Web. Technical Report 1999-66, Stanford InfoLab.

109

Page 116: Modelos de procesamiento distribuido basados en Spark para

Ricci, F., Rokach, L., and Shapira, B. (2015). Recommender Systems Handbook. Springer.

Romero, D. M. and Kleinberg, J. M. (2010). The directed closure process in hybrid social-information networks, with an analysis of link formation on Twitter. In Proceedings ofthe 4th International Conference on Weblogs and Social Media (ICWSM 2010), Washing-ton, DC, USA.

Schall, D. (2013). Who to follow recommendation in large-scale online development com-munities. Information and Software Technology, 56(12):1543–1555.

Schall, D. (2015). Social Network-Based Recommender Systems. Springer.

Valiant, L. G. (1990). A bridging model for parallel computation. Communications of theACM, 33(8):103–111.

Weng, J., Lim, E.-P., Jiang, J., and He, Q. (2010). TwitterRank: finding topic-sensitive in-fluential twitterers. In Proceedings of the 3rd ACM International Conference on WebSearch and Data Mining (WSDM’10), pages 261–270, New York, NY, USA.

Xie, X. (2010). Potential friend recommendation in online social network. In Proceedingsof the IEEE/ACM International Conference on Cyber, Physical and Social Computing (CPS-Com) and Green Computing and Communications (GreenCom), pages 831–835, Hangz-hou, China.

Yamaguchi, Y., Takahashi, T., Amagasa, T., and Kitagawa, H. (2010). TURank: Twitter userranking based on user-tweet graph analysis. In Web Information Systems Engineering,volume 6488 of LNCS, pages 240–253, Hong Kong, China.

Yao, L., Wang, L., Pan, L., and Yao, K. (2016). Link prediction based on common-neighbors for dynamic social network. Procedia Computer Science, 83:82–89.

Ying, J. J.-C., Lu, E. H.-C., and Tseng, V. S. (2012). Followee recommendation in asymme-trical location-based social networks. In Proceedings of the 2012 ACM Conference onUbiquitous Computing (UbiComp ’12), pages 988–995, Pittsburgh, USA.

Yu, Y. and Qiu, R. G. (2014). Followee recommendation in microblog using matrix factori-zation model with structural regularization. The Scientific World Journal, page ArticleID 420841.

Zhao, G., Lee, M. L., Hsu, W., Chen, W., and Hu, H. (2013). Community-based user re-commendation in uni-directional social networks. In Proceedings of the 22Nd ACM In-ternational Conference on Conference on Information and Knowledge Management (CIKM’13), pages 189–198, San Francisco, CA, USA.

110

Page 117: Modelos de procesamiento distribuido basados en Spark para

Zikopoulos, P. and Eaton, C. (2011). Understanding big data: Analytics for enterprise classHadoop and streaming data. McGraw-Hill Osborne Media.

111