Entendiendo (un poco) Spark

Añadimos la librería de SparkR y creamos un sc y sqlContext local:

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)
sc <- sparkR.init(master = "local[*]",appName = "Prueba II")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit   sparkr-shell /tmp/RtmpJJZ4z7/backend_port31c8571fde21
sqlContext <- sc %>% sparkRSQL.init()



Creamos de nuevo un DataFrame basado en iris de nuevo

df_iris <- sqlContext %>% createDataFrame(iris)

Spark es vago (lazy) en ejecución, ¿qué significa esto? Solo se ejecuta las sentencias cuando son estrictamente necesarias. Por ejemplo:

  p <- proc.time()
  df_filtrado <- df_iris %>% filter(df_iris$Species=="setosa")
  proc.time()-p
##    user  system elapsed 
##   0.005   0.000   0.054

Tarda muy poco, porque no ha hecho la operación solo la ha registrado. Usamos una función de acción, por ejemplo contar.

p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   1.244

Al ejecutar una función de tipo acción (en esta caso count) lanza todo los procesos necesarios para conseguir realizar la acción pedida. En este caso sería hacer el filtro y luego contar. Esto se puede ver en Spark UI (la web).


Si volvemos a ejecutar el mismo código exactamente:

p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   0.248

Hace exactamente lo mismo, primer filtra y luego cuenta Ya que por defecto no persiste el DataFrame intermedio aunque lo estemos definiendo como df_filtrado.

Si queremos persistir un DataFrame intermedio podemos con la función persist o su versión más usada cache.

df_filtrado_cacheado <- df_iris %>% 
                          filter(df_iris$Species=="setosa") %>%
                          cache

p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   0.360
p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
##    user  system elapsed 
##   0.004   0.000   0.203

La primera vez que contamos ejecuta el filtro y guarda el resultado en memoria. Podemos ver que el DataFrame está persistido en: http://127.0.0.1:4040/storage/. La segunda vez que contamos ya no tiene que hacer el filtro porque ese resultado está ya guardado en memoria.

En general, siempre que vamos a usar un DataFrame intermedio varias veces merece la pena cachearlo. En el momento en el que no vamos a necesitarlo más, podemos eliminarlo de la memoria con unpersist

df_filtrado_cacheado %>% unpersist()
## DataFrame[Sepal_Length:double, Sepal_Width:double, Petal_Length:double, Petal_Width:double, Species:string]

Funciones predefinidas en Spark

Por defecto la API de DataFrame de Spark tiene multitud de funciones matemáticas, estadísticas,… Veamos algunos ejemplos con estas funciones.

La función describe que es similar al summary de R:

df_iris %>% describe() %>% collect
##   summary       Sepal_Length         Sepal_Width       Petal_Length
## 1   count                150                 150                150
## 2    mean  5.843333333333335   3.057333333333334 3.7580000000000027
## 3  stddev 0.8253012917851231 0.43441096773547977 1.7594040657752978
## 4     min                4.3                 2.0                1.0
## 5     max                7.9                 4.4                6.9
##          Petal_Width   Species
## 1                150       150
## 2  1.199333333333334      <NA>
## 3 0.7596926279021587      <NA>
## 4                0.1    setosa
## 5                2.5 virginica

Podemos operar con estas funciones de forma muy similar al paquete dplyr:

nuevo <- df_iris %>% select(
                        .$Sepal_Length %>% log,
                        .$Species %>% lower %>% alias("bajo"),
                        lit(Sys.Date() %>% as.character()) %>% to_date() %>% alias("fecha")
                      ) %>% 
                     mutate(
                      mes=month(.$fecha)
                     )

nuevo %>% limit(10) %>% 
      collect()
##    LOG(Sepal_Length)   bajo      fecha mes
## 1           1.629241 setosa 2015-11-02  11
## 2           1.589235 setosa 2015-11-02  11
## 3           1.547563 setosa 2015-11-02  11
## 4           1.526056 setosa 2015-11-02  11
## 5           1.609438 setosa 2015-11-02  11
## 6           1.686399 setosa 2015-11-02  11
## 7           1.526056 setosa 2015-11-02  11
## 8           1.609438 setosa 2015-11-02  11
## 9           1.481605 setosa 2015-11-02  11
## 10          1.589235 setosa 2015-11-02  11
nuevo %>% group_by("bajo") %>%
          avg() %>% 
          collect()
##         bajo avg(LOG(Sepal_Length)) avg(mes)
## 1 versicolor               1.777319       11
## 2     setosa               1.608205       11
## 3  virginica               1.880654       11

Más información:

Cuando hemos terminado cerramos Spark:

sparkR.stop()




Licencia de Creative Commons
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional.