Reading data from SparkR

In the examples so far we have created a Spark DataFrame from an R data.frame. In practice this is rarely the case: the data we process with Spark is usually too large for R (which is precisely why we use Spark in the first place).

Spark can read any format that Hadoop/HDFS can read, and it can also connect to databases through JDBC. We will look at two cases: Parquet and CSV.
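As a quick sketch of the CSV case, and assuming a hypothetical file datos.csv plus the external spark-csv package (CSV support is not built into Spark 1.x), reading would look like this:

```r
# Sketch only: "datos.csv" is a hypothetical file, and
# com.databricks.spark.csv is the external spark-csv package.
csv_df <- sqlContext %>%
  read.df("datos.csv",
          source = "com.databricks.spark.csv",  # external CSV data source
          header = "true",                      # first row holds column names
          inferSchema = "true")                 # let Spark guess column types
```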

Example with Parquet data

Parquet is a columnar format designed for the big data ecosystem.


Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.

More information: https://parquet.apache.org/.

We are going to read a randomly generated table stored in categorias.parquet. This folder contains several files, which is normal in this world: when working in parallel across several machines, the data is usually stored in chunks.

list.files("categorias.parquet")[1:10]
##  [1] "_common_metadata"                                            
##  [2] "_metadata"                                                   
##  [3] "part-r-00000-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [4] "part-r-00001-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [5] "part-r-00002-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [6] "part-r-00003-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [7] "part-r-00004-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [8] "part-r-00005-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
##  [9] "part-r-00006-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"
## [10] "part-r-00007-1e02285f-a7e0-482c-8917-6c70cae10368.gz.parquet"



We create the Spark context to start working:

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R/lib/"),.libPaths()))
library(SparkR)
library(magrittr)
sc <- sparkR.init(master = "local[*]",appName = "Prueba III: Parquet")
## Launching java with spark-submit command ./spark-1.5.1-bin-hadoop2.6//bin/spark-submit   sparkr-shell /tmp/RtmpObBElj/backend_port1e9f1a8473b9
sqlContext <- sc %>% sparkRSQL.init()


Spark already ships with the libraries needed to read Parquet files, so reading one is very easy:

meta <- sqlContext %>% read.df("categorias.parquet")
meta %>% printSchema
## root
##  |-- user_id: string (nullable = true)
##  |-- category: string (nullable = true)
##  |-- variable1: double (nullable = true)
##  |-- variable2: double (nullable = true)
##  |-- variable3: double (nullable = true)
meta %>% head
##                            user_id              category variable1
## 1 399ab31343f91ea7684a5c28746c94a3 Hobbies_and_Interests  8.386499
## 2 9e8ebe9004918b73274ccd4ccf68b3cc           Movistar_Es  9.736967
## 3 43bfff7d70ff5ec185205d24157c91ce           Advertising 10.733888
## 4 22def4895959f8b5d58052cff9623367                Travel  9.569020
## 5 720226e79785677bf3f35d1d68a81216 Hobbies_and_Interests 12.732734
## 6 db2bd8626a0f4064cdce15c12cf83838               Science 13.493586
##   variable2 variable3
## 1 11.091339  9.140698
## 2 14.467683 12.063429
## 3  8.687635 11.899612
## 4  3.213519  8.629476
## 5 10.440578  9.255001
## 6  9.837519  9.882442


Exercise

We want to pivot this table, i.e. build a new table with one row per distinct user_id and as many columns as there are distinct categories, for each of the three variables. In R we would do this with reshape, or with spread from the tidyr package. Let us see one way to do it in Spark using SQL.
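For reference, the tidyr approach on a toy data.frame (feasible only when the data fits in R's memory) would look something like this; the data here is made up for illustration:

```r
# Plain-R sketch of the same pivot on invented toy data, one variable only.
library(tidyr)
df <- data.frame(user_id  = c("u1", "u1", "u2"),
                 category = c("News", "Sports", "News"),
                 variable1 = c(1, 2, 3))
# One row per user_id, one column per category, 0 where missing:
df %>% spread(category, variable1, fill = 0)
```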


First we drop the rows whose category is empty and cache the resulting DataFrame:

meta <- meta %>% filter(meta$category!="") %>% cache()

Let us see how many rows we have and how many distinct user_id values there are (the number of rows of the DataFrame we want to build):

meta %>% agg(count(meta$user_id),countDistinct(meta$user_id)) %>% collect()
##   count(user_id) COUNT(DISTINCT user_id)
## 1        4431631                 1626967

With collect we convert the Spark DataFrame into an R data.frame. Be very careful with this function: if the DataFrame is large, we can run out of memory.
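When we only want to inspect a few rows, a safer option is take, which brings back just the first n rows instead of the whole DataFrame:

```r
# Fetch only the first 10 rows to R, regardless of the DataFrame's size.
meta %>% take(10)
```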

categorias <- meta %>% select("category") %>% distinct() %>% collect()
categorias$category
##  [1] "Reference_and_Education"     "Searching"                  
##  [3] "Business"                    "Personal_Finance"           
##  [5] "Advertising"                 "Games"                      
##  [7] "Style_and_Fashion"           "Automotive"                 
##  [9] "News"                        "Health"                     
## [11] "Competitors"                 "Society"                    
## [13] "Kids_and_Teens"              "Law_Government_and_Politics"
## [15] "Shopping"                    "Sports"                     
## [17] "Science"                     "Careers"                    
## [19] "Family_and_Parenting"        "Internet_Services"          
## [21] "Movistar_Es"                 "Pets"                       
## [23] "Online_Communities"          "Real_Estate"                
## [25] "Religion_and_Spirituality"   "Technology_and_Computing"   
## [27] "Food_and_Drink"              "Hobbies_and_Interests"      
## [29] "Arts_and_Entertainment"      "Home_and_Garden"            
## [31] "Travel"

Helping ourselves with R, we build a SQL query that creates the columns of our new DataFrame; we do this with CASE expressions:

categorias$category %>% 
  sapply(function(x){
    paste0("case when category='",x,"' then variable",1:3,
           " else 0 end as variable",1:3,"_",x %>% tolower(),collapse = ",")
  }) %>% 
  paste(collapse = ", ") %>% 
  paste("select user_id,",.,"from meta") -> query

query
## [1] "select user_id, case when category='Reference_and_Education' then variable1 else 0 end as variable1_reference_and_education,case when category='Reference_and_Education' then variable2 else 0 end as variable2_reference_and_education,case when category='Reference_and_Education' then variable3 else 0 end as variable3_reference_and_education, case when category='Searching' then variable1 else 0 end as variable1_searching,case when category='Searching' then variable2 else 0 end as variable2_searching,case when category='Searching' then variable3 else 0 end as variable3_searching, case when category='Business' then variable1 else 0 end as variable1_business,case when category='Business' then variable2 else 0 end as variable2_business,case when category='Business' then variable3 else 0 end as variable3_business, case when category='Personal_Finance' then variable1 else 0 end as variable1_personal_finance,case when category='Personal_Finance' then variable2 else 0 end as variable2_personal_finance,case when category='Personal_Finance' then variable3 else 0 end as variable3_personal_finance, case when category='Advertising' then variable1 else 0 end as variable1_advertising,case when category='Advertising' then variable2 else 0 end as variable2_advertising,case when category='Advertising' then variable3 else 0 end as variable3_advertising, case when category='Games' then variable1 else 0 end as variable1_games,case when category='Games' then variable2 else 0 end as variable2_games,case when category='Games' then variable3 else 0 end as variable3_games, case when category='Style_and_Fashion' then variable1 else 0 end as variable1_style_and_fashion,case when category='Style_and_Fashion' then variable2 else 0 end as variable2_style_and_fashion,case when category='Style_and_Fashion' then variable3 else 0 end as variable3_style_and_fashion, case when category='Automotive' then variable1 else 0 end as variable1_automotive,case when category='Automotive' then variable2 
else 0 end as variable2_automotive,case when category='Automotive' then variable3 else 0 end as variable3_automotive, case when category='News' then variable1 else 0 end as variable1_news,case when category='News' then variable2 else 0 end as variable2_news,case when category='News' then variable3 else 0 end as variable3_news, case when category='Health' then variable1 else 0 end as variable1_health,case when category='Health' then variable2 else 0 end as variable2_health,case when category='Health' then variable3 else 0 end as variable3_health, case when category='Competitors' then variable1 else 0 end as variable1_competitors,case when category='Competitors' then variable2 else 0 end as variable2_competitors,case when category='Competitors' then variable3 else 0 end as variable3_competitors, case when category='Society' then variable1 else 0 end as variable1_society,case when category='Society' then variable2 else 0 end as variable2_society,case when category='Society' then variable3 else 0 end as variable3_society, case when category='Kids_and_Teens' then variable1 else 0 end as variable1_kids_and_teens,case when category='Kids_and_Teens' then variable2 else 0 end as variable2_kids_and_teens,case when category='Kids_and_Teens' then variable3 else 0 end as variable3_kids_and_teens, case when category='Law_Government_and_Politics' then variable1 else 0 end as variable1_law_government_and_politics,case when category='Law_Government_and_Politics' then variable2 else 0 end as variable2_law_government_and_politics,case when category='Law_Government_and_Politics' then variable3 else 0 end as variable3_law_government_and_politics, case when category='Shopping' then variable1 else 0 end as variable1_shopping,case when category='Shopping' then variable2 else 0 end as variable2_shopping,case when category='Shopping' then variable3 else 0 end as variable3_shopping, case when category='Sports' then variable1 else 0 end as variable1_sports,case when category='Sports' then 
variable2 else 0 end as variable2_sports,case when category='Sports' then variable3 else 0 end as variable3_sports, case when category='Science' then variable1 else 0 end as variable1_science,case when category='Science' then variable2 else 0 end as variable2_science,case when category='Science' then variable3 else 0 end as variable3_science, case when category='Careers' then variable1 else 0 end as variable1_careers,case when category='Careers' then variable2 else 0 end as variable2_careers,case when category='Careers' then variable3 else 0 end as variable3_careers, case when category='Family_and_Parenting' then variable1 else 0 end as variable1_family_and_parenting,case when category='Family_and_Parenting' then variable2 else 0 end as variable2_family_and_parenting,case when category='Family_and_Parenting' then variable3 else 0 end as variable3_family_and_parenting, case when category='Internet_Services' then variable1 else 0 end as variable1_internet_services,case when category='Internet_Services' then variable2 else 0 end as variable2_internet_services,case when category='Internet_Services' then variable3 else 0 end as variable3_internet_services, case when category='Movistar_Es' then variable1 else 0 end as variable1_movistar_es,case when category='Movistar_Es' then variable2 else 0 end as variable2_movistar_es,case when category='Movistar_Es' then variable3 else 0 end as variable3_movistar_es, case when category='Pets' then variable1 else 0 end as variable1_pets,case when category='Pets' then variable2 else 0 end as variable2_pets,case when category='Pets' then variable3 else 0 end as variable3_pets, case when category='Online_Communities' then variable1 else 0 end as variable1_online_communities,case when category='Online_Communities' then variable2 else 0 end as variable2_online_communities,case when category='Online_Communities' then variable3 else 0 end as variable3_online_communities, case when category='Real_Estate' then variable1 else 0 end as 
variable1_real_estate,case when category='Real_Estate' then variable2 else 0 end as variable2_real_estate,case when category='Real_Estate' then variable3 else 0 end as variable3_real_estate, case when category='Religion_and_Spirituality' then variable1 else 0 end as variable1_religion_and_spirituality,case when category='Religion_and_Spirituality' then variable2 else 0 end as variable2_religion_and_spirituality,case when category='Religion_and_Spirituality' then variable3 else 0 end as variable3_religion_and_spirituality, case when category='Technology_and_Computing' then variable1 else 0 end as variable1_technology_and_computing,case when category='Technology_and_Computing' then variable2 else 0 end as variable2_technology_and_computing,case when category='Technology_and_Computing' then variable3 else 0 end as variable3_technology_and_computing, case when category='Food_and_Drink' then variable1 else 0 end as variable1_food_and_drink,case when category='Food_and_Drink' then variable2 else 0 end as variable2_food_and_drink,case when category='Food_and_Drink' then variable3 else 0 end as variable3_food_and_drink, case when category='Hobbies_and_Interests' then variable1 else 0 end as variable1_hobbies_and_interests,case when category='Hobbies_and_Interests' then variable2 else 0 end as variable2_hobbies_and_interests,case when category='Hobbies_and_Interests' then variable3 else 0 end as variable3_hobbies_and_interests, case when category='Arts_and_Entertainment' then variable1 else 0 end as variable1_arts_and_entertainment,case when category='Arts_and_Entertainment' then variable2 else 0 end as variable2_arts_and_entertainment,case when category='Arts_and_Entertainment' then variable3 else 0 end as variable3_arts_and_entertainment, case when category='Home_and_Garden' then variable1 else 0 end as variable1_home_and_garden,case when category='Home_and_Garden' then variable2 else 0 end as variable2_home_and_garden,case when category='Home_and_Garden' then variable3 
else 0 end as variable3_home_and_garden, case when category='Travel' then variable1 else 0 end as variable1_travel,case when category='Travel' then variable2 else 0 end as variable2_travel,case when category='Travel' then variable3 else 0 end as variable3_travel from meta"

Once the first query is built, we register the DataFrame as a temporary table so we can execute it (otherwise the "from meta" part could not be resolved):

meta %>% registerTempTable("meta")
expandido <- sqlContext %>% sql(query)

As we already know, the full query has not actually been executed yet, because it has not been necessary (lazy evaluation).
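If we wanted to force execution at this point, any action would do; for example, counting the rows makes Spark run the query:

```r
# count() is an action: it triggers the actual execution of the query
# behind "expandido" and returns the number of rows.
expandido %>% count()
```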

We build the second query needed to finish the exercise: we must sum all the columns grouping by user_id, which yields the DataFrame we are after:

names(expandido)[-1] %>% 
  {paste("select user_id,",
         paste0("sum(", ., ") ", ., collapse = ", "),
         "from expandido group by user_id")} -> query2

We register the intermediate table and execute the second query; we also cache the result so we can explore it:

expandido %>% registerTempTable("expandido")
final <- sqlContext %>% sql(query2) %>% cache
final
## DataFrame[user_id:string, variable1_reference_and_education:double, variable2_reference_and_education:double, variable3_reference_and_education:double, variable1_searching:double, variable2_searching:double, variable3_searching:double, variable1_business:double, variable2_business:double, variable3_business:double, variable1_personal_finance:double, variable2_personal_finance:double, variable3_personal_finance:double, variable1_advertising:double, variable2_advertising:double, variable3_advertising:double, variable1_games:double, variable2_games:double, variable3_games:double, variable1_style_and_fashion:double, variable2_style_and_fashion:double, variable3_style_and_fashion:double, variable1_automotive:double, variable2_automotive:double, variable3_automotive:double, variable1_news:double, variable2_news:double, variable3_news:double, variable1_health:double, variable2_health:double, variable3_health:double, variable1_competitors:do