PySpark: Big Data Analysis When Pandas Isn’t Enough

Pandas is one of the most widely used open-source Python libraries for analyzing structured tabular data. However, it does not support distributed processing, so as your data grows the only option is to keep adding resources to a single machine, and sooner or later those resources run out. In this article, we will look at how PySpark helps out when pandas no longer has the capacity to process the data.

Well, let’s get started. First, let’s download the necessary dataset from Kaggle. Instructions for downloading data into Colab directly from Kaggle are below:

  1. Go to the Account section on kaggle.com

  2. Scroll down to the API subsection

  3. Click the Create New API Token button and download the kaggle.json file

  4. This file can then be dropped straight into Files in Google Colab, but since the Colab session is completely reset every 12 hours, I prefer to keep it in the Colab Notebooks directory on Google Drive

(https://www.kaggle.com/general/74235) – more detailed instructions

Let’s try experimenting with the Riiid Answer Correctness Prediction dataset.

The code below downloads the dataset; once it finishes, all the necessary files are in the data folder.


! pip install -q kaggle
from google.colab import drive
drive.mount('/content/drive')
! mkdir ~/.kaggle
! cp '/content/drive/MyDrive/Colab Notebooks/kaggle.json' ~/.kaggle/ # the kaggle.json file can be placed in any folder on Drive; just make sure the first argument of the cp command points to its actual location
! chmod 600 ~/.kaggle/kaggle.json
! kaggle competitions download -c 'riiid-test-answer-prediction'
! mkdir data
! unzip riiid-test-answer-prediction.zip -d data
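
A quick optional sanity check: listing the data folder confirms that the archive was unpacked and shows the size of train.csv.

! ls -lh data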

The dataset is, let’s say, not huge, and your machine’s resources may well be enough to work with it in pandas. That is why I will show the example on the free tier of Google Colab: in free mode we get no more than 12 GB of RAM, which is just what we need for our training case.

First, let’s try to analyze our dataset using the pandas library.

1. Pandas

import pandas as pd

df_train = pd.read_csv('data/train.csv',
                        dtype={'content_id': 'int16',
                               'content_type_id': 'int8',
                               'task_container_id': 'int16',
                               'user_answer': 'int8',
                               'answered_correctly': 'int8',
                               'prior_question_elapsed_time': 'float32'})

Let’s take a look at our data.

df_train.head()
df_train.info()

As you can see, our table takes up a little over 4 GB of RAM, which is about a third of the memory Colab allocates to us.
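
If you want a more detailed picture than what info() prints, pandas can report the exact footprint of each column. This is just an optional check; note that deep=True also measures object columns, so it can take a while on a table of this size.

print(df_train.memory_usage(deep=True) / 1024 ** 2) # per-column memory in megabytes
print(df_train.memory_usage(deep=True).sum() / 1024 ** 3) # total in gigabytes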

Let’s see how many empty values there are in our table.

df_train.isna().sum()
df_train[['prior_question_elapsed_time', 'prior_question_had_explanation']].isna().mean()

We have: the prior_question_elapsed_time column contains only a little over 2% missing values, and prior_question_had_explanation even fewer. Let’s try to remove them using the pandas dropna method and…

df = df_train.dropna()

… we get an out-of-memory error.

Note. In fact, we could have hit this error at the very beginning, while reading the dataset: it would have been enough to call read_csv with its default arguments. pandas would then assign each numeric column either int64 or float64, and those take up a lot of memory…
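
To see how much the explicit dtypes from the read_csv call above actually save, you can compare a small sample read both ways. This is only a sketch, and the one-million-row sample size is an arbitrary choice.

sample_default = pd.read_csv('data/train.csv', nrows=1_000_000) # default dtypes: int64/float64 for numeric columns
sample_compact = pd.read_csv('data/train.csv', nrows=1_000_000,
                             dtype={'content_id': 'int16',
                                    'content_type_id': 'int8',
                                    'task_container_id': 'int16',
                                    'user_answer': 'int8',
                                    'answered_correctly': 'int8',
                                    'prior_question_elapsed_time': 'float32'})

# memory footprint of each sample in megabytes
print(sample_default.memory_usage(deep=True).sum() / 1024 ** 2)
print(sample_compact.memory_usage(deep=True).sum() / 1024 ** 2)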

Where pandas failed, let’s turn to PySpark for help.

2. PySpark

Install pyspark and pyarrow. PyArrow significantly speeds up PySpark, which is very useful in our case.

! pip install pyspark
! pip install pyarrow
from pyspark.sql import SparkSession
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1" # without this line we would keep getting a warning asking us to set this variable to 1, so we do it in advance


spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark

Let’s read our file and see what columns are present in our table. If we are sure that each column holds a single specific data type, we can set the inferSchema=True parameter and PySpark will determine the type of each column on its own.

df = spark.read.csv('data/train.csv', header=True, inferSchema=True)

df.printSchema()
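
Note that inferSchema makes Spark scan the file an extra time to guess the types. If that is too slow, you can pass an explicit schema instead. The sketch below is an assumption: the column list is reconstructed from the columns used in this article plus the row_id, timestamp and user_id fields of the competition data, so compare it with the printSchema() output before relying on it.

from pyspark.sql.types import (StructType, StructField, LongType, IntegerType,
                               ShortType, ByteType, FloatType, BooleanType)

# an explicit schema spares Spark the extra pass over the file
schema = StructType([
    StructField('row_id', LongType()),
    StructField('timestamp', LongType()),
    StructField('user_id', IntegerType()),
    StructField('content_id', ShortType()),
    StructField('content_type_id', ByteType()),
    StructField('task_container_id', ShortType()),
    StructField('user_answer', ByteType()),
    StructField('answered_correctly', ByteType()),
    StructField('prior_question_elapsed_time', FloatType()),
    StructField('prior_question_had_explanation', BooleanType()),
])

df = spark.read.csv('data/train.csv', header=True, schema=schema)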

To view data in PySpark there is the show method.

df.show()

The data in the prior_question_had_explanation column is boolean, so before moving on we cast it to an integer type.

from pyspark.sql.types import IntegerType

df = df.withColumn('prior_question_had_explanation', df['prior_question_had_explanation'].cast(IntegerType()))
df.printSchema()

Let’s see how many empty values there are in our table. The pandas_api method converts an existing DataFrame into a pandas-on-Spark DataFrame (it is available only if pandas is installed).

df.pandas_api().isna().mean() # show the share of missing values in each column

This time, the missing data can be deleted without problems.

df = df.dropna()
df.pandas_api().isna().sum()

Now let’s look at how to build a correlation matrix using PySpark.

Correlation matrix.

The corr method of the Correlation class works only with vector columns. Therefore, before building the correlation matrix, we need to transform the dataset with VectorAssembler.

From the documentation: column – the name of the column of vectors for which to compute the correlation coefficients; it must be a column of the dataset and must contain Vector objects.

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# first, assemble the data into a single Vector column
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

# compute the correlation matrix and immediately convert the result to a numpy array
corr_df = Correlation.corr(df_vector, vector_col) # grab some popcorn: unfortunately, this step is not fast
matrix = corr_df.collect()[0][corr_df.columns[0]].toArray()
corr_matrix_df = pd.DataFrame(data=matrix, columns=df.columns, index=df.columns) # the final touch: wrap the resulting correlation matrix in a pandas DataFrame

Let’s display the correlation matrix on the screen.

import seaborn as sns 
import matplotlib.pyplot as plt
plt.style.use('seaborn')

# plt.figure(figsize=(16,5))  
sns.heatmap(corr_matrix_df, 
            xticklabels=corr_matrix_df.columns.values,
            yticklabels=corr_matrix_df.columns.values,  cmap="Greens", annot=True)
plt.show()

As you can see, PySpark allowed us to handle a volume of data that our beloved pandas could no longer cope with. Moreover, PySpark’s syntax is in places very similar to that of pandas, and where pure PySpark methods are not enough, the pandas_api method comes to the rescue.
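
As a small illustration of that similarity, here is roughly how a per-user mean correctness could be computed in plain PySpark and via the pandas-on-Spark API. The user_id and answered_correctly column names come from this dataset, but the snippet itself is just a sketch and not part of the analysis above.

from pyspark.sql import functions as F

# plain PySpark: group by user and average the answered_correctly flag
df.groupBy('user_id').agg(F.mean('answered_correctly').alias('mean_correctness')).show(5)

# pandas-on-Spark: almost the same line you would write in pandas
df.pandas_api().groupby('user_id')['answered_correctly'].mean().head()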