Ya hemos leído datos desde Parquet, una de las características de estos ficheros es que tienen guardado unos “metadatos”. Estos datos contienen información acerca de la tabla como el número de columnas y de qué tipo (string
, integer
…) es cada columna.
Cuando leemos datos en texto plano como CSV no tenemos esa información en el propio archivo. Por ejemplo en R cuando usamos la función read.csv
y no definimos los tipos de las columnas, los estima basándose en algunas líneas. Esto mismo lo podemos hacer con Spark gracias a un paquete externo: http://spark-packages.org/package/databricks/spark-csv.
Creamos el contexto de Spark pero añadiendo este paquete (se necesita internet)
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)
sc <- sparkR.init(master = "local[*]",
appName = "Prueba IV: CSV",
sparkPackages = "com.databricks:spark-csv_2.11:1.2.0")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 sparkr-shell /tmp/RtmpAC2WU7/backend_port23ef6c3bcc10
sqlContext <- sc %>% sparkRSQL.init()
Probamos si hemos cargado bien el paquete y si se puede leer un csv de manera cómoda:
df1 <- sqlContext %>% read.df("ejemplo_csv/modelo1/1.txt",
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true"
)
df1 %>% printSchema()
## root
## |-- user_id: string (nullable = true)
## |-- predict: integer (nullable = true)
## |-- p0: double (nullable = true)
## |-- p1: double (nullable = true)
df1 %>% head
## user_id predict p0 p1
## 1 dad75b3c39c6bc0f54f7a0cae7a394b1 0 0.99999920 7.994137e-07
## 2 afa7185f4160ae9b643c792da100494f 1 0.21278299 7.872170e-01
## 3 c2f0d1d7a29d46dc0ebe5b001dcd351e 1 0.91548787 8.451213e-02
## 4 fb78a988bd74b4bdad4f524459404af7 0 1.00000000 5.400533e-10
## 5 a07a3ef75a626d2d9de09a3cd7c50dde 0 0.99998901 1.098914e-05
## 6 dcee579d7e4894f628c9272e4fbbc2e8 1 0.05112203 9.488780e-01
df1 %>% count
## [1] 999860
Con la opción inferSchema = "true"
conseguimos que automáticamente detecte los tipos de los campos.
En el siguiente ejercicio vamos a leer y unir varios csv. En la carpeta ejemplo_csv
tenemos la salida de tres modelos (ficticios) que se han realizado previamente:
list.files("ejemplo_csv",full.names = T)
## [1] "ejemplo_csv/modelo1" "ejemplo_csv/modelo2" "ejemplo_csv/modelo3"
Para cada una de estas carpetas hay dos archivos con el mismo formato, cada archivo contienen líneas distintas y la unión de los dos forman la salida de cada modelo:
list.files("ejemplo_csv",full.names = T,recursive = T)
## [1] "ejemplo_csv/modelo1/1.txt" "ejemplo_csv/modelo1/2.txt"
## [3] "ejemplo_csv/modelo2/1.txt" "ejemplo_csv/modelo2/2.txt"
## [5] "ejemplo_csv/modelo3/1.txt" "ejemplo_csv/modelo3/2.txt"
Queremos leer todos los archivos y construir un único DataFrame
con todas las columnas.
Como hemos visto en el ejercicio anterior, es normal tener los datos en varios archivos y Spark está diseñado para leer todos a la vez. Por ejemplo si leemos la carpeta del modelo1
nos lee todos los archivos que contiene, en este caso son dos pero podrían ser muchos más:
sqlContext %>% read.df("ejemplo_csv/modelo1",
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true"
) %>% count
## [1] 1999731
Vamos a crear una función leer
, la variable de entrada será el directorio que tiene que leer, y devolverá un DataFrame
resultante de leer los archivos del interior de la carpeta y renombrar las columnas para poder identificarlas después. Por ejemplo para el modelo1
, renombramos las columnas de la siguiente manera:
\[ \begin{align*} \text{user_id} &\longrightarrow \text{user_id} \\ \text{predict} &\longrightarrow \text{modelo1_predict} \\ \text{p0} &\longrightarrow \text{modelo1_p0} \\ \text{p1} &\longrightarrow \text{modelo1_p1} \end{align*} \]
leer <- function(x){
sqlContext %>% read.df(x,
source = "com.databricks.spark.csv",
header="true",
inferSchema = "true"
) %>%
withColumnRenamed("predict",paste0(basename(x),"_predict")) %>%
withColumnRenamed("p0",paste0(basename(x),"_p0")) %>%
withColumnRenamed("p1",paste0(basename(x),"_p1"))
}
Aplicamos la función a las tres carpetas, con lapply
:
lista_df <- list.files("ejemplo_csv",full.names = T) %>% lapply(leer)
El resultado es una lista con tres elementos, cada elemento es un DataFrame
de Spark con el resultado de leer los ficheros y renombras las columnas:
str(lista_df)
## List of 3
## $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
## .. ..@ env:<environment: 0x3c792a0>
## .. ..@ sdf:Class 'jobj' <environment: 0x3c745c8>
## $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
## .. ..@ env:<environment: 0x4174b48>
## .. ..@ sdf:Class 'jobj' <environment: 0x4162040>
## $ :Formal class 'DataFrame' [package "SparkR"] with 2 slots
## .. ..@ env:<environment: 0x3a5f540>
## .. ..@ sdf:Class 'jobj' <environment: 0x3a59098>
lista_df[[2]]
## DataFrame[user_id:string, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double]
lista_df[[2]] %>% head
## user_id modelo2_predict modelo2_p0
## 1 dad75b3c39c6bc0f54f7a0cae7a394b1 0 1.000000000
## 2 afa7185f4160ae9b643c792da100494f 1 0.942319838
## 3 c2f0d1d7a29d46dc0ebe5b001dcd351e 1 0.996262538
## 4 fb78a988bd74b4bdad4f524459404af7 0 1.000000000
## 5 a07a3ef75a626d2d9de09a3cd7c50dde 1 0.001128571
## 6 dcee579d7e4894f628c9272e4fbbc2e8 1 0.926135974
## modelo2_p1
## 1 1.194553e-12
## 2 5.768016e-02
## 3 3.737462e-03
## 4 1.150139e-11
## 5 9.988714e-01
## 6 7.386403e-02
Ahora queremos ir uniendo estos DataFrames
por la columnas en común: user_id
, hasta conseguir un único DataFrame
. Veamos cómo podemos usar la función merge
en Spark de manera similar a R:
merge(lista_df[[1]],
lista_df[[2]],
lista_df[[1]]$user_id==lista_df[[2]]$user_id
) %>% head
## user_id modelo1_predict modelo1_p0 modelo1_p1
## 1 00016fafceb91f5a1e6ca85170661ccb 1 0.1058929 8.941071e-01
## 2 00120ac59fc4d6128733a05646f612ba 1 0.9983485 1.651529e-03
## 3 001603e5446f81b3fd62876a89e8c7bb 0 0.9999988 1.211489e-06
## 4 0016526f878ca978dd17d98f653c7187 0 0.9999977 2.347241e-06
## 5 001b6e275952ee97cbe7adf3e2358134 1 0.9963000 3.699982e-03
## 6 00320b8b7840a841ac65449bf4720018 0 1.0000000 1.366172e-10
## user_id modelo2_predict modelo2_p0
## 1 00016fafceb91f5a1e6ca85170661ccb 1 6.345547e-06
## 2 00120ac59fc4d6128733a05646f612ba 1 2.750906e-01
## 3 001603e5446f81b3fd62876a89e8c7bb 1 3.259788e-05
## 4 0016526f878ca978dd17d98f653c7187 0 9.999994e-01
## 5 001b6e275952ee97cbe7adf3e2358134 0 9.997958e-01
## 6 00320b8b7840a841ac65449bf4720018 0 9.995617e-01
## modelo2_p1
## 1 9.999937e-01
## 2 7.249094e-01
## 3 9.999674e-01
## 4 6.493657e-07
## 5 2.041528e-04
## 6 4.382983e-04
Funciona, pero el campo user_id
aparece repetido en el DataFrame
resultante. Eso es un problema para después seguir uniendo ya que al estar repetida la columna habrá conflictos al seguir trabajando con este DataFrame
(esto está solucionado en la versión de python y scala pero no en la de R por ahora).
Construimos una segunda función unir
, que haga el merge
que necesitamos, pero que primero renombre una de las columnas user_id
y así podamos obtener el DataFrame
de la unión pero sin repetir esta columna:
unir <- function(x,y){
y_aux <- y %>% withColumnRenamed("user_id","user_id_aux")
unido <- x %>% merge(y_aux,x$user_id==y_aux$user_id_aux)
quiero <- setdiff(names(unido),"user_id_aux")
unido %>% select(as.list(quiero))
}
Probamos su funcionamiento:
unir(lista_df[[1]],lista_df[[2]])
## DataFrame[user_id:string, modelo1_predict:int, modelo1_p0:double, modelo1_p1:double, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double]
Pata terminar, queremos unir la lista de DataFrame
con esta función. En nuestro caso solo tenemos 3 pero queremos hacerlo de una manera que funcione igual para cuando la lista sea más grande. En R este problema podemos solucionarlo con la función Reduce
(ver por ejemplo: http://stackoverflow.com/a/17171655).
Con Reduce
usamos la función unir
de manera recursiva para todos los elementos de la lista.
df_unido <- Reduce(unir,lista_df) %>% cache
df_unido
## DataFrame[user_id:string, modelo1_predict:int, modelo1_p0:double, modelo1_p1:double, modelo2_predict:int, modelo2_p0:double, modelo2_p1:double, modelo3_predict:int, modelo3_p0:double, modelo3_p1:double]
df_unido %>% count()
## [1] 1999731
df_unido %>% head
## user_id modelo1_predict modelo1_p0 modelo1_p1
## 1 00016fafceb91f5a1e6ca85170661ccb 1 0.1058929 8.941071e-01
## 2 00120ac59fc4d6128733a05646f612ba 1 0.9983485 1.651529e-03
## 3 001603e5446f81b3fd62876a89e8c7bb 0 0.9999988 1.211489e-06
## 4 0016526f878ca978dd17d98f653c7187 0 0.9999977 2.347241e-06
## 5 001b6e275952ee97cbe7adf3e2358134 1 0.9963000 3.699982e-03
## 6 00320b8b7840a841ac65449bf4720018 0 1.0000000 1.366172e-10
## modelo2_predict modelo2_p0 modelo2_p1 modelo3_predict modelo3_p0
## 1 1 6.345547e-06 9.999937e-01 0 0.9998915
## 2 1 2.750906e-01 7.249094e-01 0 0.9986017
## 3 1 3.259788e-05 9.999674e-01 0 0.9999993
## 4 0 9.999994e-01 6.493657e-07 0 0.9999964
## 5 0 9.997958e-01 2.041528e-04 0 0.9999977
## 6 0 9.995617e-01 4.382983e-04 0 0.9999683
## modelo3_p1
## 1 1.085172e-04
## 2 1.398254e-03
## 3 6.588702e-07
## 4 3.570069e-06
## 5 2.285370e-06
## 6 3.167008e-05
De este modo hemos conseguido leer todos los archivos y conseguir un único DataFrame
con toda la información.
Para terminar, podemos usar crosstab
para hacer una tabla de conteo. Por ejemplo entre las predicciones del modelo1 y las del modelo2, y después hacer un gráfico:
conteo <- df_unido %>% crosstab("modelo1_predict","modelo2_predict")
conteo
## modelo1_predict_modelo2_predict 0 1
## 1 1 327068 655753
## 2 0 595172 421738
Manipulamos el data.frame
local para usar la función mosaic
:
rownames(conteo) <- paste0("modelo1_",conteo[,1])
conteo <- conteo[,-1]
colnames(conteo) <- paste0("modelo2_",colnames(conteo))
ordeno:
conteo<-conteo[order(rownames(conteo)),order(colnames(conteo))]
conteo
## modelo2_0 modelo2_1
## modelo1_0 595172 421738
## modelo1_1 327068 655753
y grafico:
mosaicplot(conteo,color=TRUE,main = "modelo1 vs modelo2")
Cuando hemos terminado cerramos Spark:
sparkR.stop()
Este obra está bajo una licencia de Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional.