Introducción

¿Qué es Spark?

spark



“Apache Spark™ is a fast and general engine for large-scale data processing.”

¿Por qué Spark? ¿Qué ventajas tiene? Mi resumen

Hadoop nació (~2005) para procesar grandes cantidades de datos en paralelo. Poco a poco han surgido nuevas problemáticas que no se podían resolver con el paradigma MapReduce y han ido surgiendo nuevos programas para solventar estas problemáticas, creando así nuevos sistemas especializados:

spark

Spark es uno de estos nuevos sistemas, cambia la manera de trabajar internamente (utiliza memoria, RDD,DAG…) y unifica bajo un solo proyecto los grandes problemas de datos hasta el momento: Procesamiento en Batch, en streaming, machine learning, SQL

Además incluye en el mismo proyecto varios lenguajes: Scala, Java, python y R.

¡NO SOLO JAVA!

spark



Spark se ha hecho famoso y todo el BigData mira hacia él. Esto ha hecho que muchas de las aplicaciones ya existentes se hayan hecho compatibles con Spark y que estén surgiendo nuevas enfocadas en trabajar con Spark. Pero además Spark es compatible con Hadoop así que podemos usar el mismo cluster y los mismos datos con uno u otro.

spark

Más información:

SparkR: Usando Spark desde R

SparkR es muy nuevo (junio 2015 en el proyecto oficial), por ahora solo es compatible con la DataFrame API. Esto significa que hoy por hoy solo podemos trabajar con datos estructurados (tipo dplyr) y no se puede paralelizar código en R (R no hace falta que esté instalado en todos los nodos).
Lo bueno: Al entrar a formar parte del proyecto su evolución está garantizada. Para la versión 1.6 se esperan muchas novedades.

Más información:

Primeros pasos con SparkR

Veamos algunos ejemplos usando SparkR en local (solo necesitamos tener R y Java instalado). Descargamos y configuramos para poder usar el paquete de R, seguimos los siguientes pasos:

  1. Descargamos Spark (links arriba)
  2. Descomprimimos el archivo descargado
  3. Introducimos en el archivo .Renviron lo siguiente:

SPARK_HOME = <Directorio donde hemos descomprimido spark>

  1. (Recomendable): En la carpeta $SPARK_HOME/conf copiar log4j.properties.template a log4j.properties y cambiar la siguiente línea:

log4j.rootCategory=INFO, console

por

log4j.rootCategory=ERROR, console

Iniciamos R y comprobamos que la variable de entorno está bien configurada:

Sys.getenv("SPARK_HOME")
## [1] "./spark-1.5.1-bin-hadoop2.6/"



Añadimos la librería de SparkR al libPath y cargamos la librería:

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)

También vamos a usar la librería magrittr para usar %>% para programar.

Arrancamos Spark, esto lo hacemos creando un Spark Context en la variable sc. En nuestro caso indicamos que queremos arrancar Spark de manera local y con todos los núcleos disponibles.

sc <- sparkR.init(master = "local[*]",appName = "Primera Prueba")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit   sparkr-shell /tmp/Rtmpj4ePcb/backend_port1cfb24e0f061

Una vez arrancado Spark, podemos ver la Spark UI en http://localhost:4040:

ui



Inicializamos el sqlContext para poder hacer uso de la DataFrame API

sqlContext <- sc %>% sparkRSQL.init()



Podemos crear un DataFrame de Spark desde un data.frame local:

class(iris)
## [1] "data.frame"
df_iris <- sqlContext %>% createDataFrame(iris)
class(df_iris)
## [1] "DataFrame"
## attr(,"package")
## [1] "SparkR"
df_iris
## DataFrame[Sepal_Length:double, Sepal_Width:double, Petal_Length:double, Petal_Width:double, Species:string]
df_iris %>% head
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1          5.1         3.5          1.4         0.2  setosa
## 2          4.9         3.0          1.4         0.2  setosa
## 3          4.7         3.2          1.3         0.2  setosa
## 4          4.6         3.1          1.5         0.2  setosa
## 5          5.0         3.6          1.4         0.2  setosa
## 6          5.4         3.9          1.7         0.4  setosa

La diferencia entre un DataFrame de Spark es que no vive en la memoria de R. Si no en Spark así que podremos trabajar con grandes datasets de manera cómoda, por ejemplo:

df_iris %>% filter("Sepal_Length>7") %>% count
## [1] 12

También podemos registrar el DataFrame y usar SQL:

df_iris %>% registerTempTable("iris")
sqlContext %>% tables %>% collect
##   tableName isTemporary
## 1      iris        TRUE
sqlContext %>% sql("select count(*) from iris where Sepal_Length>7") %>% collect
##   _c0
## 1  12

Cuando hemos terminado cerramos Spark:

sparkR.stop()




Licencia de Creative Commons
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional.