Añadimos la librería de SparkR y creamos un sc y sqlContext local:
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)
sc <- sparkR.init(master = "local[*]",appName = "Prueba II")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit sparkr-shell /tmp/RtmpJJZ4z7/backend_port31c8571fde21
sqlContext <- sc %>% sparkRSQL.init()
Creamos de nuevo un DataFrame basado en iris de nuevo
df_iris <- sqlContext %>% createDataFrame(iris)
Spark es vago (lazy) en ejecución, ¿qué significa esto? Solo se ejecuta las sentencias cuando son estrictamente necesarias. Por ejemplo:
p <- proc.time()
df_filtrado <- df_iris %>% filter(df_iris$Species=="setosa")
proc.time()-p
## user system elapsed
## 0.005 0.000 0.054
Tarda muy poco, porque no ha hecho la operación solo la ha registrado. Usamos una función de acción, por ejemplo contar.
p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 1.244
Al ejecutar una función de tipo acción (en esta caso count) lanza todo los procesos necesarios para conseguir realizar la acción pedida. En este caso sería hacer el filtro y luego contar. Esto se puede ver en Spark UI (la web).
Si volvemos a ejecutar el mismo código exactamente:
p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.248
Hace exactamente lo mismo, primer filtra y luego cuenta Ya que por defecto no persiste el DataFrame intermedio aunque lo estemos definiendo como df_filtrado.
Si queremos persistir un DataFrame intermedio podemos con la función persist o su versión más usada cache.
df_filtrado_cacheado <- df_iris %>%
filter(df_iris$Species=="setosa") %>%
cache
p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.360
p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.203
La primera vez que contamos ejecuta el filtro y guarda el resultado en memoria. Podemos ver que el DataFrame está persistido en: http://127.0.0.1:4040/storage/. La segunda vez que contamos ya no tiene que hacer el filtro porque ese resultado está ya guardado en memoria.
En general, siempre que vamos a usar un DataFrame intermedio varias veces merece la pena cachearlo. En el momento en el que no vamos a necesitarlo más, podemos eliminarlo de la memoria con unpersist
df_filtrado_cacheado %>% unpersist()
## DataFrame[Sepal_Length:double, Sepal_Width:double, Petal_Length:double, Petal_Width:double, Species:string]
Por defecto la API de DataFrame de Spark tiene multitud de funciones matemáticas, estadísticas,… Veamos algunos ejemplos con estas funciones.
La función describe que es similar al summary de R:
df_iris %>% describe() %>% collect
## summary Sepal_Length Sepal_Width Petal_Length
## 1 count 150 150 150
## 2 mean 5.843333333333335 3.057333333333334 3.7580000000000027
## 3 stddev 0.8253012917851231 0.43441096773547977 1.7594040657752978
## 4 min 4.3 2.0 1.0
## 5 max 7.9 4.4 6.9
## Petal_Width Species
## 1 150 150
## 2 1.199333333333334 <NA>
## 3 0.7596926279021587 <NA>
## 4 0.1 setosa
## 5 2.5 virginica
Podemos operar con estas funciones de forma muy similar al paquete dplyr:
nuevo <- df_iris %>% select(
.$Sepal_Length %>% log,
.$Species %>% lower %>% alias("bajo"),
lit(Sys.Date() %>% as.character()) %>% to_date() %>% alias("fecha")
) %>%
mutate(
mes=month(.$fecha)
)
nuevo %>% limit(10) %>%
collect()
## LOG(Sepal_Length) bajo fecha mes
## 1 1.629241 setosa 2015-11-02 11
## 2 1.589235 setosa 2015-11-02 11
## 3 1.547563 setosa 2015-11-02 11
## 4 1.526056 setosa 2015-11-02 11
## 5 1.609438 setosa 2015-11-02 11
## 6 1.686399 setosa 2015-11-02 11
## 7 1.526056 setosa 2015-11-02 11
## 8 1.609438 setosa 2015-11-02 11
## 9 1.481605 setosa 2015-11-02 11
## 10 1.589235 setosa 2015-11-02 11
nuevo %>% group_by("bajo") %>%
avg() %>%
collect()
## bajo avg(LOG(Sepal_Length)) avg(mes)
## 1 versicolor 1.777319 11
## 2 setosa 1.608205 11
## 3 virginica 1.880654 11
Más información:
Cuando hemos terminado cerramos Spark:
sparkR.stop()
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional.