Job market analytics with history (Yandex Data Lake + Power BI)
I once wrote an article, "The market analyst profession in three clicks". In it we connected to the hh.ru website from Power BI, pulled vacancies and built a dashboard for analysis.
But the connection described there leaves the vacancy analysis without history. The dashboard stores only one "snapshot" of the data, taken at load time: press Refresh and all the data in the dashboard is overwritten. Would it be interesting to see the big picture over time? For me, yes. Moreover, it is not particularly hard to do, and it costs a couple of hundred rubles a year. When choosing the tools, the guiding principle was: as simple and cheap as possible.
To add history to the dashboard, let's turn to cloud technologies. We need a place where we can append and store our data "snapshots"; the raw site responses come in JSON format. Power BI will then connect to this storage. A small architecture diagram is in Figure 1. The algorithm is as follows: a Cloud Function in Python runs on a schedule, makes a request to the site's API and appends the responses to Object Storage. The raw responses are then transformed and saved in Parquet format, which Power BI connects to. With this connection method you can also schedule a refresh in your Power BI personal account (not every source type allows a scheduled refresh).
I decided to use the Yandex Cloud infrastructure. In my opinion, the clouds of Western vendors such as Microsoft Azure are more mature and convenient, but registering there requires a bank card, which would probably cut off most readers.
Now, step by step.
Go to Yandex Cloud, register, link a bank card and create two services:
Yandex Cloud Functions
S3 storage, aka Data Lake, aka Yandex Object Storage
Yandex Object Storage
In Yandex Object Storage you first need to create a directory, then go to Object Storage and create a bucket. I set the bucket access to Public and did not bother with fine-grained permissions. You also need to create a static access key for the bucket. Follow the instructions from Yandex.
Yandex Cloud Functions
This is the hardest part, but hang in there :-). In Cloud Functions you need to create three functions with a Python runtime; the function code is attached below. Set the function timeout to at least 300 seconds, and create triggers for them spaced at least 30 minutes apart. Mine look like this, Figure 2:
In the code editor of each function you need to create two files (the code is attached below). The first is requirements.txt with the required Python packages, and the second contains the Python code. In the code, change the marked placeholder values (keys, bucket, paths) to your own.
Function code
Function 1
def handler(event, context):
    import io
    import json
    from io import StringIO

    import boto3
    import pandas as pd
    import requests

    def create_con():
        # S3 client pointed at Yandex Object Storage
        s3 = boto3.client(
            's3',
            aws_access_key_id='YOUR KEY',
            aws_secret_access_key='YOUR SECRET KEY',
            region_name='central1',
            endpoint_url='https://storage.yandexcloud.net',
        )
        return s3

    def get_maxpage(api):
        # Number of result pages for this search query
        todos = requests.get(api)
        return json.loads(todos.text)['pages']

    def get_api(text, api, page):
        # Collect vacancy ids from one page of search results
        todos = requests.get(api + '&page=' + page)
        todos.encoding = 'utf-8'
        tbody = todos.json()
        rows = [{'id': item['id'], 'text': text, 'page': page} for item in tbody['items']]
        return {
            'statusCode': 200,
            'body': pd.DataFrame(rows, columns=['id', 'text', 'page']),
        }

    def get_list_req(s3):
        # Read the list of search queries from the bucket
        obj = s3.get_object(Bucket='YOUR BUCKET', Key='PATH TO requests.csv')
        return pd.read_csv(io.BytesIO(obj['Body'].read()))

    s3 = create_con()                       # connect
    list_req = get_list_req(s3)             # dataframe of search queries
    df = pd.DataFrame(columns=['id', 'text', 'page'])
    for index, row in list_req.iterrows():  # loop over queries, collect vacancy ids
        maxpage = int(get_maxpage(row['API']))  # number of pages for this query
        for i in range(maxpage):
            req = get_api(row['text'], row['API'], str(i))
            df = pd.concat([df, req['body']], ignore_index=True)
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3.put_object(Bucket='YOUR BUCKET', Body=csv_buffer.getvalue(),
                  Key='Lakehouse/Config/All_id_for_parse.csv', StorageClass='STANDARD')
Function 2
def handler(event, context):
    import datetime
    import io
    import json

    import boto3
    import pandas as pd
    import requests

    def create_con():
        # S3 client pointed at Yandex Object Storage
        s3 = boto3.client(
            's3',
            aws_access_key_id='YOUR KEY',
            aws_secret_access_key='YOUR SECRET KEY',
            region_name='central1',
            endpoint_url='https://storage.yandexcloud.net',
        )
        return s3

    def get_api(s3, text, vacancy_id):
        # Fetch the detailed json of one vacancy and store it raw, partitioned by date
        todos = requests.get('https://api.hh.ru/vacancies/' + str(vacancy_id))
        todos.encoding = 'utf-8'
        body = json.dumps(todos.json(), ensure_ascii=False)
        now = datetime.date.today()
        file_key = ('Lakehouse/Data/raw/' + now.strftime('%Y-%m-%d') + '/' +
                    text + '/' + str(vacancy_id) + '.json')
        s3.put_object(Bucket='YOUR BUCKET', Body=body, Key=file_key,
                      StorageClass='STANDARD')

    def get_list_req(s3):
        # Read the list of vacancy ids produced by function 1
        obj = s3.get_object(Bucket='YOUR BUCKET',
                            Key='Lakehouse/Config/All_id_for_parse.csv')
        return pd.read_csv(io.BytesIO(obj['Body'].read()))

    s3 = create_con()                       # connect
    list_req = get_list_req(s3)             # dataframe of vacancy ids
    for index, row in list_req.iterrows():  # one json file per vacancy
        get_api(s3, row['text'], row['id'])
Function 3
def handler(event, context):
    import io
    import json
    from datetime import datetime, date
    from io import BytesIO

    import boto3
    import pandas as pd
    import urllib3

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    mydate = str(date.today())
    columns = ['id', 'name', 'salary', 'experience', 'schedule', 'employment',
               'key_skills', 'employerName', 'accept_temporary', 'publishedAt',
               'archived', 'areaId', 'areaName', 'alternate_url', 'working_days',
               'working_time_intervals', 'DateLoad', 'fileKey']
    df3 = pd.DataFrame(columns=columns)
    df_mart = pd.DataFrame(columns=columns)

    def create_con():
        # S3 client pointed at Yandex Object Storage
        s3 = boto3.client(
            's3',
            aws_access_key_id='YOUR KEY',
            aws_secret_access_key='YOUR SECRET KEY',
            region_name='central1',
            endpoint_url='https://storage.yandexcloud.net',
            verify=False,
        )
        return s3

    def get_all_s3_keys(s3, bucket, prefix):
        """Get a list of all keys in an S3 bucket under the given prefix."""
        df = pd.DataFrame(columns=['file_key'])
        kwargs = {'Bucket': bucket, 'Prefix': prefix}
        while True:
            resp = s3.list_objects_v2(**kwargs)
            for obj in resp.get('Contents', []):
                df = pd.concat([pd.DataFrame([[obj['Key']]], columns=df.columns), df],
                               ignore_index=True)
            if 'NextContinuationToken' not in resp:
                break
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        return df

    def go_file(s3, key):
        # Read one raw json file and flatten it into a single-row dataframe
        obj = s3.get_object(Bucket='YOUR BUCKET', Key=key)
        todos = json.loads(obj['Body'].read())
        experience = todos['experience']['id'] if todos['experience'] is not None else None
        schedule = todos['schedule']['id'] if todos['schedule'] is not None else None
        employment = todos['employment']['id'] if todos['employment'] is not None else None
        try:
            employer_name = todos['employer']['name']
        except (KeyError, TypeError):
            employer_name = None
        try:
            area_id = todos['area']['id']
            area_name = todos['area']['name']
        except (KeyError, TypeError):
            area_id = None
            area_name = None
        row = [todos['id'], todos['name'], todos['salary'], experience, schedule,
               employment, todos['key_skills'], employer_name, todos['accept_temporary'],
               todos['published_at'], todos['archived'], area_id, area_name,
               todos['alternate_url'], todos['working_days'],
               todos['working_time_intervals'], datetime.now(), key]
        return pd.DataFrame([row], columns=columns)

    s3 = create_con()  # connect
    # Keys of all raw json files written today
    df_list_file_key = get_all_s3_keys(s3, 'YOUR BUCKET', 'Lakehouse/Data/raw/' + mydate)
    for ind in df_list_file_key.index:
        df3 = pd.concat([df3, go_file(s3, df_list_file_key['file_key'][ind])])
    out_buffer = BytesIO()
    try:
        # Load the existing data mart, if any, to merge new rows into it
        obj_mart = s3.get_object(Bucket='YOUR BUCKET',
                                 Key='Lakehouse/Data/proccessed/datamart')
        df_mart = pd.read_parquet(io.BytesIO(obj_mart['Body'].read()))
    except Exception:
        pass  # first run: no data mart file exists yet
    df_datamart = (pd.concat([df_mart, df3])
                   .drop_duplicates(['fileKey'], keep='last')
                   .sort_values('fileKey'))
    df_datamart.astype(str).to_parquet(out_buffer, index=False)
    s3.put_object(Bucket='YOUR BUCKET', Body=out_buffer.getvalue(),
                  Key='Lakehouse/Data/proccessed/datamart', StorageClass='STANDARD')
    return 'OK'
Contents of requirements.txt
boto3
pandas
fsspec
requests
pyarrow
fastparquet
urllib3
Contents of the requests.csv file
"number","text","API"
"1","Azure","https://api.hh.ru/vacancies?text=Azure&search_field=name"
"2","data scientist","https://api.hh.ru/vacancies?text=data scientist&search_field=name"
"3","data engineer","https://api.hh.ru/vacancies?text=data engineer&search_field=name"
The requests.csv file stores the list of search queries to parse. It must be put in your bucket at Lakehouse – Config – requests.csv.
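Each API column value is just the hh.ru vacancy-search endpoint with the query text embedded in it. A small sketch of how such rows can be generated with proper URL encoding (`make_request_row` is a hypothetical helper; the original file keeps literal spaces in the URL, which requests encodes on the fly, so quoting up front is equivalent):

```python
import csv
import io
from urllib.parse import quote

def make_request_row(number, text):
    """Build one requests.csv row for the hh.ru vacancy search API."""
    api = 'https://api.hh.ru/vacancies?text=' + quote(text) + '&search_field=name'
    return {'number': str(number), 'text': text, 'API': api}

# Write the same three queries as in the example file above
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['number', 'text', 'API'],
                        quoting=csv.QUOTE_ALL)
writer.writeheader()
for i, query in enumerate(['Azure', 'data scientist', 'data engineer'], start=1):
    writer.writerow(make_request_row(i, query))
print(buf.getvalue())
```

The resulting text can be uploaded to the bucket with the same `put_object` call the functions use.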
Function 1 reads the requests.csv file from the bucket, with the vacancy searches for which we want to download hh.ru responses, and writes the vacancy ids to the All_id_for_parse.csv file (Pythonists, don't criticize the code too harshly :)). Function 2 then requests information from hh.ru separately for each vacancy id. Two steps are needed because accessing each vacancy by id returns a more detailed JSON than the general search results in function 1. Function 3 builds the data mart, a datamart file in Parquet format with the data extracted from the JSON files. This file is what Power BI will need.
Instead of three functions, two would have sufficed using the new Yandex Query service. But at the moment the service cannot read nested JSON, which is exactly what our JSON is.
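To see what "nested" means here: a hh.ru vacancy response keeps fields like experience and area as objects rather than scalars. A toy example (the field values are illustrative, not real API output) of the flattening that function 3 performs:

```python
import json

# Illustrative fragment of a detailed vacancy response (values are made up)
raw = '''{
  "id": "1",
  "name": "Data engineer",
  "experience": {"id": "between1And3", "name": "1-3 years"},
  "area": {"id": "1", "name": "Moscow"}
}'''

todos = json.loads(raw)
flat = {
    'id': todos['id'],
    'name': todos['name'],
    # Pull scalar ids and names out of the nested objects
    'experience': todos['experience']['id'] if todos['experience'] else None,
    'areaName': todos['area']['name'] if todos['area'] else None,
}
print(flat)
```

A tabular engine that expects one value per column cannot consume the raw objects directly, which is why the extraction step exists.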
If everything worked out and the functions ran successfully, you can move on to Power BI.
Power BI
To load the data mart into Power BI, use the File – Parquet source. In the source, specify a link to the file, like
“http://YOURBACKET.storage.yandexcloud.net/Lakehouse/Data/proccessed/datamart”.
In the datamart file, some fields (dictionaries and lists) are saved as text, so you will need to apply a transformation in Power BI. In Power Query this takes a couple of clicks. That is basically it: you can build a dashboard. My dashboard is open and accessible by link.
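Because the mart is written with astype(str), dictionary and list fields such as key_skills arrive as Python-style text. What the Power Query transform has to undo can be illustrated in Python with ast.literal_eval (Power Query does its own parsing; this just shows the shape of the stored data):

```python
import ast

# key_skills as it looks in the parquet file after astype(str)
cell = "[{'name': 'SQL'}, {'name': 'Python'}]"

# Parse the stringified list of dicts back and keep the skill names
skills = [d['name'] for d in ast.literal_eval(cell)]
print(skills)  # ['SQL', 'Python']
```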
Outcome
We ended up with a whole project: Data Lake + BI. It can be used for quick deployment for demos, and it is cheap. Of course, the solution is aimed at small and perhaps medium data volumes. The current numbers:
700 MB of raw data (JSON format) came in over 15 days
700 MB of raw data turned into 0.85 MB of data in the mart (Parquet format)
the mart grows by about 21 MB per year, so this setup can live for 10 years
84 kopecks are spent per day, 295 rubles a year
loading is fully automated: just open the report and the new data is there
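The yearly growth figure follows directly from extrapolating the 15-day snapshot:

```python
# 0.85 MB of mart data accumulated over 15 days, extrapolated to a full year
mb_per_year = 0.85 / 15 * 365
print(round(mb_per_year))  # ~21 MB per year
```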