Leer datos desde SparkR (CSV)

Ya hemos leído datos desde Parquet, una de las características de estos ficheros es que tienen guardado unos “metadatos”. Estos datos contienen información acerca de la tabla como el número de columnas y de qué tipo (string, integer…) es cada columna.

Cuando leemos datos en texto plano como CSV no tenemos esa información en el propio archivo. Por ejemplo en R cuando usamos la función read.csv y no definimos los tipos de las columnas, los estima basándose en algunas líneas. Esto mismo lo podemos hacer con Spark gracias a un paquete externo: http://spark-packages.org/package/databricks/spark-csv.

Creamos el contexto de Spark pero añadiendo este paquete (se necesita internet)

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)

sc <- sparkR.init(master = "local[*]",
                  appName = "Prueba IV: CSV",
                  sparkPackages = "com.databricks:spark-csv_2.11:1.2.0")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit  --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpAC2WU7/backend_port23ef6c3bcc10
sqlContext <- sc %>% sparkRSQL.init()


Probamos si hemos cargado bien el paquete y si se puede leer un csv de manera cómoda:

df1 <- sqlContext %>% read.df("ejemplo_csv/modelo1/1.txt",
                              source = "com.databricks.spark.csv",
                              header="true",
                              inferSchema = "true"
                              )

df1 %>% printSchema()
## root
##  |-- user_id: string (nullable = true)
##  |-- predict: integer (nullable = true)
##  |-- p0: double (nullable = true)
##  |-- p1: double (nullable = true)
df1 %>% head
##                            user_id predict         p0           p1
## 1 dad75b3c39c6bc0f54f7a0cae7a394b1       0 0.99999920 7.994137e-07
## 2 afa7185f4160ae9b643c792da100494f       1 0.21278299 7.872170e-01
## 3 c2f0d1d7a29d46dc0ebe5b001dcd351e       1 0.91548787 8.451213e-02
## 4 fb78a988bd74b4bdad4f524459404af7       0 1.00000000 5.400533e-10
## 5 a07a3ef75a626d2d9de09a3cd7c50dde       0 0.99998901 1.098914e-05
## 6 dcee579d7e4894f628c9272e4fbbc2e8       1 0.05112203 9.488780e-01
df1 %>% count
## [1] 999860

Con la opción inferSchema = "true" conseguimos que automáticamente detecte los tipos de los campos.

Ejercicio

En el siguiente ejercicio vamos a leer y unir varios csv. En la carpeta ejemplo_csv tenemos la salida de tres modelos (ficticios) que se han realizado previamente:

list.files("ejemplo_csv",full.names = T)
## [1] "ejemplo_csv/modelo1" "ejemplo_csv/modelo2" "ejemplo_csv/modelo3"

Para cada una de estas carpetas hay dos archivos con el mismo formato, cada archivo contienen líneas distintas y la unión de los dos forman la salida de cada modelo:

list.files("ejemplo_csv",full.names = T,recursive = T)
## [1] "ejemplo_csv/modelo1/1.txt" "ejemplo_csv/modelo1/2.txt"
## [3] "ejemplo_csv/modelo2/1.txt" "ejemplo_csv/modelo2/2.txt"
## [5] "ejemplo_csv/modelo3/1.txt" "ejemplo_csv/modelo3/2.txt"

Queremos leer todos los archivos y construir un único DataFrame con todas las columnas.

Como hemos visto en el ejercicio anterior, es normal tener los datos en varios archivos y Spark está diseñado para leer todos a la vez. Por ejemplo si leemos la carpeta del modelo1 nos lee todos los archivos que contiene, en este caso son dos pero podrían ser muchos más:

sqlContext %>% read.df("ejemplo_csv/modelo1",
                      source = "com.databricks.spark.csv",
                      header="true",
                      inferSchema = "true"
                      ) %>% count
## [1] 1999731

Vamos a crear una función leer, la variable de entrada será el directorio que tiene que leer, y devolverá un DataFrame resultante de leer los archivos del interior de la carpeta y renombrar las columnas para poder identificarlas después. Por ejemplo para el modelo1, renombramos las columnas de la siguiente manera:

\[ \begin{align*} \text{user_id} &\longrightarrow \text{user_id} \\ \text{predict} &\longrightarrow \text{modelo1_predict} \\ \text{p0} &\longrightarrow \text{modelo1_p0} \\ \text{p1} &\longrightarrow \text{modelo1_p1} \end{align*} \]

leer <- function(x){
  sqlContext %>% read.df(x,
                         source = "com.databricks.spark.csv",
                         header="true",
                         inferSchema = "true"
                        ) %>% 
    withColumnRenamed("predict",paste0(basename(x),"_predict")) %>% 
    withColumnRenamed("p0",paste0(basename(x),"_p0")) %>% 
    withColumnRenamed("p1",paste0(basename(x),"_p1"))
}

Aplicamos la función a las tres carpetas, con lapply:

lista_df <- list.files("ejemplo_csv",full.names = T) %>% lapply(leer)


El resultado es una lista con tres elementos, cada elemento es un DataFrame de Spark con el resultado de leer los ficheros y renombras las columnas:

str(lista_df)
## List of 3
##  $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
##   .. ..@ env:<environment: 0x3c792a0> 
##   .. ..@ sdf:Class 'jobj' <environment: 0x3c745c8> 
##  $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
##   .. ..@ env:<environment: 0x4174b48> 
##   .. ..@ sdf:Class 'jobj' <environment: 0x4162040> 
##  $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
##   .. ..@ env:<environment: 0x3a5f540> 
##   .. ..@ sdf:Class 'jobj' <environment: 0x3a59098>
lista_df[[2]]
## DataFrame[user_id:string, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double]
lista_df[[2]] %>% head
##                            user_id modelo2_predict  modelo2_p0
## 1 dad75b3c39c6bc0f54f7a0cae7a394b1               0 1.000000000
## 2 afa7185f4160ae9b643c792da100494f               1 0.942319838
## 3 c2f0d1d7a29d46dc0ebe5b001dcd351e               1 0.996262538
## 4 fb78a988bd74b4bdad4f524459404af7               0 1.000000000
## 5 a07a3ef75a626d2d9de09a3cd7c50dde               1 0.001128571
## 6 dcee579d7e4894f628c9272e4fbbc2e8               1 0.926135974
##     modelo2_p1
## 1 1.194553e-12
## 2 5.768016e-02
## 3 3.737462e-03
## 4 1.150139e-11
## 5 9.988714e-01
## 6 7.386403e-02


Ahora queremos ir uniendo estos DataFrames por la columnas en común: user_id, hasta conseguir un único DataFrame. Veamos cómo podemos usar la función merge en Spark de manera similar a R:

merge(lista_df[[1]],
     lista_df[[2]],
     lista_df[[1]]$user_id==lista_df[[2]]$user_id
     ) %>% head
##                            user_id modelo1_predict modelo1_p0   modelo1_p1
## 1 00016fafceb91f5a1e6ca85170661ccb               1  0.1058929 8.941071e-01
## 2 00120ac59fc4d6128733a05646f612ba               1  0.9983485 1.651529e-03
## 3 001603e5446f81b3fd62876a89e8c7bb               0  0.9999988 1.211489e-06
## 4 0016526f878ca978dd17d98f653c7187               0  0.9999977 2.347241e-06
## 5 001b6e275952ee97cbe7adf3e2358134               1  0.9963000 3.699982e-03
## 6 00320b8b7840a841ac65449bf4720018               0  1.0000000 1.366172e-10
##                            user_id modelo2_predict   modelo2_p0
## 1 00016fafceb91f5a1e6ca85170661ccb               1 6.345547e-06
## 2 00120ac59fc4d6128733a05646f612ba               1 2.750906e-01
## 3 001603e5446f81b3fd62876a89e8c7bb               1 3.259788e-05
## 4 0016526f878ca978dd17d98f653c7187               0 9.999994e-01
## 5 001b6e275952ee97cbe7adf3e2358134               0 9.997958e-01
## 6 00320b8b7840a841ac65449bf4720018               0 9.995617e-01
##     modelo2_p1
## 1 9.999937e-01
## 2 7.249094e-01
## 3 9.999674e-01
## 4 6.493657e-07
## 5 2.041528e-04
## 6 4.382983e-04

Funciona, pero el campo user_id aparece repetido en el DataFrame resultante. Eso es un problema para después seguir uniendo ya que al estar repetida la columna habrá conflictos al seguir trabajando con este DataFrame (esto está solucionado en la versión de python y scala pero no en la de R por ahora).


Construimos una segunda función unir, que haga el merge que necesitamos, pero que primero renombre una de las columnas user_id y así podamos obtener el DataFrame de la unión pero sin repetir esta columna:

unir <- function(x,y){
  y_aux  <- y %>% withColumnRenamed("user_id","user_id_aux")
  unido  <- x %>% merge(y_aux,x$user_id==y_aux$user_id_aux)
  quiero <- setdiff(names(unido),"user_id_aux")
  unido %>% select(as.list(quiero))
}

Probamos su funcionamiento:

unir(lista_df[[1]],lista_df[[2]])
## DataFrame[user_id:string, modelo1_predict:int, modelo1_p0:double, modelo1_p1:double, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double]

Pata terminar, queremos unir la lista de DataFrame con esta función. En nuestro caso solo tenemos 3 pero queremos hacerlo de una manera que funcione igual para cuando la lista sea más grande. En R este problema podemos solucionarlo con la función Reduce (ver por ejemplo: http://stackoverflow.com/a/17171655).

Con Reduce usamos la función unir de manera recursiva para todos los elementos de la lista.

df_unido <- Reduce(unir,lista_df) %>% cache
df_unido
## DataFrame[user_id:string, modelo1_predict:int, modelo1_p0:double, modelo1_p1:double, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double, modelo3_predict:int, modelo3_p0:double, modelo3_p1:double]
df_unido %>% count()
## [1] 1999731
df_unido %>% head
##                            user_id modelo1_predict modelo1_p0   modelo1_p1
## 1 00016fafceb91f5a1e6ca85170661ccb               1  0.1058929 8.941071e-01
## 2 00120ac59fc4d6128733a05646f612ba               1  0.9983485 1.651529e-03
## 3 001603e5446f81b3fd62876a89e8c7bb               0  0.9999988 1.211489e-06
## 4 0016526f878ca978dd17d98f653c7187               0  0.9999977 2.347241e-06
## 5 001b6e275952ee97cbe7adf3e2358134               1  0.9963000 3.699982e-03
## 6 00320b8b7840a841ac65449bf4720018               0  1.0000000 1.366172e-10
##   modelo2_predict   modelo2_p0   modelo2_p1 modelo3_predict modelo3_p0
## 1               1 6.345547e-06 9.999937e-01               0  0.9998915
## 2               1 2.750906e-01 7.249094e-01               0  0.9986017
## 3               1 3.259788e-05 9.999674e-01               0  0.9999993
## 4               0 9.999994e-01 6.493657e-07               0  0.9999964
## 5               0 9.997958e-01 2.041528e-04               0  0.9999977
## 6               0 9.995617e-01 4.382983e-04               0  0.9999683
##     modelo3_p1
## 1 1.085172e-04
## 2 1.398254e-03
## 3 6.588702e-07
## 4 3.570069e-06
## 5 2.285370e-06
## 6 3.167008e-05

De este modo hemos conseguido leer todos los archivos y conseguir un único DataFrame con toda la información.


Para terminar, podemos usar crosstab para hacer una tabla de conteo. Por ejemplo entre las predicciones del modelo1 y las del modelo2, y después hacer un gráfico:

conteo <- df_unido %>% crosstab("modelo1_predict","modelo2_predict")
conteo
##   modelo1_predict_modelo2_predict      0      1
## 1                               1 327068 655753
## 2                               0 595172 421738

Manipulamos el data.frame local para usar la función mosaic:

rownames(conteo) <- paste0("modelo1_",conteo[,1])
conteo <- conteo[,-1]
colnames(conteo) <- paste0("modelo2_",colnames(conteo))

ordeno:

conteo<-conteo[order(rownames(conteo)),order(colnames(conteo))]
conteo
##           modelo2_0 modelo2_1
## modelo1_0    595172    421738
## modelo1_1    327068    655753

y grafico:

mosaicplot(conteo,color=TRUE,main = "modelo1 vs modelo2")

Cuando hemos terminado cerramos Spark:

sparkR.stop()




Licencia de Creative Commons
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional<