COVID-19 Data Collection using R, Apache Spark, and Delta-Lake
Do you want to have some fun scraping COVID-19 data from the web, working with R on the Apache Spark Cluster, with Delta-Lake support? Then read this short article.
The main objective of this article is to demonstrate the integration of R with Apache Spark and Delta-Lake operations for web scraping, data refinery, and transactional storage.
At the time of writing, the technology stack consisted of:
- R 4.0
- Jupyter Notebooks 6.0.3
- Apache Spark 3.0
- Delta-Lake API 0.7.0
Git Repo with the Jupyter Notebook.
The public COVID-19 data used in this article was scraped from Worldometer. In particular, we used the large table at the top of the page (with id main_table_countries_today).
Start your Apache Spark cluster, create a new Jupyter notebook with an R kernel, and configure Spark with support for the Delta-Lake API. You will need to download the delta-core_2.12-0.7.0.jar file and copy it into the …/spark-3.0.0/jars path.
Load the necessary R libraries.
library(sparklyr) # Interface to Apache Spark
library(dplyr) # Data Refinery/wrangling
library(ggplot2) # Data Visualization
library(rvest) # Web Scraping
We will interact with the Spark runtime via the R sparklyr package, which exposes a familiar data frame API for data wrangling (data refinery) through dplyr. Create the Spark configuration that loads the Delta-Lake API.
configuration <- spark_config()
configuration$`sparklyr.jars.default` <- c("C://spark-3.0.0//jars//delta-core_2.12-0.7.0.jar")
Connect to the Spark Cluster.
sc <- spark_connect(master = "spark://node1:7077", version = "3.0.0",
                    app_name = "COVID-19_ETL",
                    config = configuration) # pass the Delta-Lake configuration created above
Set the URL for the data source and scrape an HTML table from it using rvest.
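url <- "https://www.worldometers.info/coronavirus/"
data <- url %>% read_html() %>% # download and parse the page
  html_nodes(xpath = '//*[@id="main_table_countries_today"]') %>% html_table() # returns a list of tables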
Convert the scraped list into an R data frame.
df <- as.data.frame(data, stringsAsFactors = F)
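To see what the scrape-and-convert steps produce without hitting the live site, the following minimal sketch runs the same kind of rvest calls on a small inline HTML string. The two-row table and its values are made up for illustration; only the table id matches the one used in this article.

```r
library(rvest)  # also makes read_html() and the %>% pipe available

# Hypothetical stand-in for the live page: same table id, made-up rows
page <- '<html><body>
<table id="main_table_countries_today">
  <tr><th>Country,Other</th><th>TotalCases</th></tr>
  <tr><td>Alpha</td><td>1,234</td></tr>
  <tr><td>Beta</td><td>56</td></tr>
</table>
</body></html>'

# html_table() on a node set returns a list with one element per table
tables <- read_html(page) %>%
  html_nodes(xpath = '//*[@id="main_table_countries_today"]') %>%
  html_table()

# Converting the one-element list gives a plain data frame
demo <- as.data.frame(tables, stringsAsFactors = FALSE)
str(demo)
```

Running the selector against a small fixture like this is a cheap way to check the parsing logic before pointing it at the real URL, whose layout can change over time.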
Remove the first eight rows (continent summaries) from the data frame, remove the rows containing the string “Total:” in the Country.Other column, and finally remove columns 1, 17, 18, and 19.
df <- df[-(1:8),]
df <- subset(df, df$Country.Other != "Total:")
df <- df[,-c(1,17,18,19)]
Rename the remaining data columns in the data frame.
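# Names follow the columns converted below; "Continent" for the 15th remaining column is an assumption
colnames(df) <- c("Country", "TotalCases", "NewCases", "TotalDeaths", "NewDeaths", "TotalRecovered", "NewRecovered", "ActiveCases",
                  "Critical", "TotalCases1Mpop", "Deaths1Mpop", "TotalTests", "Tests1Mpop", "Population", "Continent")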
Convert the core data columns to numeric and add a new column with the date of the data extraction.
df$TotalCases <- as.numeric(gsub(",", "", df$TotalCases))
df$NewCases <- as.numeric(gsub(",", "", df$NewCases))
df$TotalDeaths <- as.numeric(gsub(",", "", df$TotalDeaths))
df$NewDeaths <- as.numeric(gsub(",", "", df$NewDeaths))
df$TotalRecovered <- as.numeric(gsub(",", "", df$TotalRecovered))
df$NewRecovered <- as.numeric(gsub(",", "", df$NewRecovered))
df$ActiveCases <- as.numeric(gsub(",", "", df$ActiveCases))
df$Critical <- as.numeric(gsub(",", "", df$Critical))
df$TotalCases1Mpop <- as.numeric(gsub(",", "", df$TotalCases1Mpop))
df$Deaths1Mpop <- as.numeric(gsub(",", "", df$Deaths1Mpop))
df$TotalTests <- as.numeric(gsub(",", "", df$TotalTests))
df$Tests1Mpop <- as.numeric(gsub(",", "", df$Tests1Mpop))
df$Population <- as.numeric(gsub(",", "", df$Population))
df$date <- Sys.Date()
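The conversions above all apply the same rule, so they can also be written as one pass over a vector of column names. The sketch below shows the idea in base R on a small made-up data frame; the helper name to_number and the sample values are illustrative and not part of the original notebook.

```r
# Made-up sample in the same shape as the scraped table (character columns)
demo <- data.frame(
  Country    = c("Alpha", "Beta"),
  TotalCases = c("1,234,567", "890"),
  NewCases   = c("+1,200", ""),
  stringsAsFactors = FALSE
)

# Hypothetical helper: drop thousands separators, then convert.
# Empty cells become NA.
to_number <- function(x) as.numeric(gsub(",", "", x))

# Apply the same conversion to every listed column
numeric_cols <- c("TotalCases", "NewCases")
demo[numeric_cols] <- lapply(demo[numeric_cols], to_number)

# Stamp each row with the extraction date, as in the article
demo$date <- Sys.Date()
str(demo)
```

Keeping the column names in one vector means a new numeric column on the source page only requires adding one string.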
Preview the data frame.
Explore the data frame visually by plotting the key metrics for the USA on the day of extraction.
reshape2::melt(subset(df, df$Country=="USA")[,1:9]) %>% ggplot(aes(x=variable,y=value))+ geom_bar(stat="identity") +
theme(axis.text.x = element_text(angle = 90, vjust = 0.5, hjust=1))
Copy the R data frame into Spark’s memory as a Spark data frame. Name the in-memory Spark data frame “covid19” and overwrite any previously loaded data frame with the same name.
covid_df <- copy_to(sc, df, "covid19", overwrite=T)
Write the Spark data frame to disk in the Delta-Lake format, which provides ACID transactional support. Use the append mode to add new data rows to the existing Delta table. You will need to provide the Spark data frame, the path on the Delta-Lake, and the mode setting. Optionally, you can repartition the data frame before persisting it to disk. Repartitioning can improve the performance of downstream refinery and data exploration operations.
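spark_write_delta(covid_df, path = "…", mode = "append", # path on the Delta-Lake; append keeps earlier rows
                  options = list(),
                  partition_by = "Country")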
The data partitions on disk, in the Delta-Lake path are shown below.
Read the Delta-Lake Covid19 data back into a new spark data frame. You will need to provide the spark connection, delta-lake path, and a name for the new, in-memory, spark data frame.
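The read step can be sketched with sparklyr's spark_read_delta(). The wrapper name, the default table name, and the delta_path argument below are hypothetical; delta_path must point to the same location the data was written to.

```r
# Hypothetical wrapper around sparklyr::spark_read_delta().
# sc         : an open Spark connection (from spark_connect())
# delta_path : assumed to be the path used when writing the Delta table
# table_name : name for the new in-memory Spark data frame
read_covid_delta <- function(sc, delta_path, table_name = "covid19_delta") {
  sparklyr::spark_read_delta(sc, path = delta_path, name = table_name)
}

# Usage, assuming a live connection `sc`:
# df2 <- read_covid_delta(sc, delta_path = "<your Delta-Lake path>")
```

The result is a reference to a Spark data frame, so dplyr verbs applied to it are executed on the cluster until collect() is called.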
Perform a spark aggregation on the in-memory, spark data frame. Calculate the total of all NewCases across the globe. Collect the spark aggregation into R’s memory.
df2 %>% summarise(total_NewCases = sum(NewCases,na.rm=T)) %>% collect()
Repeat the above data extraction recipe on the days to come, and keep appending new rows of daily Covid-19 data into the Transactional Delta-Lake file.
R can help your organization drive large-scale data refinery processes on an Apache Spark compute cluster, while Delta-Lake storage provides ACID transactional support (for big data), schema enforcement on write, an open file format (Parquet) for storage, and time travel (across data set versions).
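Time travel can be sketched as a read of an earlier table version. This assumes the installed sparklyr release supports the version argument of spark_read_delta(); the wrapper name, its arguments, and the generated table name are hypothetical.

```r
# Hypothetical helper: load a specific version of the Delta table.
# Assumes spark_read_delta() accepts `version` in the installed sparklyr.
# sc         : an open Spark connection
# delta_path : assumed to be the path the daily snapshots were appended to
# version    : Delta table version to read (0 is the first write)
read_covid_version <- function(sc, delta_path, version = 0L) {
  sparklyr::spark_read_delta(
    sc,
    path    = delta_path,
    name    = paste0("covid19_v", version),
    version = version
  )
}

# Usage, assuming a live connection `sc`:
# first_day <- read_covid_version(sc, "<your Delta-Lake path>", version = 0L)
```

Because each daily append creates a new table version, comparing two versions is one way to see exactly which rows a given run added.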
Delta-Lake is a component of the Databricks Unified Data Analytics Platform.