Añadimos la librería de SparkR y creamos un sc
y sqlContext
local:
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)
sc <- sparkR.init(master = "local[*]",appName = "Prueba II")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit sparkr-shell /tmp/RtmpJJZ4z7/backend_port31c8571fde21
sqlContext <- sc %>% sparkRSQL.init()
Creamos de nuevo un DataFrame
basado en iris
de nuevo
df_iris <- sqlContext %>% createDataFrame(iris)
Spark es vago (lazy) en ejecución, ¿qué significa esto? Solo se ejecuta las sentencias cuando son estrictamente necesarias. Por ejemplo:
p <- proc.time()
df_filtrado <- df_iris %>% filter(df_iris$Species=="setosa")
proc.time()-p
## user system elapsed
## 0.005 0.000 0.054
Tarda muy poco, porque no ha hecho la operación solo la ha registrado. Usamos una función de acción, por ejemplo contar.
p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 1.244
Al ejecutar una función de tipo acción (en esta caso count
) lanza todo los procesos necesarios para conseguir realizar la acción pedida. En este caso sería hacer el filtro y luego contar. Esto se puede ver en Spark UI (la web).
Si volvemos a ejecutar el mismo código exactamente:
p <- proc.time()
df_filtrado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.248
Hace exactamente lo mismo, primer filtra y luego cuenta Ya que por defecto no persiste el DataFrame
intermedio aunque lo estemos definiendo como df_filtrado
.
Si queremos persistir un DataFrame
intermedio podemos con la función persist
o su versión más usada cache
.
df_filtrado_cacheado <- df_iris %>%
filter(df_iris$Species=="setosa") %>%
cache
p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.360
p <- proc.time()
df_filtrado_cacheado %>% count
## [1] 50
proc.time()-p
## user system elapsed
## 0.004 0.000 0.203
La primera vez que contamos ejecuta el filtro y guarda el resultado en memoria. Podemos ver que el DataFrame
está persistido en: http://127.0.0.1:4040/storage/. La segunda vez que contamos ya no tiene que hacer el filtro porque ese resultado está ya guardado en memoria.
En general, siempre que vamos a usar un DataFrame
intermedio varias veces merece la pena cachearlo. En el momento en el que no vamos a necesitarlo más, podemos eliminarlo de la memoria con unpersist
df_filtrado_cacheado %>% unpersist()
## DataFrame[Sepal_Length:double, Sepal_Width:double, Petal_Length:double, Petal_Width:double, Species:string]
Por defecto la API de DataFrame de Spark tiene multitud de funciones matemáticas, estadísticas,… Veamos algunos ejemplos con estas funciones.
La función describe
que es similar al summary
de R:
df_iris %>% describe() %>% collect
## summary Sepal_Length Sepal_Width Petal_Length
## 1 count 150 150 150
## 2 mean 5.843333333333335 3.057333333333334 3.7580000000000027
## 3 stddev 0.8253012917851231 0.43441096773547977 1.7594040657752978
## 4 min 4.3 2.0 1.0
## 5 max 7.9 4.4 6.9
## Petal_Width Species
## 1 150 150
## 2 1.199333333333334 <NA>
## 3 0.7596926279021587 <NA>
## 4 0.1 setosa
## 5 2.5 virginica
Podemos operar con estas funciones de forma muy similar al paquete dplyr
:
nuevo <- df_iris %>% select(
.$Sepal_Length %>% log,
.$Species %>% lower %>% alias("bajo"),
lit(Sys.Date() %>% as.character()) %>% to_date() %>% alias("fecha")
) %>%
mutate(
mes=month(.$fecha)
)
nuevo %>% limit(10) %>%
collect()
## LOG(Sepal_Length) bajo fecha mes
## 1 1.629241 setosa 2015-11-02 11
## 2 1.589235 setosa 2015-11-02 11
## 3 1.547563 setosa 2015-11-02 11
## 4 1.526056 setosa 2015-11-02 11
## 5 1.609438 setosa 2015-11-02 11
## 6 1.686399 setosa 2015-11-02 11
## 7 1.526056 setosa 2015-11-02 11
## 8 1.609438 setosa 2015-11-02 11
## 9 1.481605 setosa 2015-11-02 11
## 10 1.589235 setosa 2015-11-02 11
nuevo %>% group_by("bajo") %>%
avg() %>%
collect()
## bajo avg(LOG(Sepal_Length)) avg(mes)
## 1 versicolor 1.777319 11
## 2 setosa 1.608205 11
## 3 virginica 1.880654 11
Más información:
Cuando hemos terminado cerramos Spark:
sparkR.stop()
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional.