análisis de datos con apache spark
TRANSCRIPT
Introducción a Apache SparkAnálisis de Datos con R
Ing. Eduardo Castro, [email protected]
Referencias
▪ Using Apache Spark. Pat McDonough – Databricks
▪ Spark in the Hadoop Ecosystem. Eric Baldeschwieler
Apache Spark
spark.incubator.apache.org
github.com/ Apache / incubator-
spark
Apache Spark
Introducción a Spark
¿Qué es Spark?
▪ No es una versión modificada de Hadoop
▪ Independiente, rápido, MapReduce
▪ Almacenamiento en memoria para consultas iterativas muy rápidas
▪ Hasta 40x más rápido que Hadoop
▪ Compatible con el API Hadoop con respecto al almacenamiento
▪ Se puede leer / escribir en cualquier sistema soportado por Hadoop, incluyendo HDFS, HBase,SequenceFiles,etc
Eficiente
• Almacenamiento en memoria
Utilizable
• API en Java, Scala, Python
• Shell Interactivo
Qué es Spark?Fast and Expressive Cluster Computing System
Compatible con Apache Hadoop
Evolución de Spark
• 2008 - Yahoo! equipo de Hadoop inicia colaboración con laboratorio de Berkeley Amp / Rad
• 2009 - Ejemplo de Spark construido para Nexus -> Mesos
• 2011 - "Spark está 2 años por delante de cualquier cosa en Google"
• 2012 - Yahoo! Trabaja con Spark / Shark
• Hoy - Muchas historias de éxito
Usuarios Spark
Actualizaciones Spark Hadoop
• Hardware ha mejorardo desde que Hadoop comenzó:
• Gran cantidad de RAM, redes más rápidas (10 Gb +)
• Ancho de banda de los discos no se manteniene al día
• MapReduce es incómodo para las cargas de trabajo:
Spark, "lengua franca?"
• Soporte para muchas técnicas de desarrollo
• SQL, Streaming, Graph & in memory, MapReduce
• Escriba "UDF" una vez y utilizar en todos los contextos
• Pequeño, sencillo y elegante API
• Fácil de aprender y utilizar; expresivo y extensible
• Retiene ventajas de MapReduce (tolerancia a fallos ...)
Conceptos
• Colecciones de objetos se propagan en un
cluster, almacenada en RAM o en Disco
• Construido a través de
transformaciones paralelas
• Automáticamente reconstruido en caso de falla
• Transformations (e.g.
map, filter, groupBy)
• Acciones
(e.g. count, collect, save)
Escribir programas en términos de transformaciones en
conjuntos de datos distribuido
Resilient Distributed Datasets Operationes
Intercambio de Datos en MapReduce
iter. 1 iter. 2 . . .
Entrada
HDFSleer
HDFSescribir
HDFSleer
HDFSescribir
Entrada
consulta 1
consulta 2
consulta 3
resultado 1
resultado 2
número 3
. . .
HDFSleer
Lento debido a la replicación, la serialización, y el disco IO
iter. 1 iter. 2 . . .
Entrada
Intercambio de Datos en Spark
Repartidomemoria
Entrada
consulta 1
pregunta 2
consulta 3
. . .
una veztratamiento
10-100× más rápido que la red y el disco
Prueba de rendimiento de SortHadoop MR
Record (2013)Spark
Record (2014)
Data Size 102.5 TB 100 TB
Elapsed Time 72 mins 23 mins
# Nodes 2100 206
# Cores 50400 physical 6592 virtualized
Cluster disk throughput
3150 GB/s(est.)
618 GB/s
Networkdedicated data center, 10Gbps
virtualized (EC2) 10Gbps network
Sort rate 1.42 TB/min 4.27 TB/min
Sort rate/node 0.67 GB/min 20.7 GB/min
Sort benchmark, Daytona Gray: sort of 100 TB of data (1 trillion records)
http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-
sorting.html
Spark, 3x
más rápido
1/10 de
nodos
Combina streaming y análisiscomplejos
From single machines to distributed computingEscalar Spark en Cluster
Spark clusters in Azure HDInsightSpark en Azure y Blob Storage
Fuentes de datos
Spark con HDFS
Stack para aplicaciones
Spark dentro de Hortonworks
Interacción con Yarn
Programación en Spark
▪ Idea clave: conjuntos de datos distribuidos elásticos (DDR)
▪ Colecciones distribuidas de objetos que pueden ser almacenadas en caché en la memoria a través de los nodos del clúster
▪ Manipulada a través de diversos operadores paralelos
▪ Reconstruida automáticamente en caso de fallo
▪ Interfaz
▪ Lenguaje integrado Clean API Scala
▪ Puede ser usado interactivamente desde Scala
Trabajo con RDDs
RDDRDD
RDDRDD
Transformations
Action Value
linesWithSpark = textFile.filter(lambda line: "Spark” in line)
linesWithSpark.count() 74
linesWithSpark.first()# Apache Spark
textFile = sc.textFile(”SomeFile.txt”)
DDR: Distribuidos
● Datos no tienen que estar en una misma máquina
● Los datos se separa en particiones
○ Si necesitamos podemos operar en nuestra partición de datos
al mismo tiempo
Trabajador
Worker
Worker
DriverBlock 1
Block 2
Block 3
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
Worker
Worker
Worker
Driver
val lines = spark.textFile("hdfs://...")
Ejemplos: Log Mining
Worker
Worker
Worker
Driver
val lines = spark.textFile("hdfs://...")
Cargar mensajes de error en memoria y buscar
patrones
RDD Base
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
Worker
Worker
Worker
Driver
Ejemplos: Log Mining
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
Worker
Worker
Worker
Driver
Cargar mensajes de error en memoria y buscar
patronesRDD Transformado
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
Worker
Driver
messages.filter(_.contains("mysql")).count()
Worker
Worker
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Driver
messages.filter(_.contains("mysql")).count()
Worker
Worker
Poner RDD en cache
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Driver
messages.filter(_.contains("mysql")).count()Acción
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Driver
messages.filter(_.contains("mysql")).count()
Worker
Block 1
Block 2
Block 3
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Drivertasks
tasks
messages.filter(_.contains("mysql")).count()
tasks
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
messages.filter(_.contains("mysql")).count()
Worker
Block 1
Block 2
Block 3
Driver
Read
HDFS
Block
Read
HDFS
Block
Read
HDFS
Block
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
messages.filter(_.contains("mysql")).count()
Block 1
Block 2
Block 3
Driver
Cache 1
Cache 2
Cache 3
Process
& Cache
Data
Process
& Cache
Data
Process
& Cache
Data
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
messages.filter(_.contains("mysql")).count()
Block 1
Block 2
Block 3
Driver
Cache 1
Cache 2
Cache 3
results
results
results
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Driver
Cache 1
Cache 2
Cache 3
messages.filter(_.contains("mysql")).count()
messages.filter(_.contains("php")).count()
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Cache 1
Cache 2
Cache 3
messages.filter(_.contains("mysql")).count()
messages.filter(_.contains("php")).count()
tasks
tasks
tasks
Driver
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Cache 1
Cache 2
Cache 3
messages.filter(_.contains("mysql")).count()
messages.filter(_.contains("php")).count()
Driver
Process
from
Cache
Process
from
Cache
Process
from
Cache
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Cache 1
Cache 2
Cache 3
messages.filter(_.contains("mysql")).count()
messages.filter(_.contains("php")).count()
Driver
results
results
results
Ejemplos: Log MiningCargar mensajes de error en memoria y buscar patrones
val lines = spark.textFile("hdfs://...")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t´)(2))
messages.cache()
Worker
Worker
Worker
Block 1
Block 2
Block 3
Cache 1
Cache 2
Cache 3
messages.filter(_.contains("mysql")).count()
messages.filter(_.contains("php")).count()
Driver
Poner datos en cache ³ Resultados más rápidos
1 TB de datos de log
• 5-7 sec desde cache vs. 170s desde disco
Interactive Shell
• Forma rápida de Aprender Spark
• Disponible enPython y Scala
• Se ejecuta como un aplicación en una existente SparkCluster ...
Administrative GUIsH5P: // <Standalone Master>: 8080 (porpredeterminado)
Source: http://spark.rstudio.com/
• Instalación mediante devtools
• Carga datos en Spark DataFramesdesde: data frames locales de R, Hive tables, CSV, JSON.
• Conectar al instancias locales de Spark y Cluster Remotos de Spark
sparklyr: R interface para Apache Spark
dplyr and ML in sparklyr
• Incluye 3 familias de funciones para machine learning• ml_*: Machine learning algorithms para analizar datos en el paquete spark.ml.
• K-Means, GLM, LR, Survival Regression, DT, RF, GBT, PCA, Naive-Bayes, Multilayer Perceptron
• ft_*: Feature transformers para manipulación de features individuales.
• sdf_*: Funciones para manipulación de SparkDataFrames.
• Provee un backend dplyr para manipulación, análisis y visualizaciónde datos
%>%
dplyr con ML en sparklyr
R Server: scale-out R, Enterprise
▪ 100% compatible con el open source R
▪ Se puede ejecutar en paralelo funciones R▪ Ideal para parameter sweeps, simulation, scoring.
▪ Funciones distribuidas y escalables “rx” dentro del paquete“RevoScaleR”.
▪ Transformaciones: rxDataStep()
▪ Estadísticas: rxSummary(), rxQuantile(), rxChiSquaredTest(), rxCrossTabs()…
▪ Algoritmos: rxLinMod(), rxLogit(), rxKmeans(), rxBTrees(), rxDForest()…
▪ Paralelismo: rxSetComputeContext()
Deep Learning con Spark
Deep Learning con Spark
R Server Hadoop Architecturehitecture
R R R R
R
R R R R
R
Microsoft R Server
Proceso R Maestro en Edge Node
Apache YARN con Spark
Proceso R Worker R en Data Nodes
Datos en almacenamiento distribuido
Procesos R en Edge Node
Limpiar datos con SparkR dentro de R Server
Train, Score, Evaluate por medio del R Server
Publicar el Web Service desde R
TRABAJO CON SPARK
Anexos
Uso del Shell
Modes:
MASTER=local ./spark-shellMASTER=local[2] ./spark-shell
# local, 1# local, 2
thread threads
MASTER=spark://host:port ./spark-shell # cluster
Launching:
spark-shell pyspark (IPYTHON=1)
Creación de RDDs
# Load text file from local FS, HDFS, or S3> sc.textFile(“file.txt”)> sc.textFile(“directory/*.txt”)> sc.textFile(“hdfs://namenode:9000/path/file”)
# Use existing Hadoop InputFormat (Java/Scala only)> sc.hadoopFile(keyClass, valClass, inputFmt, conf)
# Turn a Python collection into an RDD> sc.parallelize([1, 2, 3])
Transformaciones básicas
> nums = sc.parallelize([1, 2, 3])
# Pass each element through a function> squares = nums.map(lambda x: x*x) // {1, 4, 9}
zero or more others# Map each element to> nums.flatMap(lambda
> # => {0, 0, 1, 0, 1,x: => range(x))2}
Range object (sequence of numbers 0, 1, …, x-‐1)
# Keep elements passing a predicate> even = squares.filter(lambda x: x % 2 == 0) // {4}
Acciones base
> nums = sc.parallelize([1, 2, 3])
# Retrieve RDD contents as> nums.collect() # => [1,
a local collection 2, 3]
# Count number of elements> nums.count() # => 3
# Merge elements with function> nums.reduce(lambda
an associative x, y: x + y) # => 6
# Write elements to a text file> nums.saveAsTextFile(“hdfs://file.txt”)
# Return first K elements> nums.take(2) # => [1, 2]
Trabajo con Key-Value Pairs
Spark’s “distributed reduce” transformaciones para
operar sobre RDDs de key-value pairs
Python: pair = (a, b)pair[0] # => a pair[1] # => b
Scala: val pair = (a, b)
pair._1 // => a pair._2 // => b
Java: Tuple2 pair = new Tuple2(a, b);
pair._1 // => apair._2 // => b
Operaciones Key-Value
> pets = sc.parallelize( [(“cat”, 1), (“dog”, 1), (“cat”, 2)])
> pets.sortByKey() # => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey implementa combinadores del
map
> pets.reduceByKey(lambda x, y:# => {(cat,
x + 3),
y)(dog, 1)}
> pets.groupByKey() # => {(cat, [1, 2]), (dog, [1])}
> lines = sc.textFile(“hamlet.txt”)
> counts = lines.flatMap(lambda line: line.split(“ ”)).map(lambda word => (word, 1)).reduceByKey(lambda x, y: x + y)
Ejemplo: Word Count
“not to be” “to”“be”
(to, 1)(be, 1)
(be, 2)(not, 1)
(or, 1)(to, 2)
“to be or”“to”“be”
(to, 1)(be, 1)
“or” (or, 1)
“not” (not, 1)
Key-Value Operaciones Adiconales
> visits = sc.parallelize([ (“index.html”,
(“about.html”,(“index.html”,
“1.2.3.4”),“3.4.5.6”),“1.3.3.1”) ])
> pageNames = sc.parallelize([ (“index.html”,(“about.html”,
“Home”),“About”) ])
>visits.join(pageNames
)
> visits.cogroup(pageNames)
# (“index.html”, ([“1.2.3.4”, “1.3.3.1”],[“Home”]))# (“about.html”, ([“3.4.5.6”], [“About”]))
##(“index.html”,(“index.html”,
(“1.2.3.4”,(“1.3.3.1”,
“Home”))“Home”))
# (“about.html”, (“3.4.5.6”, “About”))
Establecer el nivel de paralelistmo
Todas las operaciones RDD tienen un
parámetros para establecer la cantidad de
tareasy: x + y, 5)> words.reduceByKey(lambda x,
> words.groupByKey(5)
> visits.join(pageViews, 5)
Ajuste de la Nivel de Paralelismo
Todos la par RDD operaciones tomar una
opcional segundo parámetro para número
de tareasy: x + y, 5)>palabras.reduceByKey(lambda
X,
> palabras.groupByKey(5)
>visitas.unirse(Páginas
vistas, 5)
RDD Operadores Adicionales
• map
• filter
• groupBy
• sort
• union
• join
• leftOuterJoin
• rightOuterJoin
• reduce
• count
• fold
• reduceByKey
• groupByKey
• cogroup
• cross
• zip
sample
take
first
partitionBy
mapWith
pipe
save ...